【ZeroMQ】消息模式

时间:2023-03-09 19:10:12
【ZeroMQ】消息模式

1、请求/应答模式(REP/REQ)

该模式特征:

  • 服务器使用REP类型套接字而客户端使用REQ类型套接字
  • 客户端发送请求和接收答复,而服务器则接收请求并发送答复
  • 客户端可以连接到一个或多个服务器。在这种情况下,请求会在所有的服务器(Reps)之间循环,一个请求被发送到某个服务器,下一个请求则被发送到下个服务器,如此进行下去。
  • 基于状态的模式:客户端在发送另一个请求之前,必须先接收前一个请求的答复。而服务器在接收另一个请求之前,必须答复前一个请求。

【ZeroMQ】消息模式

 //服务器端代码
private static void Main(string[] args)
{
using (var context = ZContext.Create())
{
using (var resp = new ZSocket(context, ZSocketType.REP))
{
resp.Bind("tcp://*:5555");
while (true)
{
ZFrame reply = resp.ReceiveFrame();
string message = reply.ReadString(); Console.WriteLine("Receive message {0}", message); resp.Send(new ZFrame(message)); if (message == "exit")
{
break;
}
}
}
}
}

服务器端代码

 //客户端代码
static void Main(string[] args)
{
using (var context = new ZContext())
using (var requester = new ZSocket(context, ZSocketType.REQ))
{
// Connect
requester.Connect("tcp://127.0.0.1:5555"); while(true)
{
Console.WriteLine("Please enter your message:");
string message = Console.ReadLine();
requester.Send(new ZFrame(message)); // Send
//requester.Send(new ZFrame(requestText)); // Receive
using (ZFrame reply = requester.ReceiveFrame())
{
Console.WriteLine("Received: {0} {1}!", message, reply.ReadString());
if ("exit" == reply.ReadString())
{
break;
}
}
}
}
}

客户端代码

2、发布/订阅模式Publish/Subscribe(PUB/SUB)

该模式具有一下特征:

  • 发布者使用PUB类型套接字和订阅者则使用SUB类型套接字
  • 一个发布者可以有一个或者多个订阅者
  • 一个订阅者可以连接到一个或者多个发布者
  • 发布者发送消息而订阅者接收消息
  • 订阅者可以使用SubscribeAll方法订阅所有的发布者消息,可以使用Subscrube方法订阅某个特定的消息,这时要将所感兴趣的发布者的消息前缀作为参数。对消息的过滤发生在订阅者端,即发布者将其所有的消息发送给订阅者,而订阅者负责将不需要的消息丢弃。
  • 订阅者可以用UnsubscribeAll方法取消所有订阅,也可以使用Unsubscribe方法加上消息前缀来退订某个发布者。
  • 发布者将消息发送到已连接的所有订阅者。
  • 如果发布者没有和任何订阅者连接,那么消息将会被丢弃。
  • 如订阅者连接到多个发布者,那么它会均匀的接收所有发布者的消息(公平队列)。

【ZeroMQ】消息模式

 //发布者
static void Publisher()
{
using (var context = new ZContext())
using (var publisher = new ZSocket(context, ZSocketType.PUB))
{
publisher.Bind("tcp://127.0.0.1:5555");
Random random = new Random();
while (true)
{
string message = string.Format("Random: {0}", random.Next());
publisher.Send(new ZFrame(message));
Console.WriteLine("Send:{0}", message);
Thread.Sleep();
}
}
}

发布者

 //订阅者
static void Subscribe()
{
using (var context = new ZContext())
using (var subscribe = new ZSocket(context, ZSocketType.SUB))
{
subscribe.SubscribeAll();
subscribe.Connect("tcp://127.0.0.1:5555");
while (true)
{ using (ZFrame replay = subscribe.ReceiveFrame())
{
Console.WriteLine("REceive: {0}", replay.ReadString());
}
} }
}

订阅者

3、管道模式(Push/Pull)

当需要进行并行数据处理时,通常会用到该模式。管道模式使用场景如下所示:

  • 1.首先任务分发器通常以循环方式将消息(任务)推送给工作单元(每个工作单元有不同任务)
  • 2.接收到消息时工作单元会先对其进行处理,然后将它推送给接收消息(任务)的某种任务收集器。
  • 3.收集器以公平排队的方法从期连接的所有工作单元那接收消息。

该模式具有一下特征:

  • 任务分发器使用PUSH类型的套接字。它绑定到端点,并等待工作单元的连接。
  • 工作单元有两个套接字,一个使用PULL类型连接到任务分发器,另一个则是PUSH类型,负责和收集器的连接。
  • 任务收集器使用PULL类型套接字。它绑定到端点,并等待接收工作单元的连接。
  • 【ZeroMQ】消息模式