You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
316 lines
9.0 KiB
316 lines
9.0 KiB
using EC.Util.Common;
|
|
using MQTTnet;
|
|
using MQTTnet.Client;
|
|
using MQTTnet.Exceptions;
|
|
using MQTTnet.Protocol;
|
|
using MQTTnet.Server;
|
|
using NewLife.Serialization;
|
|
using Newtonsoft.Json.Linq;
|
|
using System.Text;
|
|
|
|
namespace JiLinApp.Biz.TransmitAlarm;
|
|
|
|
/// <summary>
/// <see cref="IAlarmService"/> implementation backed by an MQTT client.
/// It connects to the broker described by <see cref="MqttConfig"/>, subscribes to the
/// command topic (<c>SubCmdTopic</c>), raises events for the commands it receives and
/// publishes alarm / device / sensor payloads as JSON.
/// </summary>
/// <remarks>
/// The public API is synchronous, so the underlying asynchronous MQTT calls are blocked on.
/// </remarks>
public class AlarmMqttService : IAlarmService
{
    #region Fields

    /// <summary>Pause, in milliseconds, between publish / reconnect attempts inside <c>Send</c>.</summary>
    private const int SendRetryDelayMs = 100;

    /// <summary>Connection settings, topic names and retry parameters.</summary>
    private MqttConfig Config { get; }

    /// <summary>MQTT client used for every connect / subscribe / publish call.</summary>
    private IMqttClient Client { get; }

    /// <summary>Options passed to <c>ConnectAsync</c>; built once in the constructor.</summary>
    private MqttClientOptions ClientOptions { get; }

    /// <summary>Subscription for the command topic; built once in the constructor.</summary>
    private MqttClientSubscribeOptions SubscribeOptions { get; }

    /// <summary>Raised when a "getdevices" command with type <see cref="DeviceType.Fence"/> is received.</summary>
    public event IAlarmService.HandleRecvEvent? OnFenceUdpSendDevices;

    /// <summary>Raised when a "getdevices" command with type <see cref="DeviceType.Vibrate"/> is received.</summary>
    public event IAlarmService.HandleRecvEvent? OnVibrateTcpSendDevices;

    /// <summary>Raised when a "getsensors" command with type <see cref="DeviceType.Fence"/> is received.</summary>
    public event IAlarmService.HandleRecvEvent? OnFenceUdpSendSensors;

    /// <summary>Raised when a "getsensors" command with type <see cref="DeviceType.Vibrate"/> is received.</summary>
    public event IAlarmService.HandleRecvEvent? OnVibrateTcpSendSensors;

    #endregion Fields

    /// <summary>
    /// Builds the client and subscribe options from <paramref name="config"/> and creates the
    /// MQTT client. No network activity happens here; call <see cref="Start"/> to connect.
    /// </summary>
    /// <param name="config">Broker address, credentials, topics and retry settings.</param>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
    public AlarmMqttService(MqttConfig config)
    {
        if (config is null) throw new ArgumentNullException(nameof(config));

        MqttFactory factory = new();
        ClientOptions = factory.CreateClientOptionsBuilder()
            .WithTcpServer(config.Ip, config.Port)
            .WithClientId(config.ClientId)
            .WithCredentials(config.UserName, config.Password)
            .WithCleanSession(false)
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
            .Build();
        SubscribeOptions = factory.CreateSubscribeOptionsBuilder()
            .WithTopicFilter(f => f.WithTopic(config.SubCmdTopic))
            .Build();

        IMqttClient client = factory.CreateMqttClient();
        client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;

        Config = config;
        Client = client;
    }

    /// <summary>
    /// Last-chance disconnect. Anything thrown here is swallowed on purpose: an exception
    /// escaping a finalizer terminates the process.
    /// </summary>
    ~AlarmMqttService()
    {
        try
        {
            Close();
        }
        catch (Exception)
        {
            // Nothing useful can be done on the finalizer thread; callers should invoke Close() explicitly.
        }
    }

    #region Base

    /// <summary>
    /// Connects to the broker and subscribes to the command topic. Does nothing if the
    /// client is already connected.
    /// </summary>
    /// <remarks>
    /// Each step is attempted up to <c>Config.RetryTime</c> times (at least once), sleeping
    /// <c>Config.RetryInterval</c> milliseconds between attempts. Both a non-success result
    /// code and an exception thrown by the client count as a failed attempt.
    /// </remarks>
    /// <exception cref="MqttCommunicationTimedOutException">
    /// All attempts of either step failed. The last exception thrown by the client, if any,
    /// is available as the inner exception.
    /// </exception>
    public void Start()
    {
        if (IsRuning()) return;

        // A non-positive retry count must still result in one real attempt,
        // and Thread.Sleep rejects negative intervals (or blocks forever on -1).
        int maxAttempts = Math.Max(1, Config.RetryTime);
        int retryInterval = Math.Max(0, Config.RetryInterval);

        RunWithRetry(TryConnect, maxAttempts, retryInterval);
        RunWithRetry(TrySubscribe, maxAttempts, retryInterval);
    }

    /// <summary>Performs one connect attempt; returns true when the broker accepted the connection.</summary>
    private bool TryConnect()
    {
        MqttClientConnectResult connResult = Client.ConnectAsync(ClientOptions, CancellationToken.None).GetAwaiter().GetResult();
        return connResult.ResultCode == MqttClientConnectResultCode.Success;
    }

    /// <summary>Performs one subscribe attempt; returns true when every topic filter was granted.</summary>
    private bool TrySubscribe()
    {
        MqttClientSubscribeResult subResult = Client.SubscribeAsync(SubscribeOptions, CancellationToken.None).GetAwaiter().GetResult();
        foreach (var item in subResult.Items)
        {
            // Codes above GrantedQoS2 are treated as a rejected subscription.
            if (item.ResultCode > MqttClientSubscribeResultCode.GrantedQoS2) return false;
        }
        return true;
    }

    /// <summary>
    /// Runs <paramref name="attempt"/> until it returns true or <paramref name="maxAttempts"/>
    /// attempts have failed, sleeping <paramref name="retryInterval"/> ms between attempts.
    /// </summary>
    /// <exception cref="MqttCommunicationTimedOutException">Every attempt failed.</exception>
    private static void RunWithRetry(Func<bool> attempt, int maxAttempts, int retryInterval)
    {
        Exception? lastError = null;
        for (int i = 1; i <= maxAttempts; i++)
        {
            try
            {
                if (attempt()) return;
                lastError = null;
            }
            catch (Exception ex)
            {
                // Connection-level failures surface as exceptions; treat them like any other failed attempt.
                lastError = ex;
            }

            if (i < maxAttempts) Thread.Sleep(retryInterval);
        }

        throw lastError is null
            ? new MqttCommunicationTimedOutException()
            : new MqttCommunicationTimedOutException(lastError);
    }

    /// <summary>Disconnects from the broker if currently connected; blocks until the disconnect completes.</summary>
    public void Close()
    {
        if (!IsRuning()) return;
        Client.DisconnectAsync().Wait();
    }

    /// <summary>Returns true when the client exists and reports an active connection.</summary>
    public bool IsRuning()
    {
        return Client is { IsConnected: true };
    }

    /// <summary>Lock object that serializes reconnect attempts made through <see cref="StartAtom"/>.</summary>
    private object StartAtomObj { get; } = new();

    /// <summary>
    /// Calls <see cref="Start"/> under a lock if the client is not connected.
    /// </summary>
    /// <returns>True when the client is connected afterwards.</returns>
    /// <exception cref="MqttCommunicationTimedOutException">Propagated from <see cref="Start"/>.</exception>
    private bool StartAtom()
    {
        lock (StartAtomObj)
        {
            if (!IsRuning()) Start();
        }
        return IsRuning();
    }

    #endregion Base

    #region Server Event

    // NOTE: the handlers in this region are not subscribed to anything in this class;
    // they appear to be kept for an embedded-server mode that is currently disabled.

    /// <summary>
    /// Rejects a connection whose client id is empty or whose user name / password differ
    /// from the configured credentials.
    /// </summary>
    private Task Server_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
    {
        if (string.IsNullOrEmpty(arg.ClientId))
        {
            arg.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
        }
        else if (arg.UserName != Config.UserName || arg.Password != Config.Password)
        {
            arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
        }
        return Task.CompletedTask;
    }

    /// <summary>No-op placeholder for the client-connected server event.</summary>
    private Task Server_ClientConnectedAsync(ClientConnectedEventArgs arg)
    {
        return Task.CompletedTask;
    }

    /// <summary>No-op placeholder for the client-disconnected server event.</summary>
    private Task Server_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
    {
        return Task.CompletedTask;
    }

    /// <summary>No-op placeholder for the publish-acknowledged server event.</summary>
    private Task Server_ClientAcknowledgedPublishPacketAsync(ClientAcknowledgedPublishPacketEventArgs arg)
    {
        return Task.CompletedTask;
    }

    #endregion Server Event

    #region Client Event

    /// <summary>
    /// Receive callback of the MQTT client. Decodes the payload as UTF-8 and forwards messages
    /// that arrived on the command topic to <see cref="HandleRecvSubCmdTopic"/>.
    /// </summary>
    private Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
    {
        string topic = arg.ApplicationMessage.Topic;
        // A message without payload carries a null buffer; treat it as empty instead of throwing.
        string msg = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload ?? Array.Empty<byte>());

        if (topic.Equals(Config.SubCmdTopic) && !string.IsNullOrWhiteSpace(msg))
        {
            HandleRecvSubCmdTopic(topic, msg);
        }
        return Task.CompletedTask;
    }

    /// <summary>
    /// Parses a command message and raises the event matching its "cmd" and "type" fields
    /// (both looked up case-insensitively). Unknown combinations are ignored.
    /// </summary>
    /// <param name="topic">Topic the message arrived on (currently unused).</param>
    /// <param name="reqJson">JSON text of the command.</param>
    private void HandleRecvSubCmdTopic(string topic, string reqJson)
    {
        JObject reqObj = JsonUtil.ToJObject(reqJson);
        string cmd = reqObj.GetValue("cmd", StringComparison.OrdinalIgnoreCase)?.ToString() ?? "";
        DeviceType type = (DeviceType)(reqObj.GetValue("type", StringComparison.OrdinalIgnoreCase).ToInt());

        // Invariant lower-casing: command names are protocol identifiers, not localized text.
        string command = cmd.Trim().ToLowerInvariant();

        IAlarmService.HandleRecvEvent? handler = (command, type) switch
        {
            ("getdevices", DeviceType.Vibrate) => OnVibrateTcpSendDevices,
            ("getdevices", DeviceType.Fence) => OnFenceUdpSendDevices,
            ("getsensors", DeviceType.Vibrate) => OnVibrateTcpSendSensors,
            ("getsensors", DeviceType.Fence) => OnFenceUdpSendSensors,
            _ => null
        };
        handler?.Invoke(reqObj);
    }

    #endregion Client Event

    #region Send

    /// <summary>Last frame number handed out by <see cref="IncFrame"/>.</summary>
    private int Frame { get; set; } = 0;

    /// <summary>
    /// Advances and returns the frame counter. Values run 1, 2, ... int.MaxValue - 1 and then
    /// wrap to 0, so the counter never overflows into negative numbers.
    /// </summary>
    private int IncFrame
    {
        get
        {
            Frame = Frame >= int.MaxValue - 1 ? 0 : Frame + 1;
            return Frame;
        }
    }

    /// <summary>
    /// Serializes <paramref name="obj"/> to JSON and publishes it on <paramref name="topic"/>,
    /// blocking until the publish succeeds.
    /// </summary>
    /// <remarks>
    /// After a failed publish the method waits briefly, reconnects if the connection was lost
    /// and tries again.
    /// </remarks>
    /// <param name="topic">Topic to publish on.</param>
    /// <param name="obj">Payload object; serialized with <c>ToJson()</c>.</param>
    /// <exception cref="MqttCommunicationTimedOutException">
    /// Reconnecting failed after the configured number of attempts.
    /// </exception>
    private void Send(string topic, object obj)
    {
        MqttApplicationMessage mqttMsg = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(obj.ToJson())
            .Build();

        while (true)
        {
            try
            {
                MqttClientPublishResult pubResult = Client.PublishAsync(mqttMsg, CancellationToken.None).GetAwaiter().GetResult();
                if (pubResult.IsSuccess) return;
            }
            catch (Exception)
            {
                // Deliberate best-effort: a throwing publish usually means the connection dropped.
                // The reconnect loop below deals with it before the next attempt.
            }

            // Back off so a persistently failing publish does not spin the CPU.
            Thread.Sleep(SendRetryDelayMs);
            while (!IsRuning())
            {
                if (StartAtom()) break;
                Thread.Sleep(SendRetryDelayMs);
            }
        }
    }

    /// <summary>
    /// Publishes <paramref name="obj"/> on <paramref name="topic"/>. Currently identical to
    /// <see cref="Send"/>; callers add the frame number to the payload themselves.
    /// </summary>
    private void SendByFrame(string topic, object obj)
    {
        Send(topic, obj);
    }

    /// <summary>Publishes an alarm message on <c>Config.PubAlarmTopic</c>.</summary>
    public void SendAlarm(AlarmMessage msg)
    {
        Send(Config.PubAlarmTopic, msg);
    }

    /// <summary>Publishes a device list as <c>{ type, data, frame }</c> on <c>Config.PubDevicesTopic</c>.</summary>
    public void SendDevices(DeviceType type, List<object> deviceList)
    {
        object obj = new { type, data = deviceList, frame = IncFrame };
        Send(Config.PubDevicesTopic, obj);
    }

    /// <summary>Publishes a sensor list as <c>{ type, deviceId, data, frame }</c> on <c>Config.PubSensorsTopic</c>.</summary>
    public void SendSensors(DeviceType type, int deviceId, List<object> sensorList)
    {
        object obj = new { type, deviceId, data = sensorList, frame = IncFrame };
        Send(Config.PubSensorsTopic, obj);
    }

    /// <summary>Publishes a device state as <c>{ type, data, frame }</c> on <c>Config.PubDeviceStateTopic</c>.</summary>
    public void SendDeviceState(DeviceType type, object device)
    {
        object obj = new { type, data = device, frame = IncFrame };
        Send(Config.PubDeviceStateTopic, obj);
    }

    /// <summary>Publishes a sensor state as <c>{ type, data, frame }</c> on <c>Config.PubSensorStateTopic</c>.</summary>
    public void SendSensorState(DeviceType type, object sensor)
    {
        object obj = new { type, data = sensor, frame = IncFrame };
        Send(Config.PubSensorStateTopic, obj);
    }

    #endregion Send
}
|
|
|
|
/// <summary>
/// Kind of alarm device a command or payload refers to. The numeric value is what
/// travels in the "type" field of the JSON messages.
/// </summary>
public enum DeviceType
{
    /// <summary>Default value; no specific device kind.</summary>
    Unknown = 0,

    /// <summary>Vibration device.</summary>
    Vibrate = 3,

    /// <summary>Fence device.</summary>
    Fence = 4
}
|