RabbitMQ消息队列(五):Routing 消息路由 2[原]

时间:2023-02-14 12:49:00

上一篇文章使用的是Direct的Exchange,但是没有指定Queue的名字,这样只能是先运行Consumer之后,Producer在运行发消息Consumer才能收到,否则先运行Producer发送消息,在运行Consumer是收不到之前Producer发送的消息,因为Queue的名字像是这样的:amq.gen-X-XSTaseUmil42zrawBVsw都是临时,如果Consumer关闭之后,这个Queue就会自动被RabbitMQ删掉。

如果想创建可以先执行Producer的Direct的Exchnage呢?因为在实际工作中我们可能需要发送端有消息就会一直发给接收端,不管接收端是否已经运行。如果我们需要指定名称的Queue,并且使用Direct的Exchange方式,我们需要使用Binding的方式。上一篇和第一篇文章中都解释了绑定的含义:绑定其实就是关联了exchange和queue。

多个routing key指定同一个queue,不管如何指定routing key的名字,发送端发送一次信息,接收端按启动顺序循环执行接收,每次接收一个消息。例子:

Producer.cs

 /// <summary>
/// 多个routing key指定同一个queue
/// 指定Queue的名称,好处就是可以持久化Queue
/// </summary>
/// <param name="args">
/// SendDemo51.exe direct_custom_routing_key_hello1
/// SendDemo51.exe direct_custom_routing_key_hello2
/// 不管如何指定routing key的名字,发送端发送一次信息,接收端按启动顺序循环执行每次接收一个消息。
/// </param>
static void Main(string[] args)
{
if (args.Length < )
{
Console.Error.WriteLine("请指定一个新的Routing Key名称", Environment.GetCommandLineArgs()[]);
Environment.ExitCode = ;
return;
}
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
const string EXCHANGE_NAME = "direct_logs";
channel.ExchangeDeclare(EXCHANGE_NAME, "direct");//Direct :如果 routing key 匹配, 那么Message就会被传递到相应的queue中。 const string QUEUE_NAME = "direct_same_queue_name_hello";//使用我们自己指定Queue的名称
bool durable = true;
channel.QueueDeclare(QUEUE_NAME, durable, false, false, null); var routingKey = args[];//指定Routing Key的名称
channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);//通过绑定将指定的Queue名称、不同的RoutingKey名称使用Direct的Exchange方式进行关联 var message = "Hello World! " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);//需要持久化Message,即在Publish的时候指定一个properties channel.BasicPublish(EXCHANGE_NAME, routingKey, properties, body);
Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message); Console.Read();
}
}
}

Producer.cs

Consumer.cs

 /// <summary>
/// 多个routing key指定同一个queue
/// 指定Queue的名称,好处就是可以持久化Queue
/// ReceiveDemo51.exe direct_custom_routing_key_hello1
/// ReceiveDemo51.exe direct_custom_routing_key_hello2
/// 不管如何指定routing key的名字,发送端发送一次信息,接收端按启动顺序循环执行每次接收一个消息。
/// </summary>
class Program
{
static void Main(string[] args)
{
if (args.Length < )
{
Console.Error.WriteLine("请指定一个新的Routing Key名称", Environment.GetCommandLineArgs()[]);
Environment.ExitCode = ;
return;
}
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
const string EXCHANGE_NAME = "direct_logs";
channel.ExchangeDeclare(EXCHANGE_NAME, "direct"); const string QUEUE_NAME = "direct_same_queue_name_hello";//使用我们自己指定Queue的名称
bool durable = true;
channel.QueueDeclare(QUEUE_NAME, durable, false, false, null); string routingKey = args[];//指定Routing Key的名称
channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);//通过绑定将指定的Queue名称、不同的RoutingKey名称使用Direct的Exchange方式进行关联 Console.WriteLine(" [*] Waiting for messages. " + "To exit press CTRL+C"); var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(QUEUE_NAME, true, consumer); while (true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingkey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'", routingkey, message);
}
}
}
}
}

Consumer.cs

多个Queue关联同一个routing key,多个queue通过routing key可以同时得到发送的内容,例子如下:

Producer.cs

 /// <summary>
/// 多个Queue关联一个routing key,通过routingkey可以拿到多个queue里面的内容;
/// 如果得到消息的Queue不是指定名称的Queue(此时这个例子是通过routing key得到消息),那么它是不会自动从Queue中删除接收到的消息,
/// 只有是指定名称的Queue收到消息之后才会把Queue中的消息删除。
/// SendDemo52.exe direct_custom_queue_name_hello1
/// SendDemo52.exe direct_custom_queue_name_hello2
/// 发送端发送一次信息,多个接收端同时接收到消息
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
if (args.Length < )
{
Console.Error.WriteLine("请指定一个新的Queue名称", Environment.GetCommandLineArgs()[]);
Environment.ExitCode = ;
return;
}
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
const string EXCHANGE_NAME = "direct_logs";
channel.ExchangeDeclare(EXCHANGE_NAME, "direct");//Direct :如果 routing key 匹配, 那么Message就会被传递到相应的queue中。 string queueName = args[];//得到我们自己指定Queue的名称
channel.QueueDeclare(queueName, true, false, false, null); const string ROUTING_KEY = "direct_same_routing_key";
channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);//通过绑定将不同的Queue名称、相同的Routing Key名称采用Direct的Exchange方式进行关联 var message = "Hello World! " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);//需要持久化Message,即在Publish的时候指定一个properties channel.BasicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, body);
Console.WriteLine(" [x] Sent '{0}':'{1}'", ROUTING_KEY, message); Console.Read();
}
}
}

Producer.cs

Consumer.cs

 /// <summary>
/// 多个Queue关联一个routing key,通过routingkey可以拿到多个queue里面的内容;
/// 如果得到消息的Queue不是指定名称的Queue(此时这个例子是通过routing key得到消息),那么它是不会自动从Queue中删除接收到的消息,
/// 只有是指定名称的Queue收到消息之后才会把Queue中的消息删除。
/// ReceiveDemo52.exe direct_custom_queue_name_hello1
/// ReceiveDemo52.exe direct_custom_queue_name_hello2
/// 发送端发送一次信息,多个接收端同时接收到消息
/// </summary>
class Program
{
static void Main(string[] args)
{
if (args.Length < )
{
Console.Error.WriteLine("请指定一个新的Queue名称", Environment.GetCommandLineArgs()[]);
Environment.ExitCode = ;
return;
}
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
const string EXCHANGE_NAME = "direct_logs";
channel.ExchangeDeclare(EXCHANGE_NAME, "direct"); string queueName = args[];//得到我们自己指定Queue的名称
channel.QueueDeclare(queueName, true, false, false, null); const string ROUTING_KEY = "direct_same_routing_key";
channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);//通过绑定将不同的Queue名称、相同的Routing Key名称采用Direct的Exchange方式进行关联 Console.WriteLine(" [*] Waiting for messages. " + "To exit press CTRL+C"); var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, true, consumer); while (true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
string routingKeyRe = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKeyRe, message);
}
}
}
}
}

Consumer.cs

特别注意:

本例子是支持多个Queue名称同时发送和接收的,Consumer通过routing key可以拿到属于这个routing key里面的多个queue的内容;
如果Consumer一旦接收到消息,分两种情况:

1、接收的Consumer不是指定名称的Queue(此时这个Consumer是通过routing key得到Producer发送的消息),那么这个Consumer是不会自动从Queue中删除接收到的消息;

2、接收的Consumer是指定名称的Queue,那么这个Consumer是会自动从Queue中删除接收到的消息。

总结:只有当指定名称的Queue收到消息之后才会把Queue中的这条消息删除。

RabbitMQ消息队列(五):Routing 消息路由 2[原]的更多相关文章

  1. 消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?

    每个时代,都不会亏待会学习的人. 大家好,我是 yes. 今天我们来谈一谈消息队列的事务消息,一说起事务相信大家都不陌生,脑海里蹦出来的就是 ACID. 通常我们理解的事务就是为了一些更新操作要么都成 ...

  2. 几种MQ消息队列对比与消息队列之间的通信问题

    消息队列 开发语言 协议支持 设计模式 持久化支持 事务支持 负载均衡支持 功能特点 缺点 RabbitMQ Erlang AMQP,XMPP,SMTP,STOMP 代理(Broker)模式(消息在发 ...

  3. 译&colon;4&period;RabbitMQ Java Client 之 Routing(路由)

    在上篇博文 译:3.RabbitMQ 之Publish/Subscribe(发布和订阅)  我们构建了一个简单的日志系统 我们能够向许多接收者广播日志消息. 在本篇博文中,我们将为其添加一个功能 - ...

  4. 分布式消息队列RocketMQ--事务消息--解决分布式事务

    说到分布式事务,就会谈到那个经典的”账号转账”问题:2个账号,分布处于2个不同的DB,或者说2个不同的子系统里面,A要扣钱,B要加钱,如何保证原子性? 一般的思路都是通过消息中间件来实现“最终一致性” ...

  5. Java使用Rabbitmq惊喜队列queue和消息内容的本地持久化核心方法。(内容存储在硬盘)

    _Channel.queueDeclare(queue, true, false, false, null); _Channel.basicPublish(_ExchangeName, queue,M ...

  6. 分布式消息队列RocketMQ&amp&semi;Kafka -- 消息的&OpenCurlyDoubleQuote;顺序消费”

    在说到消息中间件的时候,我们通常都会谈到一个特性:消息的顺序消费问题.这个问题看起来很简单:Producer发送消息1, 2, 3... Consumer按1, 2, 3...顺序消费. 但实际情况却 ...

  7. C&num;消息队列&lpar;RabbitMQ&rpar;零基础从入门到实战演练

    一.课程介绍 如果您从工作中之听过但未有接触过消息对队列(MQ),如果你接触过一点关于MQ的知识,如果没有这么的多如果的话......,那么阿笨将通过本次<C#消息队列零基础从入门到实战演练&g ...

  8. RabbitMQ消息队列应用

    RabbitMQ消息队列应用 消息通信组件Net分布式系统的核心中间件之一,应用与系统高并发,各个组件之间解耦的依赖的场景.本框架采用消息队列中间件主要应用于两方面:一是解决部分高并发的业务处理:二是 ...

  9. RabbitMQ 消息队列 二

    一:查看MQ的用户角色 rabbitmqctl list_users 二:添加新的角色,并授予权限 rabbitmqctl add_user xiaoyao 123456 rabbitmqctl se ...

  10. RabbitMQ&comma;Apache的ActiveMQ&comma;阿里RocketMQ&comma;Kafka&comma;ZeroMQ&comma;MetaMQ&comma;Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ基础知识详解,RabbitMQ布曙

    消息队列及常见消息队列介绍 2017-10-10 09:35操作系统/客户端/人脸识别 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以 ...

随机推荐

  1. HTML表单提交

    <!DOCTYPE html> <!--当前文档为html5--> <html> <head> <!--设置文档编码--> <meta ...

  2. 搭建一个分布式MongoDB鉴权集群

    今天休假在家,测试并搭建了一个replica set shard MongoDB鉴权集群.replica set shard 鉴权集群中文资料比较少,本文是个人笔记,同时也希望对后来者有所帮助.本文仅 ...

  3. apache配置Options详解

    http://www.365mini.com/page/apache-options-directive.htm Options指令是Apache配置文件中一个比较常见也比较重要的指令,Options ...

  4. 简单几步让Chrome浏览器也能打开Oracle EBS

    2016-12-14更新: Google Chrome浏览器从版本45开始正式禁用NPAPI插件(也就是原本JRE插件的实现架构).所以如果你的浏览器版本已经是45以上了,本文提供的方法将不再适用.以 ...

  5. bq24075 锂电池 充电电路分析

     bq24075 锂电池 充电电路分析 本文主要是分析bq24075锂电池充电芯片电路,知道其大致是怎么工作的,其中的一些电阻该如何配置. -- 深圳 南山平山村 曾剑锋 一.参考文章: . NTC热 ...

  6. nopcommerce 开源商城

    http://www.nopchina.net/  中文网 http://www.nopcommerce.com/downloads.aspx  源码下载  如果要在数据库中添加一个新的数据表,需要按 ...

  7. &lbrack;struts2学习笔记&rsqb; 第二节 使用Maven搞定管理和构造Struts 2 Web应用程序的七个步骤

    本文地址:http://blog.csdn.net/sushengmiyan/article/details/40303897 官方文档:http://struts.apache.org/releas ...

  8. Django 信号signal

    序言 Django自带一套信号机制来帮助我们在框架的不同应用位置之间传递信息.也就是说,当某一事件发生时,信号系统可以允许一个或多个发送者(senders)将信号(signals)发送给一组接收者(r ...

  9. canvas绘制图片

    canvas保存为data:image扩展功能的实现 [已知]canvas提供了toDataURL的接口,可以方便的将canvas画布转化成base64编码的image.目前支持的最好的是png格式, ...

  10. mongo文本搜索的一个例子

      假如有一个名为articles的集合,数据如下: { "_id" : 1, "title" : "cakes and ale" } { ...