using NetMQ;
using NetMQ.Sockets;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace EC.Utils.ZMQ
{
    /// <summary>
    /// Small publish/subscribe helper on top of NetMQ. Publishing connects a
    /// <see cref="PublisherSocket"/> to the publish address on first use; subscribing runs a
    /// background loop that connects a <see cref="SubscriberSocket"/> to the subscribe address
    /// and raises <see cref="OnSubData"/> for every message received.
    /// </summary>
    public class ZMQHelper
    {
        /// <summary>
        /// Raised on the background receive thread for each received message. The event
        /// argument passed is a <c>PubSubModel</c> built from the topic and payload frames.
        /// </summary>
        // NOTE(review): the non-generic EventHandler is kept exactly as it appeared in the
        // reviewed text; it compiles only if PubSubModel derives from EventArgs. If the
        // declaration was originally EventHandler<PubSubModel>, restore the type argument.
        public event EventHandler OnSubData;

        /// <summary>How long one receive attempt waits before the loop re-checks for cancellation.</summary>
        private static readonly TimeSpan ReceivePollInterval = TimeSpan.FromMilliseconds(200);

        /// <summary>Guards the topic list, the publisher socket and the start/stop state.</summary>
        private readonly object _gate = new object();

        private string pubServerAddress { get; set; }

        private string subServerAddress { get; set; }

        private List<string> _topics { get; set; }

        /// <summary>Publisher socket, created lazily by <see cref="Send(string, string)"/>.</summary>
        private PublisherSocket Pub_Socket = null;

        /// <summary>Cancellation source of the current receive loop; null when stopped.</summary>
        private CancellationTokenSource _receiveCts;

        /// <summary>Task running the current (or most recent) receive loop.</summary>
        private Task _receiveTask;

        /// <summary>
        /// Creates a helper for the given endpoints. No socket is opened until
        /// <see cref="Start"/> or <see cref="Send(string, string)"/> is called.
        /// </summary>
        /// <param name="pubServer">Address the publisher socket connects to.</param>
        /// <param name="subServer">Address the subscriber socket connects to.</param>
        /// <param name="topics">
        /// Optional initial topics. The list instance is stored, not copied; when null an
        /// empty list is used.
        /// </param>
        public ZMQHelper(string pubServer, string subServer, List<string> topics = null)
        {
            this.pubServerAddress = pubServer;
            this.subServerAddress = subServer;
            this._topics = topics ?? new List<string>();
        }

        /// <summary>
        /// Registers a topic to subscribe to. May be called before or after <see cref="Start"/>;
        /// a running receive loop picks the topic up on its next poll cycle.
        /// </summary>
        /// <param name="topic">Topic prefix to subscribe to.</param>
        /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null.</exception>
        public void Subscribe(string topic)
        {
            if (topic == null)
            {
                throw new ArgumentNullException(nameof(topic));
            }

            lock (_gate)
            {
                this._topics.Add(topic);
            }
        }

        /// <summary>
        /// Starts the background receive loop. Does nothing if a loop is already running.
        /// Can be called again after <see cref="Stop"/>.
        /// </summary>
        public void Start()
        {
            lock (_gate)
            {
                bool running = _receiveCts != null
                    && _receiveTask != null
                    && !_receiveTask.IsCompleted;
                if (running)
                {
                    return;
                }

                // A leftover source means the previous loop ended on its own (e.g. a handler
                // threw); release it before starting over.
                _receiveCts?.Dispose();
                _receiveCts = new CancellationTokenSource();

                // Every loop gets its own token, so a quick Stop()/Start() can never revive
                // a loop that is still winding down.
                CancellationToken token = _receiveCts.Token;

                // LongRunning: the loop blocks for its whole lifetime and should not occupy
                // a thread-pool worker.
                _receiveTask = Task.Factory.StartNew(
                    () => Pubsub(token),
                    token,
                    TaskCreationOptions.LongRunning,
                    TaskScheduler.Default);
            }
        }

        /// <summary>
        /// Publishes <paramref name="msg"/> under <paramref name="topic"/> as a two-frame
        /// message, connecting the publisher socket on first use.
        /// </summary>
        /// <param name="topic">Topic frame.</param>
        /// <param name="msg">Payload frame.</param>
        public void Send(string topic, string msg)
        {
            // Serialised because a NetMQ socket must not be used from several threads at once.
            lock (_gate)
            {
                if (Pub_Socket == null)
                {
                    // Publisher service: only cache the socket once Connect has succeeded,
                    // so a failed connect is retried on the next call.
                    var socket = new PublisherSocket();
                    try
                    {
                        socket.Connect(pubServerAddress);
                    }
                    catch
                    {
                        socket.Dispose();
                        throw;
                    }

                    Pub_Socket = socket;
                }

                // Publish the message.
                Pub_Socket.SendMoreFrame(topic).SendFrame(msg);
            }
        }

        /// <summary>Publishes <paramref name="msg"/> under the default topic <c>"Topic"</c>.</summary>
        /// <param name="msg">Payload frame.</param>
        public void Send(string msg)
        {
            Send("Topic", msg);
        }

        /// <summary>
        /// Receive loop. Creates, uses and disposes the subscriber socket on this thread only,
        /// and raises <see cref="OnSubData"/> for each message until cancellation is requested.
        /// </summary>
        /// <param name="token">Token signalled by <see cref="Stop"/>.</param>
        private void Pubsub(CancellationToken token)
        {
            using (var subscriber = new SubscriberSocket())
            {
                subscriber.Connect(subServerAddress);

                int subscribedCount = 0;
                while (!token.IsCancellationRequested)
                {
                    subscribedCount = ApplyPendingSubscriptions(subscriber, subscribedCount);

                    // Bounded wait instead of a blocking receive, so a stop request is
                    // noticed even when no traffic arrives.
                    string topic;
                    bool more;
                    if (!subscriber.TryReceiveFrameString(ReceivePollInterval, out topic, out more))
                    {
                        continue;
                    }

                    // Follow the multipart flag instead of assuming exactly two frames; a
                    // message with a different frame count would otherwise shift every
                    // following topic/payload pair.
                    string msg = string.Empty;
                    if (more)
                    {
                        msg = subscriber.ReceiveFrameString(out more);
                    }

                    while (more)
                    {
                        // Discard frames beyond topic + payload.
                        subscriber.ReceiveFrameString(out more);
                    }

                    PubSubModel pubSubModel = new PubSubModel(topic, msg);
                    OnSubData?.Invoke(this, pubSubModel);
                }
            }
        }

        /// <summary>
        /// Subscribes the socket to every topic added since the last call.
        /// </summary>
        /// <param name="subscriber">Socket owned by the receive loop.</param>
        /// <param name="alreadyApplied">Number of topics already subscribed.</param>
        /// <returns>The new number of subscribed topics.</returns>
        private int ApplyPendingSubscriptions(SubscriberSocket subscriber, int alreadyApplied)
        {
            lock (_gate)
            {
                for (; alreadyApplied < _topics.Count; alreadyApplied++)
                {
                    subscriber.Subscribe(_topics[alreadyApplied]);
                }
            }

            return alreadyApplied;
        }

        /// <summary>
        /// Requests the receive loop to stop and closes the publisher socket. The subscriber
        /// socket is closed by the receive loop itself, within roughly one poll interval.
        /// </summary>
        public void Stop()
        {
            lock (_gate)
            {
                if (_receiveCts != null)
                {
                    _receiveCts.Cancel();
                    _receiveCts.Dispose();
                    _receiveCts = null;
                }

                if (Pub_Socket != null)
                {
                    Pub_Socket.Dispose();
                    // Cleared so a later Send() reconnects instead of using a closed socket.
                    Pub_Socket = null;
                }
            }
        }
    }
}