You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

316 lines
9.0 KiB

using EC.Util.Common;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Exceptions;
using MQTTnet.Protocol;
using MQTTnet.Server;
using NewLife.Serialization;
using Newtonsoft.Json.Linq;
using System.Text;
namespace JiLinApp.Biz.TransmitAlarm;
/// <summary>
/// MQTT client implementation of <see cref="IAlarmService"/>.
/// Publishes alarm, device and sensor messages to the topics named in <see cref="MqttConfig"/>
/// and raises the <c>On*Send*</c> events when a command arrives on the subscribed command topic.
/// </summary>
public class AlarmMqttService : IAlarmService
{
    #region Fields

    /// <summary>Pause, in milliseconds, between two failed publish attempts.</summary>
    private const int SendRetryDelayMs = 100;

    private MqttConfig Config { get; }

    //private MqttServer? Server { get; }
    private IMqttClient Client { get; }

    //private MqttServerOptions ServerOptions { get; }
    private MqttClientOptions ClientOptions { get; }

    private MqttClientSubscribeOptions SubscribeOptions { get; }

    /// <summary>Serialises reconnect attempts issued from concurrent senders.</summary>
    private readonly object _startLock = new();

    /// <summary>Backing counter for <see cref="IncFrame"/>.</summary>
    private int _frame;

    /// <summary>Raised for a "getdevices" command whose type is <see cref="DeviceType.Fence"/>.</summary>
    public event IAlarmService.HandleRecvEvent? OnFenceUdpSendDevices;

    /// <summary>Raised for a "getdevices" command whose type is <see cref="DeviceType.Vibrate"/>.</summary>
    public event IAlarmService.HandleRecvEvent? OnVibrateTcpSendDevices;

    /// <summary>Raised for a "getsensors" command whose type is <see cref="DeviceType.Fence"/>.</summary>
    public event IAlarmService.HandleRecvEvent? OnFenceUdpSendSensors;

    /// <summary>Raised for a "getsensors" command whose type is <see cref="DeviceType.Vibrate"/>.</summary>
    public event IAlarmService.HandleRecvEvent? OnVibrateTcpSendSensors;

    #endregion Fields

    /// <summary>
    /// Builds the client and subscribe options from <paramref name="config"/> and creates the
    /// MQTT client. No network activity happens until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="config">Broker address, credentials, topics and retry settings.</param>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
    public AlarmMqttService(MqttConfig config)
    {
        if (config is null) throw new ArgumentNullException(nameof(config));

        MqttFactory factory = new();
        ClientOptions = factory.CreateClientOptionsBuilder()
            .WithTcpServer(config.Ip, config.Port)
            .WithClientId(config.ClientId)
            .WithCredentials(config.UserName, config.Password)
            .WithCleanSession(false)
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
            .Build();
        SubscribeOptions = factory.CreateSubscribeOptionsBuilder()
            .WithTopicFilter(f => f.WithTopic(config.SubCmdTopic))
            .Build();

        IMqttClient client = factory.CreateMqttClient();
        client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;

        Config = config;
        Client = client;
    }

    /// <summary>Best-effort disconnect when the instance is collected without an explicit <see cref="Close"/>.</summary>
    ~AlarmMqttService()
    {
        try
        {
            Close();
        }
        catch (Exception)
        {
            // An exception escaping a finalizer terminates the process, and there is
            // nobody to report to on the finalizer thread, so the failure is dropped.
        }
    }

    #region Base

    /// <summary>
    /// Connects to the broker and subscribes to the command topic. Each step is attempted up to
    /// <c>Config.RetryTime</c> times (at least once), waiting <c>Config.RetryInterval</c>
    /// milliseconds between attempts. Does nothing when the client is already connected.
    /// </summary>
    /// <exception cref="MqttCommunicationTimedOutException">
    /// Connecting or subscribing still failed after the last attempt.
    /// </exception>
    public void Start()
    {
        if (IsRuning()) return;

        Retry(() =>
        {
            MqttClientConnectResult connResult =
                Client.ConnectAsync(ClientOptions, CancellationToken.None).GetAwaiter().GetResult();
            return connResult.ResultCode == MqttClientConnectResultCode.Success;
        });

        Retry(() =>
        {
            MqttClientSubscribeResult subResult =
                Client.SubscribeAsync(SubscribeOptions, CancellationToken.None).GetAwaiter().GetResult();
            foreach (var item in subResult.Items)
            {
                // Codes above GrantedQoS2 are treated as a rejected subscription.
                if (item.ResultCode > MqttClientSubscribeResultCode.GrantedQoS2) return false;
            }
            return true;
        });
    }

    /// <summary>Disconnects from the broker if currently connected; blocks until done.</summary>
    public void Close()
    {
        if (!IsRuning()) return;
        Client.DisconnectAsync().Wait();
    }

    /// <summary>Returns true while the underlying client reports an active connection.</summary>
    public bool IsRuning()
    {
        // Client can still be null when the finalizer runs on an instance whose constructor threw.
        return Client is not null && Client.IsConnected;
    }

    /// <summary>Calls <see cref="Start"/> under a lock so only one thread reconnects at a time.</summary>
    /// <returns>True when the client is connected afterwards.</returns>
    private bool StartAtom()
    {
        lock (_startLock)
        {
            if (!IsRuning()) Start();
        }
        return IsRuning();
    }

    /// <summary>
    /// Runs <paramref name="attempt"/> until it returns true, retrying on a false result or on a
    /// communication/cancellation exception.
    /// </summary>
    /// <param name="attempt">One try of the operation; returns true on success.</param>
    /// <exception cref="MqttCommunicationTimedOutException">All attempts failed.</exception>
    private void Retry(Func<bool> attempt)
    {
        // Clamp so a zero or negative setting still yields one attempt and a valid sleep.
        int maxAttempts = Math.Max(1, Config.RetryTime);
        int delayMs = Math.Max(0, Config.RetryInterval);

        for (int i = 1; ; i++)
        {
            Exception? failure = null;
            try
            {
                if (attempt()) return;
            }
            catch (Exception ex) when (ex is MqttCommunicationException or OperationCanceledException)
            {
                failure = ex;
            }

            if (i >= maxAttempts)
            {
                throw failure is null
                    ? new MqttCommunicationTimedOutException()
                    : new MqttCommunicationTimedOutException(failure);
            }
            Thread.Sleep(delayMs);
        }
    }

    #endregion Base

    #region Server Event

    // NOTE: the handlers in this region are not attached to anything in this class;
    // the server members they belong to are commented out above.

    /// <summary>Rejects connections with an empty client id or credentials that differ from the configured ones.</summary>
    private Task Server_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
    {
        if (arg.ClientId.Length == 0) arg.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
        else if (arg.UserName != Config.UserName) arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
        else if (arg.Password != Config.Password) arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
        return Task.CompletedTask;
    }

    /// <summary>Placeholder; performs no work.</summary>
    private Task Server_ClientConnectedAsync(ClientConnectedEventArgs arg)
    {
        return Task.CompletedTask;
    }

    /// <summary>Placeholder; performs no work.</summary>
    private Task Server_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
    {
        return Task.CompletedTask;
    }

    /// <summary>Placeholder; performs no work.</summary>
    private Task Server_ClientAcknowledgedPublishPacketAsync(ClientAcknowledgedPublishPacketEventArgs arg)
    {
        return Task.CompletedTask;
    }

    #endregion Server Event

    #region Client Event

    /// <summary>
    /// Handles an incoming message: messages on the configured command topic are decoded as UTF-8
    /// and dispatched; everything else, and messages without a payload, are ignored.
    /// </summary>
    private Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
    {
        string topic = arg.ApplicationMessage.Topic;
        if (!string.Equals(topic, Config.SubCmdTopic, StringComparison.Ordinal))
        {
            return Task.CompletedTask;
        }

        // Payload may be null for an empty message; there is no command to parse in that case.
        byte[]? payload = arg.ApplicationMessage.Payload;
        if (payload is null || payload.Length == 0)
        {
            return Task.CompletedTask;
        }

        HandleRecvSubCmdTopic(topic, Encoding.UTF8.GetString(payload));
        return Task.CompletedTask;
    }

    /// <summary>
    /// Parses a JSON command and raises the event matching its "cmd" and "type" fields
    /// (field names are matched case-insensitively). Unknown combinations are ignored.
    /// </summary>
    /// <param name="topic">Topic the command arrived on (currently unused).</param>
    /// <param name="reqJson">JSON text of the command.</param>
    private void HandleRecvSubCmdTopic(string topic, string reqJson)
    {
        JObject reqObj = JsonUtil.ToJObject(reqJson);
        string cmd = reqObj.GetValue("cmd", StringComparison.OrdinalIgnoreCase)?.ToString() ?? "";
        DeviceType type = (DeviceType)(reqObj.GetValue("type", StringComparison.OrdinalIgnoreCase).ToInt());

        // Invariant lower-casing: the command is a protocol keyword, so it must not depend on
        // the current culture (e.g. the Turkish dotless i).
        IAlarmService.HandleRecvEvent? handler = (cmd.Trim().ToLowerInvariant(), type) switch
        {
            ("getdevices", DeviceType.Vibrate) => OnVibrateTcpSendDevices,
            ("getdevices", DeviceType.Fence) => OnFenceUdpSendDevices,
            ("getsensors", DeviceType.Vibrate) => OnVibrateTcpSendSensors,
            ("getsensors", DeviceType.Fence) => OnFenceUdpSendSensors,
            _ => null,
        };
        handler?.Invoke(reqObj);
    }

    #endregion Client Event

    #region Send

    /// <summary>
    /// Next frame number. Atomic, never negative: after <see cref="int.MaxValue"/> the sequence
    /// continues at 0.
    /// </summary>
    private int IncFrame => Interlocked.Increment(ref _frame) & int.MaxValue;

    /// <summary>
    /// Serialises <paramref name="obj"/> to JSON and publishes it on <paramref name="topic"/>.
    /// A failed attempt is retried up to <c>Config.RetryTime</c> times (at least one attempt),
    /// pausing briefly and reconnecting first when the connection was lost.
    /// </summary>
    /// <param name="topic">Topic to publish on.</param>
    /// <param name="obj">Payload object; serialised with <c>ToJson()</c>.</param>
    /// <exception cref="MqttCommunicationException">
    /// The message could not be published, or reconnecting failed.
    /// </exception>
    private void Send(string topic, object obj)
    {
        MqttApplicationMessage mqttMsg = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(obj.ToJson())
            .Build();

        int maxAttempts = Math.Max(1, Config.RetryTime);
        for (int attempt = 1; ; attempt++)
        {
            Exception? failure = null;
            MqttClientPublishResult? pubResult = null;
            try
            {
                pubResult = Client.PublishAsync(mqttMsg, CancellationToken.None).GetAwaiter().GetResult();
                if (pubResult.IsSuccess) return;
            }
            catch (Exception ex)
            {
                failure = ex;
            }

            if (attempt >= maxAttempts)
            {
                if (failure is not null) throw new MqttCommunicationTimedOutException(failure);
                throw new MqttCommunicationException(
                    $"Publishing to '{topic}' failed with reason code {pubResult?.ReasonCode}.");
            }

            // Back off before retrying so a persistent failure cannot spin the CPU.
            Thread.Sleep(SendRetryDelayMs);
            if (!IsRuning()) StartAtom();
        }
    }

    /// <summary>
    /// Currently identical to <see cref="Send"/>.
    /// </summary>
    private void SendByFrame(string topic, object obj)
    {
        // TODO: inject the frame number here (convert to a dictionary, add "frame", serialise);
        // setting it through dynamic does not work.
        Send(topic, obj);
    }

    /// <summary>Publishes <paramref name="msg"/> on the alarm topic.</summary>
    public void SendAlarm(AlarmMessage msg)
    {
        Send(Config.PubAlarmTopic, msg);
    }

    /// <summary>Publishes the device list for <paramref name="type"/> on the devices topic.</summary>
    public void SendDevices(DeviceType type, List<object> deviceList)
    {
        object obj = new
        {
            type,
            data = deviceList,
            frame = IncFrame
        };
        Send(Config.PubDevicesTopic, obj);
    }

    /// <summary>Publishes the sensor list of device <paramref name="deviceId"/> on the sensors topic.</summary>
    public void SendSensors(DeviceType type, int deviceId, List<object> sensorList)
    {
        object obj = new
        {
            type,
            deviceId,
            data = sensorList,
            frame = IncFrame
        };
        Send(Config.PubSensorsTopic, obj);
    }

    /// <summary>Publishes the state of a single device on the device-state topic.</summary>
    public void SendDeviceState(DeviceType type, object device)
    {
        object obj = new
        {
            type,
            data = device,
            frame = IncFrame
        };
        Send(Config.PubDeviceStateTopic, obj);
    }

    /// <summary>Publishes the state of a single sensor on the sensor-state topic.</summary>
    public void SendSensorState(DeviceType type, object sensor)
    {
        object obj = new
        {
            type,
            data = sensor,
            frame = IncFrame
        };
        Send(Config.PubSensorStateTopic, obj);
    }

    #endregion Send
}
/// <summary>
/// Kind of alarm device a message refers to. The numeric values are cast directly
/// from the integer "type" field of incoming commands.
/// </summary>
public enum DeviceType
{
    /// <summary>Type not specified or not recognised.</summary>
    Unknown = 0,

    /// <summary>Vibration device.</summary>
    Vibrate = 3,

    /// <summary>Fence device.</summary>
    Fence = 4,
}