RocketMQ原理解析-Consumer

时间:2021-06-27 04:33:40

consumer 1.启动

有别于其他消息中间件由broker做负载均衡并主动向consumer投递消息,RocketMq是基于拉模式拉取消息,consumer做负载均衡并通过长轮询向broker拉消息。

Consumer消费拉取的消息的方式有两种

1.      Push方式:rocketmq已经提供了很全面的实现,consumer通过长轮询拉取消息后回调MessageListener接口实现完成消费,应用系统只要MessageListener完成业务逻辑即可

2.      Pull方式:完全由业务系统去控制,定时拉取消息,指定队列消费等等,当然这里需要业务系统去根据自己的业务需求去实现

下面介绍默认以push方式为主,因为绝大多数是由push消费方式来使用rocketmq的。

consumer启动流程

指定group

订阅topic

注册消息监听处理器,当消息到来时消费消息

消费端Start

         复制订阅关系

         初始化rebalance变量

         构建offsetStore消费进度存储对象

         启动消费消息服务

         向mqClientFactory注册本消费者

         启动client端远程通信

         启动定时任务

                   定时获取nameserver地址

                   定时从nameserver获取topic路由信息

                   定时清理下线的borker

                   定时向所有broker发送心跳信息,(包括订阅关系)

                   定时持久化Consumer消费进度(广播存储到本地,集群存储到Broker)

                   统计信息打点

                   动态调整消费线程池

         启动拉消息服务PullMessageService

         启动消费端负载均衡服务RebalanceService

         从namesrv更新topic路由信息

         向所有broker发送心跳信息,(包括订阅关系)

         唤醒Rebalance服务线程

RocketMQ原理解析-Consumer

 

consumer 2.消费端负载均衡

消费端负载均衡

RocketMQ原理解析-Consumer

RocketMQ原理解析-Consumer

消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载

消费端遍历自己的所有topic,依次调rebalanceByTopic

 根据topic获取此topic下的所有queue

 选择一台broker获取基于group的所有消费端(有心跳向所有broker注册客户端信息)

 选择队列分配策略实例AllocateMessageQueueStrategy执行分配算法,获取队列集合Set<MessageQueue>mqSet
1)  平均分配算法,其实是类似于分页的算法

将所有queue排好序类似于记录

将所有消费端consumer排好序,相当于页数

然后获取当前consumer所在页面应该分配到的queue

2)  按照配置来分配队列, 也就是说在consumer启动的时候指定了queue

3)  按照机房来配置队列

Consumer启动的时候会指定在哪些机房的消息

获取指定机房的queue

然后在执行如1)平均算法

 根据分配队列的结果更新ProccessQueueTable<MessageQueue,ProcessQueue>

1)      比对mqSet 将多余的队列删除,当broker当机或者添加,会导致分配到mqSet变化,

a)        将不在被本consumer消费的messagequeue的ProcessQueue删除,其实是设置ProcessQueue的droped属性为true

b)        将超过两份中没有拉取动作ProcessQueue删除

//TODO 为什么要删除掉,两分钟后来了消息怎么办?

//

2)      添加新增队列,比对mqSet,给新增的messagequeue

构建长轮询对象PullRequest对象,会从broker获取消费的进度

构建这个队列的ProcessQueue

将PullRequest对象派发到长轮询拉消息服务(单线程异步拉取)

注:ProcessQueue正在被消费的队列,

(1)    长轮询拉取到消息都会先存储到ProcessQueue的TreeMap<Long, MessageExt>集合中,消费调后会删除掉,用来控制consumer消息堆积,

TreeMap<Long, MessageExt> key是消息在此ConsumeQueue队列中索引

(2)    对于顺序消息消费处理

locked属性:当consumer端向broker申请锁队列成功后设置true,只有被锁定的processqueue才能被执行消费

rollback: 将消费在msgTreeMapTemp中的消息,放回msgTreeMap重新消费

commit: 将临时表msgTreeMapTemp数据清空,代表消费完成,放回最大偏移值

(3)    这里是个TreeMap,对key即消息的offset进行排序,这个样可以使得消息进行顺序消费

consumer 3.长轮询

Rocketmq的消息是由consumer端主动到broker拉取的, consumer向broker发送拉消息请求, PullMessageService服务通过一个线程将阻塞队列LinkedBlockingQueue<PullRequest>中的PullRequest到broker拉取消息

  DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法执行向broker拉消息动作

1.      获取ProcessQueue判读是否drop的, drop为true返回

2.      给ProcessQueue设置拉消息时间戳

3.      流量控制,正在消费队列中消息(未被消费的)超过阀值,稍后在执行拉消息

4.      流量控制,正在消费队列中消息的跨度超过阀值(默认2000),稍后在消费

5.      根据topic获取订阅关系

6.      构建拉消息回调对象PullBack, 从broker拉取消息(异步拉取)返回结果是回调

7.      从内存中获取commitOffsetValue  //TODO 这个值跟pullRequest.getNextOffset区别

8.      构建sysFlag   pull接口用到的flag

9.      调底层通信层向broker发送拉消息请求

如果master压力过大,会建议去slave拉取消息

如果是到broker拉取消息清楚实时提交标记位,因为slave不允许实时提交消费进度,可以定时提交 

//TODO 关于master拉消息实时提交指的是什么?

10.  拉到消息后回调PullCallback

处理broker返回结果pullResult

           更新从哪个broker(master 还是slave)拉取消息

           反序列化消息

           消息过滤

           消息中放入队列最大最小offset,方便应用来感知消息堆积度

将消息加入正在处理队列ProcessQueue

将消息提交到消费消息服务ConsumeMessageService

流控处理, 如果pullInterval参数大于0 (拉消息间隔,如果为了降低拉取速度,可以设置大于0的值),延迟再执行拉消息,  如果pullInterval为0立刻在执行拉消息动作

序列图

1.      向broker发送长轮询请求

RocketMQ原理解析-Consumer

2.   Broker接收长轮询请求

RocketMQ原理解析-Consumer

3.      Consumer接收broker响应

RocketMQ原理解析-Consumer

长轮询活动图:

RocketMQ原理解析-Consumer

一张图画不下,再来一张

RocketMQ原理解析-Consumer

consumer 4.长轮询push消息-并发消息

通过长轮询拉取到消息后会提交到消息服务ConsumeMessageConcurrentlyService,

ConsumeMessageConcurrentlyServic的submitConsumeRequest方法构建ConsumeRequest任务提交到线程池。

长轮询向broker拉取消息是批量拉取的, 默认设置批量的值为pullBatchSize= 32,可配置

消费端consumer构建一个消费消息任务ConsumeRequest消费一批消息的个数是可配置的consumeMessageBatchMaxSize = 1, 默认批量个数为一个

ConsumeRequest 任务run方法执行

 判断proccessQueue是否被droped的, 废弃直接返回,不在消费消息

                   构建并行消费上下文

                   给消息设置消费失败时候的retrytopic,当消息发送失败的时候发送到topic为%RETRY%groupname的队列中
调MessageListenerConcurrently监听器的consumeMessage方法消费消息,返回消费结果 如果ProcessQueue的droped为true,不处理结果,不更新offset, 但其实这里消费端是消费了消息的,这种情况感觉有被重复消费的风险 处理消费结果 消费成功, 对于批次消费消息,返回消费成功并不代表所有消息都消费成功,但是消费消息的时候一旦遇到消费消息失败直接放回,根据ackIndex来标记成功消费到哪里了 消费失败, ackIndex设置为-1 广播模式发送失败的消息丢弃, 广播模式对于失败重试代价过高,对整个集群性能会有较大影响,失败重试功能交由应用处理 集群模式, 将消费失败的消息一条条的发送到broker的重试队列中去,如果此时还有发送到重试队列发送失败的消息,那就在cosumer的本地线程定时5秒钟以后重试重新消费消息,在走一次上面的消费流程。 删除正在消费的队列processQueue中本次消费的消息,放回消费进度 更新消费进度,这里只是一个内存offsettable的更新,后面有定时任务更新到broker上去

RocketMQ原理解析-Consumer

consumer 5.长轮询push消息-顺序消费消息

顺序消费服务ConsumeMessageConcurrentlyService构建的时候

                   构建一个线程池来接收消费请求ConsumeRequest

                   构建一个单线程的本地线程,用来稍后定时重新消费ConsumeRequest, 用来执行定时周期性(一秒)钟锁队列任务

         周期性锁队列lockMQPeriodically

                   获取正在消费队列列表ProcessQueueTable所有MesssageQueue, 构建根据broker归类成MessageQueue集合Map<brokername,Set<MessageQueue>>

                   遍历Map<brokername,Set<MessageQueue>>的brokername, 获取broker的master机器地址,将brokerName的Set<MessageQueue>发送到broker请求锁定这些队列。
在broker端锁定队列,其实就是在broker的queue中标记一下消费端,表示这个queue被某个client锁定。 Broker会返回成功锁定队列的集合,
根据成功锁定的MessageQueue,设置对应的正在处理队列ProccessQueue的locked属性为true没有锁定设置为false 通过长轮询拉取到消息后会提交到消息服务ConsumeMessageOrderlyService, ConsumeMessageOrderlyService的submitConsumeRequest方法构建ConsumeRequest任务提交到线程池。ConsumeRequest是由ProcessQueue和Messagequeue组成。 ConsumeRequest任务的run方法 判断proccessQueue是否被droped的, 废弃直接返回,不在消费消息 每个messagequeue都会生成一个队列锁来保证在当前consumer内,同一个队列串行消费, 判断processQueue的lock属性是否为true,lock属性是否过期,如果为false或者过期,放到本地线程稍后锁定在消费。 如果lock为true且没有过期,开始消费消息 计算任务执行的时间如果大于一分钟且线程数小于队列数情况下,将processqueue, messagequeue重新构建ConsumeRequest加到线程池10ms后在消费,这样防止个别队列被饿死 获取客户端的消费批次个数,默认一批次为一条 从proccessqueue获取批次消息, processqueue.takeMessags(batchSize), 从msgTreeMap中移除消息放到临时map中msgTreeMapTemp,这个临时map用来回滚消息和commit消息来实现事物消费 调回调接口消费消息,返回状态对象ConsumeOrderlyStatus 根据消费状态,处理结果 1) 非事物方式,自动提交 消息消息状态为success:调用processQueue.commit方法 获取msgTreeMapTemp的最后一个key,表示提交的 offset 清空msgTreeMapTemp的消息,已经成功消费 2) 事物提交,由用户来控制提交回滚(精卫专用) 更新消费进度, 这里的更新只是一个内存offsetTable的更新,后面有定时任务定时更新到broker上去

RocketMQ原理解析-Consumer

consumer 6.消息消费

消费者主动拉取消息消费,客户端通过类DefaultMQPullConsumer

         客户端可以指定特定MessageQueue

         也可以通过DefaultMQPullConsumer.fetchMessageQueuesInBalance(topic) 获取消费的队列

         业务自己获取消费队列,自己到broker拉取消息,以及自己更新消费进度

因为内部实现跟push方式类似就不在啰嗦,用法也请求看示例代码去

consumer 7.shutdown

DefaultMQPushConsumerImpl  关闭消费端

         关闭消费线程

         将分配到的Set<MessageQueue>的消费进度保存到broker

利用DefaultMQPushConsumerImpl获取ProcessQueueTable<MessageQueue,ProcessQueue>的keyset的messagequeue去获取

RemoteBrokerOffsetStore.offsetTable<MessageQueue,AutomicLong>Map中的消费进度,

offsetTable中的messagequeue的值,在update的时候如果没有对应的Messagequeue会构建, 但是也会rebalance的时候将没有分配到的messagequeue删除

rebalance会将offsettable中没有分配到messagequeue删除, 但是在从offsettable删除之前会将offset保存到broker

         Unregiser客户端

         pullMessageService关闭

         scheduledExecutorService关闭,关闭一些客户端的起的定时任务

         mqClientApi关闭

         rebalanceService关闭