RabbitMQ 如何解决消息幂等性的问题

时间:2022-01-09 08:14:41

前言

关于MQ消费者的幂等性问题,在于MQ的重试机制,因为网络原因或客户端延迟消费导致重复消费。使用MQ重试机制需要注意的事项以及如何解决消费者幂等性问题以下将逐一讲解。

1. RabbitMQ自动重试机制

消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这个时候我们如何处理?

使用重试机制,RabbitMQ默认开启重试机制。

实现原理:

  • @RabbitHandler注解 底层使用Aop拦截,如果程序(消费者)没有抛出异常,自动提交事务
  • 如果Aop使用异常通知拦截获取到异常后,自动实现补偿机制,消息缓存在RabbitMQ服务器端

注意:

  • 默认会一直重试到消费者不抛异常为止,这样显然不好。我们需要修改重试机制策略,如间隔3s重试一次)

配置:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
spring:
  rabbitmq:
    # 连接地址
    host: 127.0.0.1
    # 端口号
    port: 5672
    # 账号
    username: guest
    # 密码
    password: guest
    # 地址(类似于数据库的概念)
    virtual-host: /admin_vhost
    # 消费者监听相关配置
    listener:
      simple:
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000

2. 如何合理选择重试机制?

情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? 需要重试,可能是因为网络原因短暂不能访问

情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试? 不需要重试,因为属于程序bug需要重新发布版本

总结:对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job进行健康检查+人工进行补偿

3. 调用第三方接口自动实现补偿机制

我们知道了,RabbitMQ在消费者消费发生异常时,会自动进行补偿机制,所以我们(消费者)在调用第三方接口时,可以根据返回结果判断是否成功:

  • 成功:正常消费
  • 失败:手动抛处一个异常,这时RabbitMQ自动给我们做重试 (补偿)。

4. 如何解决消费者幂等性问题

防止重复消费 (MQ重试机制需要注意的问题)

产生原因:网络延迟传输中,消费者出现异常或者消费者延迟消费,会造成进行MQ重试补偿,在重试过程中,可能会造成重复消费。

面试题:MQ中消费者如何保证幂等性问题,不被重复消费?

RabbitMQ 如何解决消息幂等性的问题

伪代码:

生产者核心代码:

请求头设置消息id(messageId)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class FanoutProducer {
 @Autowired
 private AmqpTemplate amqpTemplate;
 
 public void send(String queueName) {
  String msg = "my_fanout_msg:" + System.currentTimeMillis();
  //请求头设置消息id(messageId)
  Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
    .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();
  System.out.println(msg + ":" + msg);
  amqpTemplate.convertAndSend(queueName, message);
 }
}

消费者核心代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@RabbitListener(queues = "fanout_email_queue")
 public void process(Message message) throws Exception {
  // 获取消息Id
  String messageId = message.getMessageProperties().getMessageId();
  String msg = new String(message.getBody(), "UTF-8");
  //② 判断唯一Id是否被消费,消息消费成功后将id和状态保存在日志表中,我们从(①步骤)表中获取并判断messageId的状态即可
  //从redis中获取messageId的value
  String value = redisUtils.get(messageId)+"";
  if(value.equals("1") ){ //表示已经消费
   return; //结束
  }
  System.out.println("邮件消费者获取生产者消息" + "messageId:" + messageId + ",消息内容:" + msg);
  JSONObject jsonObject = JSONObject.parseObject(msg);
  // 获取email参数
  String email = jsonObject.getString("email");
  // 请求地址
  String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
  JSONObject result = HttpClientUtils.httpGet(emailUrl);
  if (result == null) {
   // 因为网络原因,造成无法访问,继续重试
   throw new Exception("调用接口失败!");
  }
  System.out.println("执行结束....");
  //① 执行到这里已经消费成功,我们可以修改messageId的状态,并存入日志表(可以存到redis中,key为消息Id、value为状态)
 }

5. SpringBoot整合RabbitMQ应答模式(ACK)

1.修改配置simple下添加 acknowledge-mode: manual:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
spring:
  rabbitmq:
    # 连接地址
    host: 127.0.0.1
    # 端口号
    port: 5672
    # 账号
    username: guest
    # 密码
    password: guest
    # 地址(类似于数据库的概念)
    virtual-host: /admin_vhost
    # 消费者监听相关配置
    listener:
      simple:
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000
        # 开启手动ack
        acknowledge-mode: manual

2.消费者增加代码:

?
1
2
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); 手动ack
channel.basicAck(deliveryTag, false);手动签收
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//邮件队列
@Component
public class FanoutEamilConsumer {
 @RabbitListener(queues = "fanout_email_queue")
 public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
  System.out
    .println(Thread.currentThread().getName() + ",邮件消费者获取生产者消息msg:" + new String(message.getBody(), "UTF-8")
      + ",messageId:" + message.getMessageProperties().getMessageId());
  // 手动ack
  Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
  // 手动签收
  channel.basicAck(deliveryTag, false);
 }
}

RabbitMQ 如何保证幂等性,数据一致性

mq的作用主要是用来解耦,削峰,异步,

增加MQ,系统的复杂性也会增加很多,

也会带来其他的问题,比如MQ挂了怎么办,怎么保持数据的幂等性

幂等性问题通俗点讲就是保证数据不被重复消费,同时数据也不能少,

也就是数据一致性问题。

下面是MQ丢失的3种情况

RabbitMQ 如何解决消息幂等性的问题

1,生产者发送消息至MQ的数据丢失

解决方法:在生产者端开启comfirm 确认模式,你每次写的消息都会分配一个唯一的 id,

然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了

2,MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失

解决方式:MQ设置为持久化。将内存数据持久化到磁盘中

3,消费者刚拿到消息,还没处理,挂掉了,MQ又以为消费者处理完

解决方式:用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

RabbitMQ 如何解决消息幂等性的问题

数据重复的问题简单的多,就是在消费端判断数据是否已经被消费过

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/qq_38252039/article/details/91409955