You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

316 lines
9.0 KiB

using EC.Util.Common;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Exceptions;
using MQTTnet.Protocol;
using MQTTnet.Server;
using NewLife.Serialization;
using Newtonsoft.Json.Linq;
using System.Text;
namespace JiLinApp.Biz.TransmitAlarm;
/// <summary>
/// <see cref="IAlarmService"/> implementation backed by an MQTT client: it subscribes to the
/// configured command topic and publishes alarm, device and sensor payloads.
/// </summary>
public class AlarmMqttService : IAlarmService
#region Fields
// Settings supplied to the constructor (broker address, credentials, topic names, retry values).
private MqttConfig Config { get; }
//private MqttServer? Server { get; }
// MQTT client used to connect, subscribe and publish.
private IMqttClient Client { get; }
//private MqttServerOptions ServerOptions { get; }
// Connection options assigned once in the constructor (broker address and credentials).
private MqttClientOptions ClientOptions { get; }
// Subscription options assigned once in the constructor (filter on Config.SubCmdTopic).
private MqttClientSubscribeOptions SubscribeOptions { get; }
// Events declared per command kind (devices / sensors) and per device family (fence / vibrate).
// NOTE(review): the statements that raise these events are not visible in this view —
// presumably HandleRecvSubCmdTopic raises them; confirm.
public event IAlarmService.HandleRecvEvent? OnFenceUdpSendDevices;
public event IAlarmService.HandleRecvEvent? OnVibrateTcpSendDevices;
public event IAlarmService.HandleRecvEvent? OnFenceUdpSendSensors;
public event IAlarmService.HandleRecvEvent? OnVibrateTcpSendSensors;
#endregion Fields
/// <summary>
/// Prepares the MQTT client together with its connection and subscription options from
/// <paramref name="config"/>. Connecting is done later by <see cref="Start"/>.
/// </summary>
/// <param name="config">Broker address, credentials and topic names used by this service.</param>
public AlarmMqttService(MqttConfig config)
MqttFactory factory = new();
// Connection options: broker endpoint plus user name / password from the configuration.
ClientOptions = factory.CreateClientOptionsBuilder()
.WithTcpServer(config.Ip, config.Port)
.WithCredentials(config.UserName, config.Password)
// Subscription options: a single topic filter on the command topic.
SubscribeOptions = factory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(f => f.WithTopic(config.SubCmdTopic))
IMqttClient client = factory.CreateMqttClient();
// Incoming messages are routed to Client_ApplicationMessageReceivedAsync.
client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;
Config = config;
Client = client;
#region Base
/// <summary>
/// Connects the client and then subscribes using <c>SubscribeOptions</c>, each phase making up to
/// <c>Config.RetryTime</c> attempts. Returns immediately when the client is already connected.
/// With a <c>RetryTime</c> below 1 neither loop body runs.
/// </summary>
/// <exception cref="MqttCommunicationTimedOutException">
/// Thrown when the last connect attempt or the last subscribe attempt still has not succeeded.
/// </exception>
public void Start()
if (IsRuning()) return;
// NOTE(review): retryInterval is read here, but no use of it is visible in this view —
// confirm that a delay between attempts exists.
int retryTime = Config.RetryTime, retryInterval = Config.RetryInterval;
// Phase 1: connect. Reading .Result blocks the calling thread until the attempt completes.
for (int i = 1; i <= retryTime; i++)
MqttClientConnectResult connResult = Client.ConnectAsync(ClientOptions, CancellationToken.None).Result;
if (connResult.ResultCode == MqttClientConnectResultCode.Success) break;
if (i == retryTime) throw new MqttCommunicationTimedOutException();
// Phase 2: subscribe. An attempt counts as successful only when every returned item was granted.
for (int i = 1; i <= retryTime; i++)
MqttClientSubscribeResult subResult = Client.SubscribeAsync(SubscribeOptions, CancellationToken.None).Result;
bool flag = true;
foreach (var item in subResult.Items)
// Any result code above GrantedQoS2 is treated as a failed subscription.
if (item.ResultCode > MqttClientSubscribeResultCode.GrantedQoS2)
flag = false;
if (flag) break;
if (i == retryTime) throw new MqttCommunicationTimedOutException();
/// <summary>
/// Returns immediately when the client is not connected.
/// </summary>
// NOTE(review): the rest of the body is not visible in this view — presumably it disconnects
// the client; confirm.
public void Close()
if (!IsRuning()) return;
/// <summary>
/// Reports whether the MQTT client exists and is currently connected.
/// </summary>
/// <returns><c>true</c> when <c>Client</c> is non-null and reports <c>IsConnected</c>; otherwise <c>false</c>.</returns>
public bool IsRuning() => Client is { IsConnected: true };
// Gate object so that only one caller at a time runs the reconnect logic in StartAtom.
private object StartAtomObj { get; } = new();

/// <summary>
/// Calls <see cref="Start"/> under a lock when the client is not connected.
/// </summary>
/// <returns>The connection state as reported by <see cref="IsRuning"/> after the attempt.</returns>
private bool StartAtom()
{
    lock (StartAtomObj)
    {
        bool connected = IsRuning();
        if (!connected)
        {
            Start();
        }

        return IsRuning();
    }
}
#endregion Base
#region Server Event
/// <summary>
/// Validates a connecting client: an empty client id is rejected as invalid, and a user name or
/// password that differs from the configured one is rejected as bad credentials. When every
/// check passes, <c>arg.ReasonCode</c> is left untouched.
/// </summary>
/// <param name="arg">Connection validation arguments; <c>ReasonCode</c> is set on rejection.</param>
/// <returns>An already-completed task.</returns>
private Task Server_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
{
    if (arg.ClientId.Length == 0)
    {
        arg.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
        return Task.CompletedTask;
    }

    // User name is checked first; the password is only compared when the user name matches.
    bool credentialsMatch = arg.UserName == Config.UserName && arg.Password == Config.Password;
    if (!credentialsMatch)
    {
        arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
    }

    return Task.CompletedTask;
}
/// <summary>
/// Client-connected handler; performs no work.
/// </summary>
/// <param name="arg">Details of the connected client (unused).</param>
/// <returns>An already-completed task.</returns>
private Task Server_ClientConnectedAsync(ClientConnectedEventArgs arg) => Task.CompletedTask;
/// <summary>
/// Client-disconnected handler; performs no work.
/// </summary>
/// <param name="arg">Details of the disconnected client (unused).</param>
/// <returns>An already-completed task.</returns>
private Task Server_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg) => Task.CompletedTask;
/// <summary>
/// Publish-acknowledged handler. Reads the client id, packet identifier, topic and UTF-8 decoded
/// payload of the acknowledged packet into locals and does nothing further with them.
/// </summary>
/// <param name="arg">Acknowledgement details, including the published packet.</param>
/// <returns>An already-completed task.</returns>
private Task Server_ClientAcknowledgedPublishPacketAsync(ClientAcknowledgedPublishPacketEventArgs arg)
{
    string ackClientId = arg.ClientId;
    var packet = arg.PublishPacket;

    // The values below are extracted but not consumed anywhere in this handler.
    ushort packetId = packet.PacketIdentifier;
    string packetTopic = packet.Topic;
    string payloadText = Encoding.UTF8.GetString(packet.Payload);

    return Task.CompletedTask;
}
#endregion Server Event
#region Client Event
/// <summary>
/// Handles a message delivered to the MQTT client. Messages on <c>Config.SubCmdTopic</c> with a
/// non-empty payload are decoded as UTF-8 and passed to <see cref="HandleRecvSubCmdTopic"/>;
/// everything else is ignored.
/// </summary>
/// <param name="arg">Received-message arguments carrying the application message.</param>
/// <returns>An already-completed task.</returns>
private Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
    MqttApplicationMessage message = arg.ApplicationMessage;
    string topic = message.Topic;

    // Only the command topic is handled; skip decoding for any other topic.
    if (!topic.Equals(Config.SubCmdTopic))
    {
        return Task.CompletedTask;
    }

    // Encoding.GetString throws on a null array, and an empty body cannot contain a command,
    // so such messages are dropped instead of faulting the handler.
    byte[]? payload = message.Payload;
    if (payload is null || payload.Length == 0)
    {
        return Task.CompletedTask;
    }

    HandleRecvSubCmdTopic(topic, Encoding.UTF8.GetString(payload));
    return Task.CompletedTask;
}
/// <summary>
/// Parses a JSON command received on the command topic and dispatches on its "cmd" and "type" fields.
/// </summary>
/// <param name="topic">Topic the message arrived on.</param>
/// <param name="reqJson">Message payload as text; parsed into a JSON object.</param>
private void HandleRecvSubCmdTopic(string topic, string reqJson)
JObject reqObj = JsonUtil.ToJObject(reqJson);
// "cmd" is looked up case-insensitively; a missing value becomes the empty string.
string cmd = reqObj.GetValue("cmd", StringComparison.OrdinalIgnoreCase)?.ToString() ?? "";
// "type" is looked up case-insensitively and converted to DeviceType through ToInt().
// NOTE(review): how ToInt() behaves when "type" is absent is not visible here — confirm it tolerates null.
DeviceType type = (DeviceType)(reqObj.GetValue("type", StringComparison.OrdinalIgnoreCase).ToInt());
// Command names are matched after trimming and lower-casing.
switch (cmd.Trim().ToLower())
case "getdevices":
switch (type)
// NOTE(review): the case bodies are not visible in this view — presumably each raises the
// matching On*SendDevices event; confirm.
case DeviceType.Vibrate:
case DeviceType.Fence:
case "getsensors":
switch (type)
// NOTE(review): case bodies not visible — presumably each raises the matching On*SendSensors event; confirm.
case DeviceType.Vibrate:
case DeviceType.Fence:
#endregion Client Event
#region Send
/// <summary>
/// Last frame number handed out. Kept within [0, int.MaxValue - 1] by <see cref="IncFrame"/>.
/// </summary>
private int Frame { get; set; } = 0;

/// <summary>
/// Advances the frame counter by one and returns the new value, wrapping back to 0 instead of
/// reaching <c>int.MaxValue</c>. Each read of this property consumes one frame number.
/// </summary>
private int IncFrame
{
    get
    {
        // Wrap the stored counter itself. Applying the modulo only to the returned value would
        // let Frame run past int.MaxValue and yield negative frame numbers after the overflow.
        Frame = (Frame + 1) % int.MaxValue;
        return Frame;
    }
}
/// <summary>
/// Publishes <paramref name="obj"/> on <paramref name="topic"/>, repeating until the publish
/// result reports success.
/// </summary>
/// <param name="topic">Destination topic.</param>
/// <param name="obj">Payload object to publish.</param>
private void Send(string topic, object obj)
// NOTE(review): the builder calls that set the topic and payload are not visible in this view.
MqttApplicationMessage mqttMsg = new MqttApplicationMessageBuilder()
MqttClientPublishResult? pubResult = null;
// No attempt limit: the loop repeats until IsSuccess is true.
while (pubResult == null || !pubResult.IsSuccess)
// Reading .Result blocks the calling thread until the publish completes.
pubResult = Client.PublishAsync(mqttMsg, CancellationToken.None).Result;
catch (Exception)
// After a failure, keep trying to reconnect (serialized through StartAtom) before publishing again.
while (!IsRuning())
if (StartAtom()) break;
/// <summary>
/// Publishes <paramref name="obj"/> on <paramref name="topic"/>, repeating until the publish
/// result reports success. The visible body matches <see cref="Send"/>; the frame assignment
/// is commented out.
/// </summary>
/// <param name="topic">Destination topic.</param>
/// <param name="obj">Payload object to publish.</param>
private void SendByFrame(string topic, object obj)
//((dynamic)obj).Frame = IncFrame; // does not work; alternative: convert to a dictionary, add the frame, then serialize to JSON
// NOTE(review): the builder calls that set the topic and payload are not visible in this view.
MqttApplicationMessage mqttMsg = new MqttApplicationMessageBuilder()
MqttClientPublishResult? pubResult = null;
// No attempt limit: the loop repeats until IsSuccess is true.
while (pubResult == null || !pubResult.IsSuccess)
// Reading .Result blocks the calling thread until the publish completes.
pubResult = Client.PublishAsync(mqttMsg, CancellationToken.None).Result;
catch (Exception)
// After a failure, keep trying to reconnect (serialized through StartAtom) before publishing again.
while (!IsRuning())
if (StartAtom()) break;
/// <summary>
/// Publishes an alarm message on <c>Config.PubAlarmTopic</c>.
/// </summary>
/// <param name="msg">Alarm message to publish.</param>
public void SendAlarm(AlarmMessage msg) => Send(Config.PubAlarmTopic, msg);
/// <summary>
/// Publishes a device list on <c>Config.PubDevicesTopic</c>, wrapped with the device type and
/// the next frame number.
/// </summary>
/// <param name="type">Device type the list belongs to.</param>
/// <param name="deviceList">Devices to publish as the <c>data</c> member.</param>
public void SendDevices(DeviceType type, List<object> deviceList)
{
    // Member names and order (type, data, frame) define the published shape.
    var envelope = new
    {
        type,
        data = deviceList,
        frame = IncFrame,
    };

    Send(Config.PubDevicesTopic, envelope);
}
/// <summary>
/// Publishes a sensor list on <c>Config.PubSensorsTopic</c>, wrapped with the device type,
/// the owning device id and the next frame number.
/// </summary>
/// <param name="type">Device type the sensors belong to.</param>
/// <param name="deviceId">Identifier of the device owning the sensors.</param>
/// <param name="sensorList">Sensors to publish as the <c>data</c> member.</param>
public void SendSensors(DeviceType type, int deviceId, List<object> sensorList)
{
    // Member names and order (type, deviceId, data, frame) define the published shape.
    var envelope = new
    {
        type,
        deviceId,
        data = sensorList,
        frame = IncFrame,
    };

    Send(Config.PubSensorsTopic, envelope);
}
/// <summary>
/// Publishes a single device's state on <c>Config.PubDeviceStateTopic</c>, wrapped with the
/// device type and the next frame number.
/// </summary>
/// <param name="type">Device type of <paramref name="device"/>.</param>
/// <param name="device">Device state to publish as the <c>data</c> member.</param>
public void SendDeviceState(DeviceType type, object device)
{
    // Member names and order (type, data, frame) define the published shape.
    var envelope = new
    {
        type,
        data = device,
        frame = IncFrame,
    };

    Send(Config.PubDeviceStateTopic, envelope);
}
/// <summary>
/// Publishes a single sensor's state on <c>Config.PubSensorStateTopic</c>, wrapped with the
/// device type and the next frame number.
/// </summary>
/// <param name="type">Device type the sensor belongs to.</param>
/// <param name="sensor">Sensor state to publish as the <c>data</c> member.</param>
public void SendSensorState(DeviceType type, object sensor)
{
    // Member names and order (type, data, frame) define the published shape.
    var envelope = new
    {
        type,
        data = sensor,
        frame = IncFrame,
    };

    Send(Config.PubSensorStateTopic, envelope);
}
#endregion Send
/// <summary>
/// Device category. Incoming commands carry it as the integer "type" field (cast in
/// HandleRecvSubCmdTopic), and outgoing payloads include it as their <c>type</c> member.
/// </summary>
public enum DeviceType : int
// Default value (0).
Unknown = 0,
// NOTE(review): presumably pairs with the OnVibrateTcp* events — confirm.
Vibrate = 3,
// NOTE(review): presumably pairs with the OnFenceUdp* events — confirm.
Fence = 4,