using EC.Util.Common;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Exceptions;
using MQTTnet.Protocol;
using MQTTnet.Server;
using NewLife.Serialization;
using Newtonsoft.Json.Linq;
using System.Text;

namespace JiLinApp.Biz.TransmitAlarm;

/// <summary>
/// <see cref="IAlarmService"/> implementation that talks to an MQTT broker through an
/// MQTTnet client: it subscribes to the configured command topic and publishes alarm,
/// device and sensor payloads as JSON to the configured publish topics.
/// </summary>
public class AlarmMqttService : IAlarmService
{
    #region Fields

    /// <summary>Delay, in milliseconds, between two publish/reconnect attempts in <see cref="Send"/>.</summary>
    private const int ResendDelayMs = 100;

    private MqttConfig Config { get; }

    //private MqttServer? Server { get; }

    private IMqttClient Client { get; }

    //private MqttServerOptions ServerOptions { get; }

    private MqttClientOptions ClientOptions { get; }

    private MqttClientSubscribeOptions SubscribeOptions { get; }

    /// <summary>Raised for a "getdevices" command whose type is <see cref="DeviceType.Fence"/>.</summary>
    public event IAlarmService.HandleRecvEvent? OnFenceUdpSendDevices;

    /// <summary>Raised for a "getdevices" command whose type is <see cref="DeviceType.Vibrate"/>.</summary>
    public event IAlarmService.HandleRecvEvent? OnVibrateTcpSendDevices;

    /// <summary>Raised for a "getsensors" command whose type is <see cref="DeviceType.Fence"/>.</summary>
    public event IAlarmService.HandleRecvEvent? OnFenceUdpSendSensors;

    /// <summary>Raised for a "getsensors" command whose type is <see cref="DeviceType.Vibrate"/>.</summary>
    public event IAlarmService.HandleRecvEvent? OnVibrateTcpSendSensors;

    #endregion Fields

    /// <summary>
    /// Builds the client and subscription options from <paramref name="config"/> and creates
    /// the MQTT client. No network connection is opened here; call <see cref="Start"/>.
    /// </summary>
    /// <param name="config">Broker address, credentials, topics and retry settings.</param>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
    public AlarmMqttService(MqttConfig config)
    {
        Config = config ?? throw new ArgumentNullException(nameof(config));

        MqttFactory factory = new();
        ClientOptions = factory.CreateClientOptionsBuilder()
            .WithTcpServer(config.Ip, config.Port)
            .WithClientId(config.ClientId)
            .WithCredentials(config.UserName, config.Password)
            .WithCleanSession(false)
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
            .Build();
        SubscribeOptions = factory.CreateSubscribeOptionsBuilder()
            .WithTopicFilter(f => f.WithTopic(config.SubCmdTopic))
            .Build();

        //if (config.Local)
        //{
        //    ServerOptions = factory.CreateServerOptionsBuilder()
        //        .WithDefaultEndpoint()
        //        .WithDefaultEndpointPort(config.Port)
        //        .Build();
        //    MqttServer server = factory.CreateMqttServer(ServerOptions);
        //    server.ValidatingConnectionAsync += Server_ValidatingConnectionAsync;
        //    server.ClientConnectedAsync += Server_ClientConnectedAsync;
        //    server.ClientDisconnectedAsync += Server_ClientDisconnectedAsync;
        //    server.ClientAcknowledgedPublishPacketAsync += Server_ClientAcknowledgedPublishPacketAsync;
        //    Server = server;
        //}

        IMqttClient client = factory.CreateMqttClient();
        client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;
        Client = client;
    }

    /// <summary>Best-effort disconnect when the instance is collected.</summary>
    ~AlarmMqttService()
    {
        try
        {
            Close();
        }
        catch (Exception)
        {
            // An exception escaping a finalizer terminates the process, so a failed
            // disconnect is deliberately ignored here.
        }
    }

    #region Base

    /// <summary>
    /// Connects to the broker and subscribes to the command topic. Each step is attempted
    /// up to <c>Config.RetryTime</c> times (at least once), sleeping
    /// <c>Config.RetryInterval</c> milliseconds between attempts. Does nothing when the
    /// client is already connected.
    /// </summary>
    /// <exception cref="MqttCommunicationTimedOutException">
    /// The last attempt completed without throwing but was not successful.
    /// </exception>
    /// <exception cref="AggregateException">The last attempt threw.</exception>
    public void Start()
    {
        if (IsRuning())
        {
            return;
        }

        // A non-positive RetryTime would otherwise skip connecting altogether.
        int attempts = Math.Max(1, Config.RetryTime);
        int interval = Config.RetryInterval;

        RunWithRetry(attempts, interval, TryConnect);
        RunWithRetry(attempts, interval, TrySubscribe);
    }

    /// <summary>
    /// Runs <paramref name="attempt"/> until it returns true or <paramref name="attempts"/>
    /// tries are used up, sleeping <paramref name="interval"/> ms between tries.
    /// </summary>
    private static void RunWithRetry(int attempts, int interval, Func<bool> attempt)
    {
        for (int i = 1; ; i++)
        {
            bool lastAttempt = i >= attempts;
            try
            {
                if (attempt())
                {
                    return;
                }
            }
            catch (AggregateException ex) when (!lastAttempt && ex.InnerException is MqttCommunicationException)
            {
                // Task.Result wraps the client's failure in an AggregateException. A
                // communication failure is retried; on the last attempt the filter does not
                // match, so the exception reaches the caller unchanged.
            }

            if (lastAttempt)
            {
                throw new MqttCommunicationTimedOutException();
            }

            Thread.Sleep(interval);
        }
    }

    /// <summary>Performs one blocking connect attempt; true when the broker reported success.</summary>
    private bool TryConnect()
    {
        MqttClientConnectResult result = Client.ConnectAsync(ClientOptions, CancellationToken.None).Result;
        return result.ResultCode == MqttClientConnectResultCode.Success;
    }

    /// <summary>Performs one blocking subscribe attempt; true when every filter was granted.</summary>
    private bool TrySubscribe()
    {
        MqttClientSubscribeResult result = Client.SubscribeAsync(SubscribeOptions, CancellationToken.None).Result;
        foreach (var item in result.Items)
        {
            // Result codes above GrantedQoS2 are treated as a rejected subscription.
            if (item.ResultCode > MqttClientSubscribeResultCode.GrantedQoS2)
            {
                return false;
            }
        }

        return true;
    }

    /// <summary>Disconnects from the broker, blocking until done. No-op when not connected.</summary>
    public void Close()
    {
        if (!IsRuning())
        {
            return;
        }

        Client.DisconnectAsync().Wait();
    }

    /// <summary>Returns true when the client exists and reports itself as connected.</summary>
    public bool IsRuning()
    {
        return Client is { IsConnected: true };
    }

    private object StartAtomObj { get; } = new();

    /// <summary>
    /// Calls <see cref="Start"/> under a lock when the client is not connected.
    /// </summary>
    /// <returns>Whether the client is connected afterwards.</returns>
    private bool StartAtom()
    {
        lock (StartAtomObj)
        {
            if (!IsRuning())
            {
                Start();
            }
        }

        return IsRuning();
    }

    #endregion Base

    #region Server Event

    // NOTE: these handlers are only referenced by the commented-out server wiring in the
    // constructor, so they are currently unused.

    /// <summary>Rejects a connection with an empty client id or mismatching user name/password.</summary>
    private Task Server_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
    {
        if (arg.ClientId.Length == 0) arg.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
        else if (arg.UserName != Config.UserName) arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
        else if (arg.Password != Config.Password) arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
        return Task.CompletedTask;
    }

    private Task Server_ClientConnectedAsync(ClientConnectedEventArgs arg)
    {
        return Task.CompletedTask;
    }

    private Task Server_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
    {
        return Task.CompletedTask;
    }

    /// <summary>Reads the acknowledged packet's fields; the values are not used further.</summary>
    private Task Server_ClientAcknowledgedPublishPacketAsync(ClientAcknowledgedPublishPacketEventArgs arg)
    {
        string clientId = arg.ClientId;
        ushort packetIdentifier = arg.PublishPacket.PacketIdentifier;
        string topic = arg.PublishPacket.Topic;
        string msg = Encoding.UTF8.GetString(arg.PublishPacket.Payload);
        return Task.CompletedTask;
    }

    #endregion Server Event

    #region Client Event

    /// <summary>
    /// Decodes an incoming message as UTF-8 text and forwards it to
    /// <see cref="HandleRecvSubCmdTopic"/> when its topic equals the command topic.
    /// </summary>
    private Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
    {
        string topic = arg.ApplicationMessage.Topic;

        // A message without payload would make GetString throw; treat it as empty text.
        byte[]? payload = arg.ApplicationMessage.Payload;
        string msg = payload is null ? string.Empty : Encoding.UTF8.GetString(payload);

        if (string.Equals(topic, Config.SubCmdTopic, StringComparison.Ordinal))
        {
            HandleRecvSubCmdTopic(topic, msg);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Parses a command request and raises the event matching its "cmd" and "type" fields
    /// (both looked up case-insensitively). Unrecognized combinations are ignored.
    /// </summary>
    /// <param name="topic">Topic the request arrived on (not used for dispatching).</param>
    /// <param name="reqJson">Request body as JSON text.</param>
    private void HandleRecvSubCmdTopic(string topic, string reqJson)
    {
        if (string.IsNullOrWhiteSpace(reqJson))
        {
            return;
        }

        // NOTE(review): JsonUtil.ToJObject is defined elsewhere; the null check assumes it
        // may return null for unparsable input - confirm.
        JObject? reqObj = JsonUtil.ToJObject(reqJson);
        if (reqObj is null)
        {
            return;
        }

        string cmd = reqObj.GetValue("cmd", StringComparison.OrdinalIgnoreCase)?.ToString() ?? "";
        DeviceType type = (DeviceType)(reqObj.GetValue("type", StringComparison.OrdinalIgnoreCase).ToInt());

        // Invariant lower-casing keeps the match independent of the current culture.
        IAlarmService.HandleRecvEvent? handler = (cmd.Trim().ToLowerInvariant(), type) switch
        {
            ("getdevices", DeviceType.Vibrate) => OnVibrateTcpSendDevices,
            ("getdevices", DeviceType.Fence) => OnFenceUdpSendDevices,
            ("getsensors", DeviceType.Vibrate) => OnVibrateTcpSendSensors,
            ("getsensors", DeviceType.Fence) => OnFenceUdpSendSensors,
            _ => null,
        };
        handler?.Invoke(reqObj);
    }

    #endregion Client Event

    #region Send

    private int Frame { get; set; } = 0;

    /// <summary>
    /// Advances and returns the frame counter: 1, 2, ..., int.MaxValue - 1, 0, 1, ...
    /// The counter is reset before it can overflow, so it never becomes negative.
    /// </summary>
    private int IncFrame
    {
        get
        {
            Frame = Frame >= int.MaxValue - 1 ? 0 : Frame + 1;
            return Frame;
        }
    }

    /// <summary>
    /// Serializes <paramref name="obj"/> to JSON and publishes it to
    /// <paramref name="topic"/>, blocking until a publish succeeds. When publishing throws
    /// while the client is disconnected, the connection is re-established first.
    /// </summary>
    /// <remarks>Exceptions thrown by <see cref="Start"/> during a reconnect propagate to the caller.</remarks>
    private void Send(string topic, object obj)
    {
        MqttApplicationMessage mqttMsg = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(obj.ToJson())
            .Build();

        while (true)
        {
            try
            {
                MqttClientPublishResult result = Client.PublishAsync(mqttMsg, CancellationToken.None).Result;
                if (result.IsSuccess)
                {
                    return;
                }
            }
            catch (Exception)
            {
                // The failure is handled by reconnecting (if needed) and publishing again.
                if (!IsRuning())
                {
                    while (!StartAtom())
                    {
                        Thread.Sleep(ResendDelayMs);
                    }

                    continue;
                }
            }

            // Still connected but the publish failed: pause instead of spinning.
            Thread.Sleep(ResendDelayMs);
        }
    }

    /// <summary>Publishes <paramref name="obj"/> exactly like <see cref="Send"/>.</summary>
    private void SendByFrame(string topic, object obj)
    {
        //((dynamic)obj).Frame = IncFrame;// does not work; convert to dict, add frame, convert to json
        Send(topic, obj);
    }

    /// <summary>Publishes <paramref name="msg"/> to the alarm topic.</summary>
    public void SendAlarm(AlarmMessage msg)
    {
        Send(Config.PubAlarmTopic, msg);
    }

    /// <summary>Publishes a device list together with its type and a frame number.</summary>
    public void SendDevices(DeviceType type, List deviceList)
    {
        object obj = new { type, data = deviceList, frame = IncFrame };
        Send(Config.PubDevicesTopic, obj);
    }

    /// <summary>Publishes a device's sensor list together with its type, device id and a frame number.</summary>
    public void SendSensors(DeviceType type, int deviceId, List sensorList)
    {
        object obj = new { type, deviceId, data = sensorList, frame = IncFrame };
        Send(Config.PubSensorsTopic, obj);
    }

    /// <summary>Publishes a single device's state together with its type and a frame number.</summary>
    public void SendDeviceState(DeviceType type, object device)
    {
        object obj = new { type, data = device, frame = IncFrame };
        Send(Config.PubDeviceStateTopic, obj);
    }

    /// <summary>Publishes a single sensor's state together with its type and a frame number.</summary>
    public void SendSensorState(DeviceType type, object sensor)
    {
        object obj = new { type, data = sensor, frame = IncFrame };
        Send(Config.PubSensorStateTopic, obj);
    }

    #endregion Send
}

/// <summary>Device category carried in the "type" field of requests and published payloads.</summary>
public enum DeviceType : int
{
    Unknown = 0,
    Vibrate = 3,
    Fence = 4,
}