RocketMQ 消息失败重试 解析——图解、源码级解析

时间:2022-10-25 08:00:51

???? Java学习:Java从入门到精通总结

???? 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

???? 绝对不一样的职场干货:大厂最佳实践经验指南


???? 最近更新:2022年10月24日

???? 个人简介:通信工程本硕????、Java程序员????。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

???? 点赞 ???? 收藏 ⭐留言 ???? 都是我最大的动力!


异常消息处理

RocketMQ 消息失败重试 解析——图解、源码级解析

使用Consumer时会注册MessageListener,消费消息的接口会返回处理状态:

  • ConsumeConcurrentlyStatus.CONSUME_SUCCESS:消费成功
  • ConsumeConcurrentlyStatus.REConsume_LATER:等待一段时间后再消费

MessageListenerCnsumeMessageConcurrentlyService中被调用的,上面两个状态会映射到CMResult定义的枚举值:

  • CMResult.CR_SUCCESS:消费成功
  • CMResult.CR_LATER:等待一段时间后再消费
  • CMResult.CR_ROLLBACK:事务回滚
  • CMResult.CR_COMMIT:事务提交
  • CMResult.CR_THROW_EXCEPTION:消费异常
  • CMResult.CR_RETURN_NULL:消费结果为null

针对CMResult.CR_LATER状态的处理策略为:将该消息发挥Broker,继续等待后续消息。发送回的消息会设置重试的Topic,命名规则为:“%RETRY%” + Consumer组名,消息原本的Topic会暂存到消息体中,并且会额外设置delayLevelreconsumeTimes

消息消费的结果会在CnsumeMessageConcurrentlyService.processConsumeResult中处理

    public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();
		
		// 消息为空,直接返回
        if (consumeRequest.getMsgs().isEmpty())
            return;

		// 计算从consumerequest.msg[0]到consumerequest.msgs[ackIndex]的消息消费成功的数量
        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
               	// 统计成功/失败数量
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                // 统计失败数量
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }
		
		// 处理消息失败的消息
        switch (this.defaultMQPushConsumer.getMessageModel()) {
        	// 如果是广播模式,无论是否消费失败,都不回发消息给Broker,只打印Log
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            // 发回失败消息到Broker
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    // 回发给Broker的的具体方法
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                    	// 重复消费次数 + 1
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }
				
				// 如果回发给Broker也失败的话,则提交延迟消费请求(稍后在客户端重新消费)
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);

                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }
		
		// 移除消费成功消息,并返回消费的最新进度
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        // 更新最新消费进度,进度更新只能增长 
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

Consumer消费的时候可以设置consumeMessageBatchMaxSize来控制传入MessageListener的消息数量。RocketMQ认为只要有一条消息消费失败,这一批消息都会发回给Broker,所以设置consumeMessageBatchMaxSize这个值时应当注意避免出现消息重复消费的问题。



Broker处理流程

Broker端对应的处理逻辑在SendMessageProcessor.consumerSendMsgBack里,对于Consumer发送失败返的消息,Broker会将其放入重试Topic

/**
     * 消费者将消息发回给Broker,可以指定多久后重新消费该消息
     *
     * @param ctx
     * @param request
     * @return
     * @throws RemotingCommandException
     */
    private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
                                                                        RemotingCommand request) throws RemotingCommandException {
        // 初始化响应
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ConsumerSendMsgBackRequestHeader requestHeader =
                (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
        String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
        if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
            ConsumeMessageContext context = buildConsumeMessageContext(namespace, requestHeader, request);
            this.executeConsumeMessageHookAfter(context);
        }
        SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
                    + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return CompletableFuture.completedFuture(response);
        }

        // 检查Broker是否有写入权限
        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
            return CompletableFuture.completedFuture(response);
        }

        // 检查重试队列个数是否大于0
        if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return CompletableFuture.completedFuture(response);
        }

        // 计算retry Topic = "%RETRY% + consumeGroup"
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());

        // 计算队列编号
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }

        // 获取topicConfig,如果获取不到,则在response里进行相应设置
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return CompletableFuture.completedFuture(response);
        }

        // 不允许写入
        if (!PermName.isWriteable(topicConfig.getPerm())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
            return CompletableFuture.completedFuture(response);
        }

        // 根据消息的commitLog Offset查询实际的MessageExt(消费失败的实际消息)
        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
        if (null == msgExt) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
            return CompletableFuture.completedFuture(response);
        }

        // 设置 PROPERTY_RETRY_TOPIC = 原始消息的Topic,msgInner通过setProperties()方法将原始消息的Properties拷贝过去
        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (null == retryTopic) {
            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
        }
        // 设置消息不等待存储完成
        msgExt.setWaitStoreMsgOK(false);

        int delayLevel = requestHeader.getDelayLevel();

        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        // 3.4.9版本之后可以支持自定义消息的最大消费次数,若未指定,默认为16
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            Integer times = requestHeader.getMaxReconsumeTimes();
            if (times != null) {
                maxReconsumeTimes = times;
            }
        }

        // 如果超过最大消费次数或delayLevel < 0,则加入私信队列
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
                || delayLevel < 0) {
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE, 0);
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return CompletableFuture.completedFuture(response);
            }
        } else {
            if (0 == delayLevel) {
                // 设置延迟级别为重试消费次数 + 3
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }
            msgExt.setDelayTimeLevel(delayLevel);
        }

        // 创建MessageExtBrokerInner,除了Topic、QueueId不同外,其他的都是拷贝原始消息的数据
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        
        // 发送消息
        CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        return putMessageResult.thenApply((r) -> {
            if (r != null) {
                switch (r.getPutMessageStatus()) {
                    case PUT_OK:
                        String backTopic = msgExt.getTopic();
                        String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                        if (correctTopic != null) {
                            backTopic = correctTopic;
                        }
                        this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
                        response.setCode(ResponseCode.SUCCESS);
                        response.setRemark(null);
                        return response;
                    default:
                        break;
                }
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(r.getPutMessageStatus().name());
                return response;
            }
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("putMessageResult is null");
            return response;
        });
    }

重试消息的重新投递逻辑与延迟消息一致,等待DelayLevel对应的延时之后,Broker会尝试重新进行消息投递。

关于延迟级别的的配置在MessageStoreConfig.messageDelay里,默认配置如下:

this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

可以主动调整每一个延迟级别对应的时间,但仍然有一些缺陷:

  1. 时间精度不够细,最小粒度是1s
  2. 延迟级别的个数是固定的,无法调整



死信队列

RocketMQ里的消息不能无限次重复消费,当重试次数超过所有延迟级别的个数之后,消息就会进入到死信队列里,死信的Topic命名规则为:"%DLQ% " + Consumer组名

// 如果超过最大消费次数或delayLevel < 0,则加入私信队列
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
                || delayLevel < 0) {
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE, 0);

进入到死信队列后的消息就不会再被投递了,可以通过接口来查询当前RocketMQ中的死信队列的消息。