五.RabbitMQ生产者/消费者消息确认

时间:2024-04-02 19:25:22

01 为什么要消息确认

在一些场合,如转账、付费时每一条消息都必须保证成功的被处理。AMQP是金融级的消息队列协议,有很高的可靠性,这里介绍在使用RabbitMQ时怎么保证消息被成功处理的。

消息确认可以分为两种:一种是生产者发送消息到Broke时,Broker给生产者发送确认回执,用于告诉生产者消息已被成功发送到Broker;

一种是消费者接收到Broker发送的消息时,消费者给Broker发送确认回执,用于通知消息已成功被消费者接收。下边分别介绍生产者端和消费者端的消息确认方法

02 生产者端消息确认

生产者端的消息确认:当生产者将消息发送给Broker,Broker接收到消息给生产者发送确认回执。生产者端的消息确认有两种方式:tx机制和Confirm模式。

1. Tx机制模式

tx机制可以叫事务机制,RabbitMQ中有三个与tx机制的方法:txSelect,txcommit和txRollback。channe.txSelect用于将当前channel设置成一个transaction模式,channe.txCommit提交事务,channel.txRollback回滚事务。

使用tx机制,我们首先要通过txSelect方法开启事务,然后发布消息给broker服务器,如果txCommit提交成功了,则说明消息成功被broker接受了;

如果txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们可以捕获异常,通过txRollback回滚事务。

准备工作:如图

五.RabbitMQ生产者/消费者消息确认

看一个tx机制的简单实现:

五.RabbitMQ生产者/消费者消息确认 执行结果如下:

五.RabbitMQ生产者/消费者消息确认2. Confirm模式

C#的RabbitMQ API中,有三个与Confirm相关的方法:ConfirmSelect,WaitForCnofirms和WaitForCnofirmOrDie。channel.ConfirmSelect表示开启Confirm模式。channel.WaitForConfirms等待所有消息确认,如果所有的消息都被服务端成功接收返回true,只要有一条没有被成功接收就返回false。

channel.WaitForConfirmsOrDie和WaitForConfirms作用类型,也是等待所有消息确认,区别在于该方法没有返回值(Void),如果有任意一条消息没有被成功接收,该方法会立即抛出一个OperationInterrupedException类型异常.

下面看一个简单的案例:

五.RabbitMQ生产者/消费者消息确认执行结果如下:

五.RabbitMQ生产者/消费者消息确认

 

03 消费者端消息确认

生产者端的消息确认:从Broke发送到消费者时,RabbitMQ提供了两种消息确认的方式:自动确认和显示确认。

1、自动确认:

当RabbbitMQ将消息发送给消费者后,消费者端接收到消息后,不等待消息处理结束,立即自动回送一个确认回执。自动确认的用法十分简单,设置消费方法的参数autoAck为true即可;如下内容:channel.BasicConsume(queue:"myqueue",autoAck: true, consumer: consumer);

 

注意:Broker会在接收到确认回执时删除消息,如果消费者接收到消息并返回了确认回执,然后这个消费者在处理消息时挂了,那么这条消息就再也找不回来了。

 2、显示确认

我们知道自动确认可能会出现消息丢失的问题,我们不免会想到:Broker收到回执后才删除消息,如果可以让消费者在接收消息时不立即返回确认回执,等到消息处理完成后(或者完成一部分的逻辑)再返回确认回执,这样就保证消费端不会丢失消息了!这正是显式确认的思路。使用显示确认也比较简单,首先将Resume方法的参数autoAck设置为false在消费端使用代码 channel.BasicAck/BasicReject等方法 来确认和拒绝消息。

 

生产者代码:

五.RabbitMQ生产者/消费者消息确认

 

消费者代码如下:

五.RabbitMQ生产者/消费者消息确认

介绍一下代码中标红的两个方法:

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 方法用于确认消息,deliveryTag参数是分发的标记,multiple表示是否确认多条。

 

channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); 方法用于拒绝消息,deliveryTag也是指分发的标记,requeue表示消息被拒绝后是否重新放回queue中,true表示放回queue中,false表示直接丢弃

 

尝试运行:如下图

五.RabbitMQ生产者/消费者消息确认

 

五.RabbitMQ生产者/消费者消息确认

一些意外的情况:使用显式确认时,如果消费者处理完消息不发送确认回执,那么消息不会被删除,消息的状态一直是Unacked,这条消息也不会再发送给其他消费者。如果一个消费者在处理消息时尚未发送确认回执的情况下挂掉了,那么消息会被重新放入队列(状态从Unacked变成Ready),有其他消费者存时,消息会发送给其他消费者。