NetMQ 发布订阅模式 Publisher-Subscriber

时间:2023-03-09 16:23:08
NetMQ 发布订阅模式 Publisher-Subscriber

第一部分引用于:点击打开

1:简单介绍

PUB-SUB模式一般处理的都不是系统的关键数据。发布者不关注订阅者是否收到发布的消息,订阅者也不知道自己是否收到了发布者发出的所有消息。你也不知道订阅者何时开始收到消息。类似于广播,收音机。因此逻辑上,它都不是可靠的。这个可以通过与请求响应模型组合来解决。

NetMQ 发布订阅模式 Publisher-Subscriber
图1:简单的发布订阅模式

NetMQ 发布订阅模式 Publisher-Subscriber
图2:与请求响应模式组合的发布订阅模式

2:案例

定义IPublishser接口

namespace NetMQDemoPublisher
{
public interface IPublisher:IDisposable
{
/// <summary>
/// 发布消息
/// </summary>
/// <param name="topicName">主题</param>
/// <param name="data">内容</param>
void Publish(string topicName, string data);
}
}

NetMQ 发布订阅模式 Publisher-SubscriberNetMQ 发布订阅模式 Publisher-Subscriber

Publisher实现类

namespace NetMQDemoPublisher
{
public class Publisher:IPublisher
{
private object _lockObject = new object(); private PublisherSocket _publisherSocket; public Publisher(string endPoint)
{
_publisherSocket = new PublisherSocket();
_publisherSocket.Options.SendHighWatermark = ;
_publisherSocket.Bind(endPoint);
}
#region Implementation of IDisposable /// <summary>
/// 执行与释放或重置非托管资源相关的应用程序定义的任务。
/// </summary>
public void Dispose()
{
lock (_lockObject)
{
_publisherSocket.Close();
_publisherSocket.Dispose();
}
} /// <summary>
/// 发布消息
/// </summary>
/// <param name="topicName">主题</param>
/// <param name="data">内容</param>
public void Publish(string topicName, string data)
{
lock (_lockObject)
{
_publisherSocket.SendMoreFrame(topicName).SendFrame(data);
}
} #endregion
}
}

Publisher窗口界面

NetMQ 发布订阅模式 Publisher-Subscriber

界面中实现的功能代码

namespace NetMQDemoPublisher
{
public partial class PublisherForm : Form
{
private IPublisher publisher;
public PublisherForm()
{
InitializeComponent();
publisher = new Publisher("tcp://127.0.0.1:8888");
} private void button1_Click(object sender, EventArgs e)
{
string strContent = textBox1.Text;
ListViewItem item = new ListViewItem(string.Format("topic:NetMQ,Data:{0}", strContent));
listView1.Items.Add(item);
publisher.Publish("NetMQ", strContent);
}
}
}

定义ISubscriber接口

namespace NetMQDemoSubscriber
{
public interface ISubscriber:IDisposable
{
/// <summary>
/// 事件
/// </summary>
event Action<string, string> Nofity; /// <summary>
/// 注册订阅主题
/// </summary>
/// <param name="topics"></param>
void RegisterSubscriber(List<string> topics); /// <summary>
/// 注册订阅
/// </summary>
void RegisterSbuscriberAll(); /// <summary>
/// 移除所有订阅消息,并关闭
/// </summary>
void RemoveSbuscriberAll();
}
}

Subscriber实现类

namespace NetMQDemoSubscriber
{
public class Subscriber:ISubscriber
{
private SubscriberSocket _subscriberSocket = null;
private string _endpoint = @"tcp://127.0.0.1:9876"; public Subscriber(string endPoint)
{
_subscriberSocket = new SubscriberSocket();
_endpoint = endPoint;
}
#region Implementation of IDisposable /// <summary>
/// 执行与释放或重置非托管资源相关的应用程序定义的任务。
/// </summary>
public void Dispose()
{
throw new NotImplementedException();
} #endregion #region Implementation of ISubscriber public event Action<string, string> Nofity = delegate { }; /// <summary>
/// 注册订阅主题
/// </summary>
/// <param name="topics"></param>
public void RegisterSubscriber(List<string> topics)
{
InnerRegisterSubscriber(topics);
} /// <summary>
/// 注册订阅
/// </summary>
public void RegisterSbuscriberAll()
{
InnerRegisterSubscriber();
} /// <summary>
/// 移除所有订阅消息,并关闭
/// </summary>
public void RemoveSbuscriberAll()
{
InnerStop();
} #endregion #region 内部实现 /// <summary>
/// 注册订阅消息
/// </summary>
/// <param name="topics">订阅的主题</param>
private void InnerRegisterSubscriber(List<string> topics = null)
{
InnerStop();
_subscriberSocket = new SubscriberSocket();
_subscriberSocket.Options.ReceiveHighWatermark = ;
_subscriberSocket.Connect(_endpoint);
if (null == topics)
{
_subscriberSocket.SubscribeToAnyTopic();
}
else
{
topics.ForEach(item => _subscriberSocket.Subscribe(item));
}
Task.Factory.StartNew(() =>
{
while (true)
{
string messageTopicReceived = _subscriberSocket.ReceiveFrameString();
string messageReceived = _subscriberSocket.ReceiveFrameString();
Nofity(messageTopicReceived, messageReceived);
}
});
} /// <summary>
/// 关闭订阅
/// </summary>
private void InnerStop()
{
_subscriberSocket.Close();
} #endregion
}
}

Subscriber窗口界面

NetMQ 发布订阅模式 Publisher-Subscriber

窗体功能代码

namespace NetMQDemoSubscriber
{
public partial class SubscriberForm : Form
{
private ISubscriber subscriber;
public SubscriberForm()
{
InitializeComponent();
} private void SubscriberForm_Load(object sender, EventArgs e)
{
subscriber = new Subscriber("tcp://127.0.0.1:8888");
subscriber.RegisterSbuscriberAll();
subscriber.Nofity+= delegate(string s, string s1)
{
ListViewItem item = new ListViewItem(string.Format("topic:{0},Data:{1}", s, s1));
listView1.Items.Add(item);
};
}
}
}

运行后,Publiser开启一个,Subscirber开启三个,进行测试如图

NetMQ 发布订阅模式 Publisher-Subscriber

源码下载

如果觉得文章好,记得关注一下公众号哟!

NetMQ 发布订阅模式 Publisher-Subscriber