分布式任务调度框架XXL-JOB

时间:2023-02-09 16:59:21

1.XXL-JOB介绍

XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。它的有两个核心模块,一个模块叫做调度中心,另外一个模块叫做执行器,它把任务调度和任务执行分成两个部分。这样调度模块只需要负责任务的调度属性,触发调度信号。执行模块只需要接收调度信号,去执行具体的业务逻辑,两者可以各自的进行扩容和缩容。

分布式任务调度框架XXL-JOB

2.管理端部署

2.1 下载源码

从github把源码工程clone到本地  ​https://github.com/xuxueli/xxl-job​

2.2 修改配置文件

/xxl-job/xxl-job-admin/src/main/resources/application.properties

### 数据库
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root_pwd
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

### 报警邮箱
spring.mail.username=xxx@qq.com
spring.mail.from=xxx@qq.com
spring.mail.password=xxx

### xxl-job, access token
xxl.job.accessToken=default_token

### 国际化,默认为 “zh_CN”/中文简体
xxl.job.i18n=zh_CN

### xxl-job日志保存天数
xxl.job.logretentiondays=30

2.3 打包部署

将xxl-job-admin模块打包部署

管理端访问地址​​​​​​​ ​​​​​​​​​​​​​​​​​http://localhost:8080/xxl-job-admin​​,默认登录账号 “admin/123456”, 登录后运行界面如下图所示。

分布式任务调度框架XXL-JOB

3.普通任务集成

3.1 添加maven依赖

<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.1</version>
</dependency>

3.2 application.yml新增配置

xxl:
job:
admin:
addresses: https://127.0.0.1/xxl-job-admin
accessToken: default_token
executor:
appname: your-executor
address:
ip:
port: 9999
logpath: /data/applogs/xxl-job/jobhandler
logretentiondays: 30

3.3 添加配置类

@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.accessToken}")
private String accessToken;

@Value("${xxl.job.executor.appname}")
private String appname;

@Value("${xxl.job.executor.address}")
private String address;

@Value("${xxl.job.executor.ip}")
private String ip;

@Value("${xxl.job.executor.port}")
private int port;

@Value("${xxl.job.executor.logpath}")
private String logPath;

@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;


@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

return xxlJobSpringExecutor;
}
}

3.4 编写执行器

常规任务

@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");

for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
// default success
}

分片任务

@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);

// 业务逻辑
for (int i = 0; i < shardTotal; i++) {
if (i == shardIndex) {
XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);
} else {
XxlJobHelper.log("第 {} 片, 忽略", i);
}
}

}

执行器启动后,就可以去调度中心新增任务,新增执行器了

3.5 管理端新增执行器,新增任务

分布式任务调度框架XXL-JOB

分布式任务调度框架XXL-JOB

4.动态增删改启停执行的任务

大部分任务都是执行相同操作,指定时间执行,比如每天凌晨5点把所有取消的订单归档。

还有一种情况,我希望随着业务数据的改变,任务也动态的修改执行时间,不同的业务数据动态生成不同的任务,并且随时可以启动,停止,执行任务

此功能基于3.1,3.2,3.3,3.4,3.5,不需要在管理端手动添加任务

4.1 把XxlJobService封装成Dubbo接口

1.新增xxl-job-api模块,把XxlJobService移动到该模块

分布式任务调度框架XXL-JOB

2.xxl-job-admin模块添加xxl-job-api,dubbo,zk的依赖

<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>chd-2.7.3-v1.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
<!--zk-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>

3.xxl-job-admin的application.properties添加相关配置

dubbo.application.id=xxl-job-service
dubbo.application.name=xxl-job-service
dubbo.registry.id=zk-registry
dubbo.registry.protocol=zookeeper
dubbo.registry.address=zookeeper://127.0.0.1:2181?client=curator
dubbo.registry.group=test
dubbo.registry.username=test
dubbo.registry.password=test
dubbo.scan.base-packages=com.xxl.job.admin.service
dubbo.protocol.id=dubbo
dubbo.protocol.name=dubbo
dubbo.protocol.port=36321
dubbo.provider.timeout=6000
dubbo.provider.status=threadpool
dubbo.consumer.check=false
dubbo.consumer.timeout=6000
dubbo.consumer.retries=0

4.xxl-job-admin启动类加上EnableDubbo注解

5.XxlJobServiceImpl类Service注解由spring切换成dubbo的

4.2 业务应用添加maven依赖

<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-api</artifactId>
<version>2.3.1</version>
</dependency>

4.3 构造XxlJobInfo参数

public XxlJobInfo buildXxlJobInfo() {
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setJobGroup(JOB_GROUP);
xxlJobInfo.setScheduleType(SCHEDULE_TYPE);
xxlJobInfo.setGlueType(GLUE_TYPE);
xxlJobInfo.setExecutorRouteStrategy(EXECUTOR_ROUTE_STRATEGY);
xxlJobInfo.setMisfireStrategy(MISFIRE_STRATEGY);
xxlJobInfo.setExecutorBlockStrategy(EXECUTOR_BLOCK_STRATEGY);
xxlJobInfo.setExecutorTimeout(0);
xxlJobInfo.setExecutorFailRetryCount(0);
# 执行时间自行构造cron表达式
xxlJobInfo.setScheduleConf(scheduleConf);
return xxlJobInfo;
}

4.4 使用dubbo服务对任务动态操作

@Reference(version = VersionConst.SERVICE_DEFAULT_VERSION)
private XxlJobService xxlJobService;

@Override
public ResponseVo insert(UserDto userDto) {
boolean ok = transactionTemplate.execute((status) -> {
ReturnT<String> reslut = new ReturnT<>(200, "success");
try {
userMapper.insert(userDto);
XxlJobInfo xxlJobInfo = buildXxlJobInfo();
reslut = xxlJobService.add(xxlJobInfo);
if (reslut.getCode() != ReturnT.SUCCESS_CODE) {
status.setRollbackOnly();
log.error("xxlJobService.add.rpc execute error,result:{}", JSON.toJSON(reslut));
return false;
}
reslut = xxlJobService.start(Integer.valueOf(reslut.getContent()));
if (reslut.getCode() != ReturnT.SUCCESS_CODE) {
status.setRollbackOnly();
log.error("xxlJobService.add.rpc execute error,result:{}", JSON.toJSON(reslut));
return false;
}
} catch (Exception e) {
status.setRollbackOnly();
log.error("user.insert.transaction execute error,user:{}", JSON.toJSON(userDto), e);
return false;
}
return true;
});
if (log.isDebugEnabled()) {
log.debug("用户新增结束,id={},result={}", userDto.getId(), ok);
}
return ok ? ResponseVo.success(userDto.getId()) : ResponseVo.fail(500, "PERSISTENT_OR_RPC_ERROR");
}