分布式调度平台XXL-JOB源码分析-执行器端

时间:2022-06-20 09:19:11
上一篇文章已经说到调度中心端如何进行任务管理及调度,本文将分析执行器端是如何接收到任务调度请求,然后执行业务代码的。
XxlJobExecutorApplication为我们执行器的启动项,其中有个XxlJobConfig的配置项,发现其中有个属性为adminAddresses,这个就是我们调度中心的地址。

XxlJobSpringExecutor

分布式调度平台XXL-JOB源码分析-执行器端分布式调度平台XXL-JOB源码分析-执行器端
声明了init方法为start,点进来,
分布式调度平台XXL-JOB源码分析-执行器端分布式调度平台XXL-JOB源码分析-执行器端
它又实现了ApplicationContextAware接口,用来保存spring的上下文信息。
它还有个父类XxlJobExecutor,暂时未找到其他子类。
程序开始执行start方法,
分布式调度平台XXL-JOB源码分析-执行器端分布式调度平台XXL-JOB源码分析-执行器端
第一步,调用了本类的私有方法,这个方法就是把JobHandler的实现类取出来,再调用registJobHandler(name, handler)进行注册。
分布式调度平台XXL-JOB源码分析-执行器端分布式调度平台XXL-JOB源码分析-执行器端
registJobHandler的实现方法在父类中,很简单,维护了一个Map,估计后面进行任务的执行时会来这个map里面进行查询。
分布式调度平台XXL-JOB源码分析-执行器端分布式调度平台XXL-JOB源码分析-执行器端
第二步,估计是初始化了Glue的执行器。
第三步,调用父类的start方法,大部分的业务逻辑都在这里。
分布式调度平台XXL-JOB源码分析-执行器端分布式调度平台XXL-JOB源码分析-执行器端
1.日志处理器初始化
2.向adminBizList字段中放入XxlRpcReferenceBean返回的代理类,作用之后会单独开一篇注册心跳的文章说明。
3.任务日志清除
4.任务结果回调处理线程
5.启动另一个执行器的执行线程XxlRpcProviderFactory这个类是XXl其他的开源项目,自研RPC。

XxlRpcProviderFactory

看名字就知道这个类是可以返回Rpc调用服务提供端的工厂类,接上文,看他的initRpcProvider方法。

分布式调度平台XXL-JOB源码分析-执行器端分布式调度平台XXL-JOB源码分析-执行器端
分布式调度平台XXL-JOB源码分析-执行器端由上面的代码跟进来,发现这就是启动了一个以netty作为通讯模型、Hessian作为序列化方式的、ExecutorServiceRegistry作为注册逻辑实现类的服务提供端。
接着我们向其添加一个服务,名称为ExecutorBiz,版本为null,处理请求的实现类为ExecutorBizImpl,最后我们调用start方法完成执行器端的服务暴露。
分布式调度平台XXL-JOB源码分析-执行器端

此时的ServiceRegistry为ExecutorServiceRegistry,调用其start,以30秒的间隔和调度中心进行心跳通知,然后调用server的start方法,此时server为NettyHttpServer。

分布式调度平台XXL-JOB源码分析-执行器端

整个代码结构就是用netty启动了个服务,来看最后一个ChannelHandler,NettyHttpServerHandler。

分布式调度平台XXL-JOB源码分析-执行器端

调用私有方法process。

分布式调度平台XXL-JOB源码分析-执行器端

这里调用了xxlRpcProviderFactory的invokeService方法完成了服务实现的反射调用。

分布式调度平台XXL-JOB源码分析-执行器端

从serviceData中拿到我们之前调用addService方法添加的服务实现类,这里是ExecutorBizImpl,这里反射调用的方法是run。

ExecutorBizImpl

这个类实现了ExecutorBiz接口,看接口定义的方法,主要是作为执行器,提供给调度中心几个接口方法。

分布式调度平台XXL-JOB源码分析-执行器端

重点来看run方法。

分布式调度平台XXL-JOB源码分析-执行器端

首先XxlJobExecutor内部会有个以jobId为key,执行这个任务的线程为value的字段jobThreadRepository,我们首先去尝试的获取当前正在执行这个任务的线程,如果有,那就根据任务设置的运行模式进行处理,如下图。

分布式调度平台XXL-JOB源码分析-执行器端

如果没有正在执行此任务的线程,那就调用XxlJobExecutor.registJobThread()启动一个线程,最后将任务数据推送给这个可能是从jobThreadRepository获取到的也可能是新创建的线程,如下图。

分布式调度平台XXL-JOB源码分析-执行器端

分布式调度平台XXL-JOB源码分析-执行器端

分布式调度平台XXL-JOB源码分析-执行器端

JobThread的run方法会从triggerQueue里poll出任务,然后用之前设置的 handler进行execute的方法调用,并利用idleTimes字段进行线程无任务空转的次数控制,如下图。

分布式调度平台XXL-JOB源码分析-执行器端

至此,执行器完成了启动,暴露ExecutorBiz服务,接收任务调度数据TriggerParam,并在JobThread线程中完成任务配置的业务handler的执行。

时序图

分布式调度平台XXL-JOB源码分析-执行器端

分布式调度平台XXL-JOB源码分析-执行器端

分布式调度平台XXL-JOB源码分析-执行器端