MQ高可用相关设置

时间:2024-03-14 12:23:03

文章目录

  • 前言
  • MQ如何保证消息不丢失
    • RabbitMQ
    • RocketMQ
    • KafkaMQ
  • MQ如何保证顺序消息
    • RabbitMQ
    • RocketMQ
    • Kafka
  • MQ刷盘机制/集群同步
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 广播消息&集群消息
    • RabbitMQ
    • RocketMQ
  • MQ集群架构
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 消息重试
    • RabbitMQ
    • RockeMq
    • Kafka
  • 死信队列
    • RocketMQ
    • Kafka
  • 消息去重
    • RocketMQ
    • Kafka
  • 事务消息
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 消息积压
    • RabbitMQ
  • 设计MQ思路
  • 博客记录

前言

消息丢失的三种情况

  1. 消息在传入服务过程中丢失
  2. MQ收到消息,暂存内存中,还没消费,自己挂掉了,内存中的数据搞丢
  3. 消费者消费到了这个消息,但还没来得及处理,就挂了,MQ以为消息已经被处理

也就是生产者丢失消息、消息列表丢失消息、消费者丢失消息;

MQ如何保证消息不丢失

RabbitMQ

一、生产者

开启RabbitMQ事务
生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

// 开启事务
channel.txSelect
try {
      // 这里发送消息
} catch (Exception e) {
      channel.txRollback
 
// 这里再次重发这条消息
 
}
 
// 提交事务
channel.txCommit

设置Confirm模式:

同步确认:

//开启发布确认
channel.confirmSelect();
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitForConfirms();
if(flag){
    System.out.println("消息发送成功");
}

异步确认 :

服务端 :

消息持久化,必须满足以下三个条件,缺一不可。

  • Exchange 设置持久化

  • Queue 设置持久化

  • Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

发送消息时设置delivery_mode属性为2,使消息被持久化保存到磁盘,即使RabbitMQ服务器宕机也能保证消息不丢失。同时,创建队列时设置durable属性为True,以确保队列也被持久化保存。

// 声明队列,并将队列设置为持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello, RabbitMQ!";
// 发送消息时将消息设置为持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)
        .build();
channel.basicPublish("", "myQueue", properties, "Hello, RabbitMQ".getBytes());

设置备份交换机:

Map<String, Object> arguments = new HashMap<>();
arguments.put("alternate-exchange", "myAlternateExchange");
channel.exchangeDeclare("myExchange", BuiltinExchangeType.DIRECT, true, false, arguments);
channel.exchangeDeclare("myAlternateExchange", BuiltinExchangeType.FANOUT, true, false, null);

二 :服务端设置集群镜像模式

  • 单节点模式: 最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。

  • 普通模式: 消息只会存在与当前节点中,并不会同步到其他节点,当前节点宕机,有影响的业务会瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。

  • 镜像模式: 消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。属于RabbitMQ的HA方案

消费方开启消息确认机制 :

// 开启消息确认机制
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received message: " + message);
        // 手动发送消息确认
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

手动确认 :

codechannel.basicConsume("myQueue", false, (consumerTag, delivery) -> {
    // 处理消息
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {});

RocketMQ

一、生产者提供SYNC的发送消息方式,等待broker处理结果。

RocketMQ生产者提供了3种发送消息方式,分别是:

同步发送:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应发送结果。

Message msg = new Message("TopicTest",
        "TagA","OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//同步传递消息,消息会发给集群中的⼀个Broker节点。
SendResult sendResult = producer.send(msg);

异步发送:Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。

Message msg = new Message("TopicTest","TagA","OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
        
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        countDownLatch.countDown();
        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
    }

    @Override
    public void onException(Throwable e) {
        countDownLatch.countDown();
        System.out.printf("%-10d Exception %s %n", index, e);
        e.printStackTrace();
    }
});

Oneway发送:Oneway 方式只负责发送请求,不等待应答,Producer只负责把请求发出去,而不处理响应结果。

Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
//核⼼:发送消息。没有返回值,发完消息就不管了,不知道有没有发送消息成功
producer.sendOneway(msg);

SendResult定义说明(来自RocketMQ官方)

  • SEND_OK
    消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。
  • FLUSH_DISK_TIMEOUT
    消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。
  • FLUSH_SLAVE_TIMEOUT
    消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。
  • SLAVE_NOT_AVAILABLE
    消息发送成功,但是此时Slave不可用。如果Broker服务器的角色是同步Master,即- SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。

我们在调用producer.send方法时,不指定回调方法,则默认采用同步发送消息的方式,这也是丢失几率最小的一种发送方式(但是效率比较低)。

二、Borker 方面 : 设置成同步刷盘及同步复制;开启集群模式,集群同步;

1)异步刷盘

默认。消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入 PageCache 缓存后返回成功,然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量,但是可能会有消息丢失的情况,比如断点导致机器停机,PageCache 中没来得及刷盘的消息就会丢失。

2)同步刷盘

消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加下面配置:

flushDiskType=SYNC_FLUSH

同步复制后,消息复制流程如下:

  • slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset;

  • master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave;

  • slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset;

  • master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。

三、消费者

RocketMQ消费失败后的消费重试机制

手动提交消息偏移量

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
public enum ConsumeConcurrentlyStatus {
    //业务方消费成功
    CONSUME_SUCCESS,
    //业务方消费失败,之后进行重新尝试消费
    RECONSUME_LATER;
}

RECONSUME_LATER “%RETRY%+ConsumeGroupName”—重试队列的主题

KafkaMQ

解决方案:
1、生产者调用异步回调消息。伪代码如下: producer.send(msg,callback);
2、生产者增加消息确认机制,设置生产者参数:acks = all。partition的leader副本接收到消息,等待所有的follower副本都同步到了消息之后,才认为本次生产者发送消息成功了;
3、生产者设置重试次数。比如:retries>=3,增加重试次数以保证消息的不丢失;
4、定义本地消息日志表,定时任务扫描这个表自动补偿,做好监控告警。
5、后台提供一个补偿消息的工具,可以手工补偿。


生产者设置同步发送 :

 // 异步发送 默认
 kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
 // 同步发送
 RecordMetadata first = kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i)).get(); 

生产者设置发送ack :

// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
        // 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));

生产者设置重试次数。比如:retries>=3,增加重试次数以保证消息的不丢失;

三、消费者:

通过在Consumer端设置“enable.auto.commit”属性为false后,
在代码中手动调用KafkaConsumer实例的commitSync()方法提交,

这里指的是同步阻塞commit消费的偏移量,等待Broker端的返回响应,需要注意Broker端在对commit请求做出响应之前,消费端会处于阻塞状态,从而限制消息的处理性能和整体吞吐量以确保消息能够正常被消费。

如果在消费过程中,消费端突然Crash,这时候消费偏移量没有commit,等正常恢复后依然还会处理刚刚未commit的消息。

生产者acks参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。

1)ack=0,生产者在成功写入悄息之前不会等待任何来自服务器的响应。

2)ack=1,只要集群的首领副本收到消息,生产者就会收到一个来自服务器的成功响应。

3)ack=all,只有当所有同步副本全部收到消息时,生产者才会收到一个来自 服务器的成功响应。

MQ如何保证顺序消息

严格顺序消费的注意事项

  • 生产者不能异步发送,异步发送在发送失败的情况下,就没办法保证消息顺序。

比如你连续发了1,2,3。 过了一会,返回结果1失败,2, 3成功。你把1再重新发送1遍,这个时候顺序就乱掉了。

  • 应用中应确保业务中添加事务锁,防止并发处理同一对象。

比如修改业务员的手机号,操作员A和操作员B同时修改业务员张三的手机号,如两人的填入手机号相同无影响,如不同,操作员A输入正确,操作员B输入错误,可能造成消费顺序乱掉,手机号修改错误。

  • 对于消费端,不能并行消费,生产者顺序发送,消费端必须顺序消费。

RabbitMQ

消息单个消费者单线程消费。

RocketMQ

RocketMQ 提供了两种顺序消息模式:全局顺序消息和分区顺序消息。

  • 全局顺序消息:适用于性能要求不高的场景,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。全局顺序消息实际上是一种特殊的分区顺序消息,即Topic中只有一个分区,因此全局顺序和分区顺序的实现原理相同。由于分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高。
  • 分区顺序消息:适用于性能要求高的场景,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。分区顺序消息比全局顺序消息的并发度和性能更高。

发送方使用MessageQueueSelector选择队列 :

Message msg = new Message("OrderTopicTest", "order_"+orderId,
        "KEY" + orderId,("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
//消息队列的选择器
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    //第一个参数:所有的消息,第二个参数:发送的消息,第三个参数:根据什么发送,这里面传的是orderId
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
    //同一个订单id可以放到同一个队列里面去
}, orderId);

消费方使用MessageQueueSelector选择队列 :

consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        //自动提交
        context.setAutoCommit(true);
        for(MessageExt msg:msgs){
            System.out.println("收到消息内容 "+new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

Kafka

Kafka是分布式多partition的,它会将一个topic中的消息尽可能均匀的分发到每个partition上。那么问题就来了,这样怎么保证同一个topic消息的顺序呢?

kafka可以通过partitionKey,将某类消息写入同一个partition,一个partition只能对应一个消费线程,以保证数据有序。
也就是说生产者在写消息的时候,可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。

先后两条消息发送时,前一条消息发送失败,后一条消息发送成功,然后失败的消息重试后发送成功,造成乱序。为了解决重试机制引起的消息乱序为实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。

对于每个PID,该Producer发送消息的每个<Topic, Partition>都对应一个单调递增的Sequence Number。
同样,Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每Commit一条消息时将其对应序号递增。
对于接收的每条消息,如果其序号比Broker维护的序号大一,则Broker会接受它,否则将其丢弃
如果消息序号比Broker维护的序号差值比一大,说明中间有数据尚未写入,即乱序,此时Broker拒绝该消息
如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息
发送失败后会重试,这样可以保证每个消息都被发送到broker

消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。

指定发送partition的分区:

//没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,
//kafkaProducer.send(new ProducerRecord<>("first","a","atguigu " + i), new Callback() {}
kafkaProducer.send(new ProducerRecord("first", 0, "", "atguigu" + i)
        , new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e == null) {
                    System.out.println(" 主题: " +
                            metadata.topic() + "->" + "分区:" + metadata.partition()
                    );
                } else {
                    e.printStackTrace();
                }
            }
        });

MQ刷盘机制/集群同步

RabbitMQ

RocketMQ

同步刷盘、异步刷盘
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。

RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式:

1)异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,立刻返回发送端发送成功,有单独的线程执行刷盘;写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入。

2)同步刷盘方式:在返回写成功状态时,消息已经被写入磁盘。同步调用MappedByteBuffer的force()方法,同步等待刷盘结果,进行刷盘结果返回告知发送端。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

同步刷盘还是异步刷盘,是通过Broker配置文件里的flushDiskType参数设置的,这个参数被设置成SYNC_FLUSH、ASYNC_FLUSH中的一个

消息存储时,先将消息存储到内存,再根据不同的刷盘策略进行刷盘

同步刷盘:

异步刷盘:

刷盘源码

同步复制、异步复制
如果一个broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。

  • 同步复制是等Master和Slave均写成功后才反馈给客户端写成功状态;
  • 异步复制方式是只要Master写成功即可反馈给客户端写成功状态

这两种复制方式各有优劣:

  • 在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失;
  • 在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。

同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE三个值中的一个。

实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式,尤其是SYNC_FLUSH方式,由于频繁的触发写磁盘动作,会明显降低性能。

通常情况下,应该把Master和Slave设置成ASYNC_FLUSH的刷盘方式,

主从之间配置成SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然可以保证数据不丢。

Kafka

Broker针对每个分区会创建一个分区目录,分区目录下面存放的是日志文件(.log)和索引文件(.index)

Kafka的刷盘策略主要有两种:同步刷盘(sync flush)和异步刷盘(async flush)。

同步异步刷盘的区别在于,消息存储在内存(memory)中以后,是否会等待执行完刷盘动作再返回,即是否会等待将消息中的消息写入磁盘中。kafka可以通过配置flush.messageflush.ms来设置刷盘策略,如果flush.message设置为5,表示每5条消息进行一次刷盘,如果flush.message设置为1,表示每一条消息都进行一次刷盘。如果flush.ms设置为1000,表示每过1000ms进行一次刷盘,如果flush.ms设置为5000,表示每过5000ms进行一次刷盘。

  • 同步刷盘:每条消息被写入磁盘前,必须等待操作系统完成该消息的磁盘写入操作。这种方式可以确保数据不丢失,但由于每次消息都要等待磁盘I/O完成,因此会影响性能。在Kafka中,默认使用的是异步刷盘策略,因为它结合了多副本和基于日志的存储机制,通过复制和重放来保障数据的高可用性。异步刷盘的目的是为了提高吞吐量和适应高性能应用场景。不过,这种方法增加了数据丢失的风险,尤其是在系统发生故障的情况下。
  • 异步刷盘:这是一种更轻量级的刷盘方式,它允许消息先被写入内存中的页缓存,然后在空闲时由操作系统异步地刷入磁盘。这样可以减少对性能的影响,尤其是当处理大量消息时。然而,由于没有立即将数据刷入磁盘,所以存在一定的数据丢失风险。Kafka提供了配置项log.flush.interval.messageslog.flush.interval.ms来控制何时触发强制的刷盘操作。如果没有设置这些参数,那么Kafka会根据log.flush.scheduler.interval.ms(默认值为3000毫秒)的时间间隔进行检查,以确定是否需要刷新所有日志到磁盘。需要注意的是,尽管可以通过这些参数来实现一定程度的控制,但是官方并不推荐依赖它们来强制刷盘,而是强调通过副本机制来保证数据的一致性和可靠性。

广播消息&集群消息

RabbitMQ

RocketMQ

RocketMQ主要提供了两种消费模式:集群消费以及广播消费。我们只需要在定义消费者的时候通过setMessageModel(MessageModel.XXX)方法就可以指定是集群还是广播式消费,默认是集群消费模式,即每个Consumer Group中的Consumer均摊所有的消息。

public class MQProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // 创建DefaultMQProducer类并设定生产者名称
        DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");
        // 设置NameServer地址,如果是集群的话,使用分号;分隔开
        mqProducer.setNamesrvAddr("10.0.91.71:9876");
        // 消息最大长度 默认4M
        mqProducer.setMaxMessageSize(4096);
        // 发送消息超时时间,默认3000
        mqProducer.setSendMsgTimeout(3000);
        // 发送消息失败重试次数,默认2
        mqProducer.setRetryTimesWhenSendAsyncFailed(2);
        // 启动消息生产者
        mqProducer.start();
        // 循环十次,发送十条消息
        for (int i = 1; i <= 10; i++) {
            String msg = "hello, 这是第" + i + "条同步消息";
            // 创建消息,并指定Topic(主题),Tag(标签)和消息内容
            Message message = new Message("CLUSTERING_TOPIC", "", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 发送同步消息到一个Broker,可以通过sendResult返回消息是否成功送达
            SendResult sendResult = mqProducer.send(message);
            System.out.println(sendResult);
        }
        // 如果不再发送消息,关闭Producer实例
        mqProducer.shutdown();
    }
}
public class MQConsumerB