You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

104 lines
2.9 KiB

using EC.Util.Common;
using MQTTnet;
using MQTTnet.Client;
using System.Text;
namespace JiLinApp.Biz.TransmitAlarm;
/// <summary>
/// <see cref="IAlarmService"/> implementation that publishes alarm messages to an MQTT broker
/// through an MQTTnet client. All operations block the calling thread until the underlying
/// asynchronous client call has completed.
/// </summary>
public class AlarmMqttService : IAlarmService, IDisposable
{
    #region Fields

    /// <summary>Pause between two publish attempts, in milliseconds.</summary>
    private const int PublishRetryDelayMs = 100;

    private MqttConfig Config { get; }
    private IMqttClient Client { get; }
    private MqttClientOptions ClientOptions { get; }
    private MqttClientSubscribeOptions SubscribeOptions { get; }

    /// <summary>Set once <see cref="Dispose"/> has run, so a second call is a no-op.</summary>
    private bool _disposed;

    #endregion Fields

    /// <summary>
    /// Creates the MQTT client and prepares the connect and subscribe options.
    /// No network traffic happens here; call <see cref="Start"/> to connect.
    /// </summary>
    /// <param name="config">Broker address, client id, credentials and the alarm publish topic.</param>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is <c>null</c>.</exception>
    public AlarmMqttService(MqttConfig config)
    {
        Config = config ?? throw new ArgumentNullException(nameof(config));

        MqttFactory factory = new();
        Client = factory.CreateMqttClient();
        Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;

        ClientOptions = factory.CreateClientOptionsBuilder()
            .WithTcpServer(config.Ip, config.Port)
            .WithClientId(config.ClientId)
            .WithCredentials(config.UserName, config.Password)
            .WithCleanSession(false)
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
            .Build();

        // NOTE(review): this topic is a hard-coded sample value; presumably the real
        // subscription topic should come from MqttConfig — confirm with the config owner.
        SubscribeOptions = factory.CreateSubscribeOptionsBuilder()
            .WithTopicFilter(f => f.WithTopic("mqttnet/samples/topic/1"))
            .Build();
    }

    /// <summary>
    /// Last-resort cleanup for instances that were never disposed. Failures are swallowed
    /// because an exception escaping a finalizer terminates the process.
    /// </summary>
    ~AlarmMqttService()
    {
        TryClose();
    }

    /// <summary>
    /// Disconnects from the broker (best effort) and releases the MQTT client.
    /// Calling it more than once has no further effect.
    /// </summary>
    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;

        TryClose();
        Client?.Dispose();
        GC.SuppressFinalize(this);
    }

    #region Base

    /// <summary>
    /// Connects to the broker and subscribes with the prepared options.
    /// Does nothing when the client is already connected.
    /// </summary>
    /// <exception cref="AggregateException">The connect or subscribe call failed.</exception>
    public void Start()
    {
        if (IsRuning()) return;

        Client.ConnectAsync(ClientOptions, CancellationToken.None).Wait();
        Client.SubscribeAsync(SubscribeOptions, CancellationToken.None).Wait();
    }

    /// <summary>
    /// Disconnects from the broker. Does nothing when the client is not connected.
    /// </summary>
    /// <exception cref="AggregateException">The disconnect call failed.</exception>
    public void Close()
    {
        if (!IsRuning()) return;

        Client.DisconnectAsync().Wait();
    }

    /// <summary>
    /// Reports whether the client exists and is currently connected.
    /// </summary>
    /// <returns><c>true</c> when connected; otherwise <c>false</c>.</returns>
    public bool IsRuning()
    {
        // The null check matters for the finalizer: it also runs for an instance whose
        // constructor threw before the client was assigned.
        return Client != null && Client.IsConnected;
    }

    /// <summary>
    /// Serializes <paramref name="msg"/> to JSON and publishes it on the configured alarm topic,
    /// retrying until the broker reports success.
    /// </summary>
    /// <remarks>
    /// A thrown publish exception is treated as a lost connection and triggers <see cref="Start"/>.
    /// Every failed attempt, including a non-success result without an exception, is followed
    /// by a short pause so that the loop never busy-spins.
    /// </remarks>
    /// <param name="msg">The alarm to publish.</param>
    /// <exception cref="AggregateException">Reconnecting through <see cref="Start"/> failed.</exception>
    public void SendAlarmMessage(AlarmMessage msg)
    {
        MqttApplicationMessage mqttMsg = new MqttApplicationMessageBuilder()
            .WithTopic(Config.PubAlarmTopic)
            .WithPayload(JsonUtil.ToJson(msg))
            .Build();

        while (true)
        {
            try
            {
                MqttClientPublishResult? pubResult = Client.PublishAsync(mqttMsg, CancellationToken.None).Result;
                if (pubResult != null && pubResult.IsSuccess) return;
            }
            catch (Exception)
            {
                // Reconnect before the next attempt; a failure to reconnect propagates to the caller.
                Start();
            }

            Thread.Sleep(PublishRetryDelayMs);
        }
    }

    #endregion Base

    #region Receive

    /// <summary>
    /// Handler for messages delivered on the subscribed topic. Incoming messages are
    /// currently ignored; the handler only completes synchronously.
    /// </summary>
    /// <param name="arg">Event data carrying the received application message.</param>
    /// <returns>An already completed task.</returns>
    private Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
    {
        // NOTE(review): when adding processing here, guard against a missing payload before
        // decoding it — Encoding.GetString throws on a null buffer. Confirm whether the
        // MQTTnet version in use can deliver a null payload.
        return Task.CompletedTask;
    }

    #endregion Receive

    /// <summary>
    /// Disconnects without letting any exception escape; used from the finalizer and
    /// <see cref="Dispose"/>, neither of which may throw.
    /// </summary>
    private void TryClose()
    {
        try
        {
            Close();
        }
        catch (Exception)
        {
            // Deliberately ignored: cleanup is best effort and there is no caller to report to.
        }
    }
}