RocketMQ源码解析之消息生产者(异步发送)

时间:2024-04-08 18:18:49

1.前言

我们在《RocketMQ源码解析之消息生产者(同步发送与单向发送)》一文中解析了RocketMQ生产者同步发送消息与单向发送消息,这个异步发送消息其实很多地方与同步发送一样,不过有一点是你在编程的时候需要提供SendCallback 对象,用来发送响应来的时候进行回调使用,我们知道同步发送是等待broker响应到来,然后将响应往上返回,这个异步调用就是响应来的时候,对你提供的回调对象进行调用,你这个回调对象可以写一些自己的逻辑等等。

2.源码解析

在源码解析之前我们要先看一下异步发送消息是怎样编程的
RocketMQ源码解析之消息生产者(异步发送)
这里需要我们提供一个SendCallback 对象用来响应来的时候回调,其中异常的时候会调用这里面的onException方法, 成功的时候调用onSuccess方法,执行相应的逻辑。
好了,现在开始源码解析:
RocketMQ源码解析之消息生产者(异步发送)
首先是调用了defaultMQProducerImpl 的send方法,并且将msg 与callback传进去
RocketMQ源码解析之消息生产者(异步发送)
这里又调用了下它的重载方法,然后将发送超时时间传进去,默认的一个发送超时时间是3s。
RocketMQ源码解析之消息生产者(异步发送)
这里需要注意的是,它将这个发送消息的任务交给了一个异步发送线程池,然后在任务中是调用了sendDefaultImpl 方法,然后通信方式是异步CommunicationMode.ASYNC,这里我们需要看下这个线程池的一些参数,因为关乎我们以后的调优
RocketMQ源码解析之消息生产者(异步发送)
这里先是判断asyncSenderExecutor 这个线程池是不是null,其实咱们这里它是null的(因为我们没有指定线程池,不过你编程的时候是可以指定的,使用setAsyncSenderExecutor()这个方法就可以设置了),就使用defaultAsyncSenderExecutor线程池,这个defaultAsyncSenderExecutor 线程池是在defaultMQProducerImpl 类构造方法创建的的,我们可以看下
RocketMQ源码解析之消息生产者(异步发送)
队列是5w大小,然后核心线程数 是cpu的核心数,maxThreads也是cpu核心数。
好了,我们继续往下看,这个sendDefaultImpl 方法其实就是选择MessageQueue然后重试那一套东西,不过,异步发送虽然走这个方法,但是它的失败重试不是这样子玩的,我们接着往下看接着又调用了sendKernelImpl 方法
RocketMQ源码解析之消息生产者(异步发送)
这里我们主要是看下异步处理这一块,因为我们在介绍同步发送与单向发送都有介绍过这个方法,其实这个方法就是封装请求头啥的,异步这一块我们可以看到,先是判断了一下超时没有,然后调用MQClientAPI的sendMessage方法,注意下它这个倒数第三个参数,这个参数是获取了一下默认的重试次数,默认是2。
RocketMQ源码解析之消息生产者(异步发送)
这个方法我们也是主要看下这个异步发送这块,首先是定义了一个times,这个times记录的发送次数,然后判断了是否超时,然后调用sendMessageAsync这个异步发送方法。这个方法又调用了RemotingClient的invokeAsync 方法,其中在sendMessageAsync 方法中创建了一个InvokeCallback 对象,我们先不管这个InvokeCallback ,后面再解释,先看下invokeAsync 方法:
RocketMQ源码解析之消息生产者(异步发送)
其实套路都一样,先是根据broker addr 获取对应的channel,然后判断一下channel状态,然后执行调用前的钩子,判断有没有超时,调用
invokeAsyncImpl方法进行发送。
RocketMQ源码解析之消息生产者(异步发送)
这个方法重要的操作就这几个,首先生成一个调用id ,也就是opaque , 接着获取信号量许可,这个信号量使用来限流的,默认是65535,获取之后判断一下有没有超时,然后封装ResponseFuture 对象,将ResponseFuture 对象缓存到response表中。
接着将消息写到channel中,注意有个listener 是在发送出去的时候执行,成功的话将ResponseFuture 对象设置发送成功,失败的走了requestFail(opaque)方法,失败我们先不看。这个时候就送成功了,等到收到broker响应的时候,NettyClientHandler 就能收到消息了
RocketMQ源码解析之消息生产者(异步发送)
这个是netty的知识点,是将NettyClientHandler 对象注册到netty的pipeline上面,在发送内容,接收内容,都会执行响应的实现方法。
RocketMQ源码解析之消息生产者(异步发送)
我们这里是收到的响应消息,然后调用processResponseCommand 处理
RocketMQ源码解析之消息生产者(异步发送)
这里就是根据opaque去responseTable这个缓存中找到对应的ResponseFuture 对象,然后设置响应内容,最最最重要的点就是看一下它的invokeCallBack有没有,我们发送消息的时候是有设置进去的。它会调用executeInvokeCallback 方法执行
RocketMQ源码解析之消息生产者(异步发送)
这里就是获取执行回调的线程池,如果线程池是null的话,就在当前线程执行。这个回调线程池参数我们也看下
RocketMQ源码解析之消息生产者(异步发送)
它是拿的这个线程是,默认核心线程数也是cpu核心数。
接着就是调用了ResponseFuture 的executeInvokeCallback 方法
RocketMQ源码解析之消息生产者(异步发送)
设置回调状态,然后调用invokeCallback的operationComplete 方法,现在我们再回到MQClientAPI的sendMessageAsync 方法中,因为当时是在这个方法中创建的这个 invokeCallback 对象
RocketMQ源码解析之消息生产者(异步发送)
这个方法分为2部分吧,一是有响应,然后没有sendCallback ,这个sendCallback 是你自己写的那个回调对象,这个时候没有的话说明你不准备回调了,然后解析了一下结果,执行了一下 调用后的钩子,这部分就算完事了,二是有响应,然后也是有这个回调对象sendCallback的,先是解析了下响应,然后执行了你写的那个sendCallback 对象,另外就是执行了updateFaultItem ,进行更新一个响应信息,如果你看我的《RocketMQ源码解析之消息生产者(容错)》这篇文章就能知道这个方法是干嘛的了。
如果异常的话,执行了一个onExceptionImpl 方法来处理,我们来看下这个方法的实现
RocketMQ源码解析之消息生产者(异步发送)
增加调用次数,然后判断是否需要重试&& 重试次数在范围内,然后就是重新选择一个MessageQueue,重新设置请求id,也就是opaque这个,最后就是调用 sendMessageAsync 进行发送了,这就是异步调用的一个重试逻辑,并没有使用for循环的形式。
好了,到这我们的异步发送解析就已经ok了
我们用一张图来总结下这个流程:
RocketMQ源码解析之消息生产者(异步发送)
(原图是我在process on 画的链接:链接