Spring Boot集成quartz实现定时任务并支持切换任务数据源

时间:2022-12-30 08:12:34

org.quartz实现定时任务并自定义切换任务数据源

在工作中经常会需要使用到定时任务处理各种周期性的任务,org.quartz是处理此类定时任务的一个优秀框架。随着项目一点点推进,此时我们并不满足于任务仅仅是定时执行,我们还想要对任务进行更多的控制,随时能对任务进行人为干预,就需要对quartz有更深入的了解。而随着微服务的流行,项目中多数据源的情况也越来越常见,在定时任务中集成多数据源切换的功能也需要集成进来。

集成quartz实现定时任务

集成quartz实现定时任务

quartz中实现定时任务需要了解的基本概念

Job

通过实现Job类,在实现方法中写我们具体想要定时任务完成的工作,然后交给quartz管理。

JobDetail

Job只负责实现具体任务,所以还需要借助JobDetail来存储一些描述Job的基本信息。

Quartz JobBuilder

为构造JobDetail实体提供的builder-style API。你可以这样使用它来构建一个JobDetail

@Bean
public JobDetail jobDetail() {
return JobBuilder.newJob().ofType(SampleJob.class)
.storeDurably()
.withIdentity("Qrtz_Job_Detail")
.withDescription("Invoke Sample Job service...")
.build();
}

Spring JobDetailFactoryBean

Spring中配置JobDetail的方式:

@Bean
public JobDetailFactoryBean jobDetail() {
JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
jobDetailFactory.setJobClass(SampleJob.class);
jobDetailFactory.setDescription("Invoke Sample Job service...");
jobDetailFactory.setDurability(true);
return jobDetailFactory;
}

Trigger

触发器,代表一个调度参数的配置,什么时候去调度:

@Bean
public Trigger trigger(JobDetail job) {
return TriggerBuilder.newTrigger().forJob(job)
.withIdentity("Qrtz_Trigger")
.withDescription("Sample trigger")
.withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1))
.build();
}

Scheduler

调度器,通过JobTrigger来注册一个调度器:

@Bean
public Scheduler scheduler(Trigger trigger, JobDetail job) {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(new ClassPathResource("quartz.properties").getInputStream()); Scheduler scheduler = factory.getScheduler();
scheduler.setJobFactory(springBeanJobFactory());
scheduler.scheduleJob(job, trigger); scheduler.start();
return scheduler;
}

给系统添加一个Job

quartzJob就是我们需要去执行的任务,由Scheduler调度器负责调度任务们依靠制定好的Trigger来定时执行任务。

因此首先我们需要结合以上基础给系统添加一个Job。

addJob

    public void addJob(BaseJob job) throws SchedulerException {
/** 创建JobDetail实例,绑定Job实现类
* JobDetail 表示一个具体的可执行的调度程序,job是这个可执行调度程序所要执行的内容
* 另外JobDetail还包含了这个任务调度的方案和策略**/
// 指明job的名称,所在组的名称,以及绑定job类
JobDetail jobDetail = JobBuilder.newJob(job.getBeanClass())
.withIdentity(job.getJobKey())
.withDescription(job.getDescription())
.usingJobData(job.getDataMap())
.build(); /**
* Trigger代表一个调度参数的配置,什么时候去调度
*/
//定义调度触发规则, 使用cronTrigger规则
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(job.getJobName(),job.getJobGroup())
.withSchedule(CronScheduleBuilder.cronSchedule(job.getCronExpression()))
.startNow()
.build();
//将任务和触发器注册到任务调度中去
scheduler.scheduleJob(jobDetail,trigger);
//判断调度器是否启动
if(!scheduler.isStarted()){
scheduler.start();
}
log.info(String.format("定时任务:%s.%s-已添加到调度器!", job.getJobGroup(),job.getJobName()));
}

首先需要定义好我们的Job,之后通过Job初始化JobDetailTrigger,最后将JobDetailTrigger注册到调度器中。

BaseJob

Job的结构如下:

public abstract class BaseJob implements Job,Serializable {
private static final long serialVersionUID = 1L;
private static final String JOB_MAP_KEY = "self";
/**
* 任务名称
*/
private String jobName;
/**
* 任务分组
*/
private String jobGroup;
/**
* 任务状态 是否启动任务
*/
private String jobStatus;
/**
* cron表达式
*/
private String cronExpression;
/**
* 描述
*/
private String description;
/**
* 任务执行时调用哪个类的方法 包名+类名
*/
private Class beanClass = this.getClass();
/**
* 任务是否有状态
*/
private String isConcurrent; /**
* Spring bean
*/
private String springBean; /**
* 任务调用的方法名
*/
private String methodName; /**
* 该任务所使用的数据源
*/
private String dataSource = DataSourceEnum.DB1.getName(); /**
* 为了将执行后的任务持久化到数据库中
*/
@JsonIgnore
private JobDataMap dataMap = new JobDataMap(); public JobKey getJobKey(){
return JobKey.jobKey(jobName, jobGroup);// 任务名称和组构成任务key
}
...
}

可以看到Job中定义了任务的一些基本信息,重点关注其中的dataSourcedataMap属性。其中dataSource是任务所使用的数据源,并给了一个默认值;由于任务在添加后会持久化到数据库中,之后解析任务就会用到dataMap

SchedulerConfig

在添加Job的时候,JobDetailTrigger都是通过关键字new生成的,而调度器Scheduler则需要放在容器中维护。

@Configuration
@Order
public class SchedulerConfig {
@Autowired
private MyJobFactory myJobFactory; @Value("${spring.profiles.active}")
private String profile; /*
* 通过SchedulerFactoryBean获取Scheduler的实例
*/
@Bean(name = "scheduler")
public Scheduler scheduler() throws Exception {
return schedulerFactoryBean().getScheduler();
} @Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setOverwriteExistingJobs(true); // 延时启动
factory.setStartupDelay(20); // 加载quartz数据源配置
factory.setQuartzProperties(quartzProperties()); // 自定义Job Factory,用于Spring注入
factory.setJobFactory(myJobFactory);
/*********全局监听器配置************/
JobListener myJobListener = new SchedulerListener();
factory.setGlobalJobListeners(myJobListener);//直接添加为全局监听器
return factory;
} @Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
if (Util.PRODUCT.equals(profile)) {//正式环境
System.out.println("正式环境quartz配置");
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz-prod.properties"));
} else {
System.out.println("测试环境quartz配置");
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
}
//在quartz.properties中的属性被读取并注入后再初始化对象
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
} /*
* quartz初始化监听器
*/
@Bean
public QuartzInitializerListener executorListener() {
return new QuartzInitializerListener();
}
}

上述代码中,将scheduler加入到Spring容器中。scheduler是由SchedulerFactoryBean进行维护的,在SchedulerFactoryBean中对调度器工厂做了一些基本设置并从配置文件中加载了quartz数据源配置(配置文件的读取会根据运行环境profile来进行自动切换),配置了一个全局监听器用以监听任务的执行过程。

MyJobFactory

使用Spring提供的JobFactory

@Component
public class MyJobFactory extends AdaptableJobFactory { @Autowired
private AutowireCapableBeanFactory capableBeanFactory; @Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
// 调用父类的方法
Object jobInstance = super.createJobInstance(bundle);
// 进行注入
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}

quartz.properties

quartz.properties中是quartz连接数据库的一些配置信息。

# \u56FA\u5B9A\u524D\u7F00org.quartz
# \u4E3B\u8981\u5206\u4E3Ascheduler\u3001threadPool\u3001jobStore\u3001plugin\u7B49\u90E8\u5206
#
#
org.quartz.scheduler.instanceName = DefaultQuartzScheduler
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false # \u5B9E\u4F8B\u5316ThreadPool\u65F6\uFF0C\u4F7F\u7528\u7684\u7EBF\u7A0B\u7C7B\u4E3ASimpleThreadPool
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool # threadCount\u548CthreadPriority\u5C06\u4EE5setter\u7684\u5F62\u5F0F\u6CE8\u5165ThreadPool\u5B9E\u4F8B
# \u5E76\u53D1\u4E2A\u6570
org.quartz.threadPool.threadCount = 5
# \u4F18\u5148\u7EA7
org.quartz.threadPool.threadPriority = 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true org.quartz.jobStore.misfireThreshold = 5000 # \u9ED8\u8BA4\u5B58\u50A8\u5728\u5185\u5B58\u4E2D
#org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore #\u6301\u4E45\u5316
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX #org.quartz.jobStore.useProperties=false org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.dataSource = qzDS org.quartz.dataSource.qzDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/quartz?characterEncoding=UTF-8&useSSL=false&testOnBorrow=true&testWhileIdle=true
org.quartz.dataSource.qzDS.user=quartz
org.quartz.dataSource.qzDS.password=123456 org.quartz.dataSource.qzDS.maxConnections = 30 org.quartz.dataSource.qzDS.validationQuery = SELECT 1 FROM DUAL org.quartz.dataSource.qzDS.validateOnCheckout = true
org.quartz.dataSource.qzDS.idleConnectionValidationSeconds = 40 #org.quartz.dataSource.qzDS.discardIdleConnectionsSeconds = 60

quartz会根据这个配置文件将Job持久化到数据库中,也因此quartz会需要初始化一些数据库表,表结构文件在文末。

SchedulerListener

调度器监听器用以监听任务的执行状态。

public class SchedulerListener implements JobListener {

    private final Logger LOG = LoggerFactory.getLogger(SchedulerListener.class);

    public static final String LISTENER_NAME = "QuartSchedulerListener";

    @Override
public String getName() {
return LISTENER_NAME; //must return a name
} //任务被调度前
@Override
public void jobToBeExecuted(JobExecutionContext context) {
String dataSource = context.getJobDetail().getJobDataMap().getString("dataSource");
// 切换任务的数据源
DataSourceContextHolder.setDB(dataSource);
String jobName = context.getJobDetail().getKey().toString();
LOG.info("Job {} is going to start,switch dataSource to {},Thread name {}", jobName, dataSource, Thread.currentThread().getName());
} //任务调度被拒了
@Override
public void jobExecutionVetoed(JobExecutionContext context) {
String jobName = context.getJobDetail().getKey().toString();
LOG.error("job {} is jobExecutionVetoed", jobName);
//可以做一些日志记录原因 } //任务被调度后
@Override
public void jobWasExecuted(JobExecutionContext context,
JobExecutionException jobException) {
// 清空存储的数据源
String jobName = context.getJobDetail().getKey().toString();
DataSourceContextHolder.clearDB();
LOG.info("Job : {} is finished", jobName);
if (jobException != null && !jobException.getMessage().equals("")) {
LOG.error("Exception thrown by: " + jobName
+ " Exception: " + jobException.getMessage());
} }
}

SchedulerListener监听任务被调度前、调度后和调度被拒绝时的状态,在任务被调度之前和之后对任务所使用的数据源进行了处理。如果项目中不需要数据源切换的话,这个监听器是不需要的,到此已经完成了quartz的集成。

多数据源切换

多数据源切换

通过自定义DynamicDataSource来覆盖Spring Boot中原有的数据源。

DataSourceConfig

通过读取配置文件中不同的数据源,初始化项目中可能用到的数据源用以切换。

/**
* 多数据源配置类
*/
@Configuration
public class DataSourceConfig {
//数据源1
@Bean(name = "datasource1")
@ConfigurationProperties(prefix = "spring.datasource.db1") // application.properteis中对应属性的前缀
public DataSource dataSource1() {
return DataSourceBuilder.create().build();
} //数据源2
@Bean(name = "datasource2")
@ConfigurationProperties(prefix = "spring.datasource.db2") // application.properteis中对应属性的前缀
public DataSource dataSource2() {
return DataSourceBuilder.create().build();
} /**
* 动态数据源: 通过AOP在不同数据源之间动态切换
*
* @return
*/
@Primary
@Bean(name = "dynamicDataSource")
public DataSource dynamicDataSource() {
DynamicDataSource dynamicDataSource = new DynamicDataSource();
// 默认数据源
dynamicDataSource.setDefaultTargetDataSource(dataSource1());
// 配置多数据源
Map<Object, Object> dsMap = new HashMap();
dsMap.put(DataSourceEnum.DB1.getName(), dataSource1());
dsMap.put(DataSourceEnum.DB2.getName(), dataSource2()); dynamicDataSource.setTargetDataSources(dsMap);
return dynamicDataSource;
} @Bean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
//设置数据源
sqlSessionFactoryBean.setDataSource(dataSource);
return sqlSessionFactoryBean.getObject();
} /**
* 配置@Transactional注解事物
*
* @return
*/
@Bean
public PlatformTransactionManager transactionManager() {
return new DataSourceTransactionManager(dynamicDataSource());
}
}

数据源配置

spring:
datasource:
db1:
driver-class-name: com.mysql.cj.jdbc.Driver
username: doctor
password: 123456
type: com.zaxxer.hikari.HikariDataSource
jdbc-url: jdbc:mysql://127.0.0.1:3306/doctor?useSSL=false&testOnBorrow=true&testWhileIdle=true
db2:
driver-class-name: com.mysql.cj.jdbc.Driver
username: quartz
password: 123456
type: com.zaxxer.hikari.HikariDataSource
jdbc-url: jdbc:mysql://127.0.0.1:3307/quartz?useSSL=false&testOnBorrow=true&testWhileIdle=true

DataSourceContextHolder

由于quartz在执行过程中是通过不同的线程来执行Job的,因此此处通过ThreadLocal来保存线程所使用的数据源情况。

/**
* 保存本地数据源
*/
public class DataSourceContextHolder {
private static final Logger LOG = LoggerFactory.getLogger(DataSourceContextHolder.class);
/**
* 默认数据源
*/
public static final String DEFAULT_DS = DataSourceEnum.DB1.getName();
/**
* ThreadLocal之后会进行讲解
*/
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>(); // 设置数据源名
public static void setDB(String dbType) {
LOG.info("切换到{}数据源", dbType);
contextHolder.set(dbType);
} // 获取数据源名
public static String getDB() {
return (contextHolder.get());
} // 清除数据源名
public static void clearDB() {
contextHolder.remove();
}
}

DynamicDataSource

获取执行中所使用的数据源。由于数据源被保存在了DataSourceContextHolder中的ThreadLocal中,所以直接获取就行了。

/**
* 获取本地数据源
*/
public class DynamicDataSource extends AbstractRoutingDataSource {
private static final Logger LOG = LoggerFactory.getLogger(DynamicDataSource.class); @Override
protected Object determineCurrentLookupKey() {
LOG.info("数据源为{}", DataSourceContextHolder.getDB());
return DataSourceContextHolder.getDB();
}
}

至此就完成了集成quartz及数据源切换的功能。然后就是具体的任务了。

执行任务

具体的任务需要继承BaseJob并在execute方法中重写具体需要执行的任务。

execute

@Slf4j
@Service
public class ReadNumJob extends BaseJob { @Autowired
private RedisService redisService; @Autowired
private JdbcTemplate jdbcTemplate; private final Logger LOG = LoggerFactory.getLogger(ReadNumJob.class); @Override
public void execute(JobExecutionContext context) {
doSomething();
}
}

指定数据源

然后在添加任务时指定任务所使用的数据源

ReadNumJob job = new ReadNumJob();
job.setJobName("test");
job.setJobGroup("hys");
job.setDescription("test");
// 指定数据源
job.getDataMap().put("dataSource", DataSourceEnum.DB1.getName());
job.setCronExpression(
"0 */1 * * * ?"
);
try {
jobAndTriggerService.addJob(job);
} catch (SchedulerException e) {
e.printStackTrace();
}

源码

转评赞就是最大的鼓励

Spring Boot集成quartz实现定时任务并支持切换任务数据源的更多相关文章

  1. Spring Boot整合Quartz实现定时任务表配置

    最近有个小项目要做,spring mvc下的task设置一直不太灵活,因此在Spring Boot上想做到灵活的管理定时任务.需求就是,当项目启动的时候,如果有定时任务则加载进来,生成schedule ...

  2. spring boot 集成 quartz 定时任务

    spring boot: @EnableScheduling开启计划任务支持,@Scheduled计划任务声明 1.pom.xml 引入依赖 <dependency> <groupI ...

  3. ssh中使用spring的集成quartz 编写定时任务

    之前没有使用框架开发时对于开发定时任务都是 使用java的原声timer类,重写线程的run方法跑要执行的任务.刚刚换的新公司,项目使用ssh2,目前该项目中的定时任务的使用spirng集成的quar ...

  4. Spring Boot集成Quartz注入Spring管理的类

    摘要: 在Spring Boot中使用Quartz时,在JOB中一般需要引用Spring管理的Bean,通过定义Job Factory实现自动注入. Spring有自己的Schedule定时任务,在S ...

  5. Spring boot 集成三种定时任务方式

    三种定时任务方式分别为 org.springframework.scheduling.annotation.Scheduled java.util.concurrent.ScheduledExecut ...

  6. Quartz与Spring Boot集成使用

    上次自己搭建Quartz已经是几年前的事了,这次项目中需要定时任务,需要支持集群部署,想到比较轻量级的定时任务框架就是Quartz,于是来一波. 版本说明 通过搜索引擎很容易找到其官网,来到Docum ...

  7. Spring Boot集成持久化Quartz定时任务管理和界面展示

    本文是对之前的一篇文章Spring+SpringMVC+mybatis+Quartz整合代码部分做的一个修改和补充, 其中最大的变化就是后台框架变成了Spring Boot. 本工程所用到的技术或工具 ...

  8. spring boot集成swagger,自定义注解,拦截器,xss过滤,异步调用,guava限流,定时任务案例&comma; 发邮件

    本文介绍spring boot集成swagger,自定义注解,拦截器,xss过滤,异步调用,定时任务案例 集成swagger--对于做前后端分离的项目,后端只需要提供接口访问,swagger提供了接口 ...

  9. spring boot整合quartz实现多个定时任务

        版权声明:本文为博主原创文章,转载请注明出处. https://blog.csdn.net/liuchuanhong1/article/details/78543574 最近收到了很多封邮件, ...

随机推荐

  1. 第一个移动前端开源项目-dailog

    你还在为手机上没有忙碌光标而发愁吗?你还在抱怨弹出框组件要依赖zepto/jqery吗?你还在纠结是否要自己写一套还是去网上寻找成现成的UI组件吗?YouA为你轻松解决所有烦恼.YouA是我为移动前端 ...

  2. ASP&period;net的指令

    3.2 ASP.NET指令 在基于面向对象思想的.NET平台,可以称之为“万物皆对象”了.在这里,一个页面,一个用户控件,一个母版页等,全都是对象,全都有各自的属性. 在类文件里,我们表示类的属性可以 ...

  3. POJ 2315 最小费用最大流

    从1走到N然后从N走回来的最短路程是多少? 转换为费用流来建模. 1: /** 2: 因为e ==0 所以 pe[v] pe[v]^1 是两条相对应的边 3: E[pe[v]].c -= aug; E ...

  4. Ubuntu 14&period;04 设置静态IP

    使用Network Manager UI界面中指定 手动时,无法保存. 通过修改配置文件解决来此问题.记录以下. 如果输入过密码后,就会出现在这个目录下面, 以如下chinaNet为例 gaojing ...

  5. nginx报错 &lbrack;error&rsqb; open&lpar;&rpar; &OpenCurlyDoubleQuote;&sol;usr&sol;local&sol;var&sol;run&sol;openresty&period;pid” failed &lpar;2&colon; No such file or directory&rpar;

    解决: 服务没有启动 使用start启动服务,因为没有start而直接使用stop或者reload报错这个问题: 如果方法一没有解决,使用方法二:-C 指定配置文件nginx.conf或者weblua ...

  6. CF Round &num;510 &lpar;Div&period; 2&rpar;

    前言:没想到那么快就打了第二场,题目难度比CF Round #509 (Div. 2)这场要难些,不过我依旧菜,这场更是被\(D\)题卡了,最后\(C\)题都来不及敲了..最后才\(A\)了\(3\) ...

  7. Codeforces822 C&period; Hacker&comma; pack your bags&excl;

    C. Hacker, pack your bags! time limit per test 2 seconds memory limit per test 256 megabytes input s ...

  8. rpcserver不可用

    今天用打印机.电脑一直弹出rpcserver不可用.如图: 解决的方法:将例如以下服务启动就可以解决,如图:

  9. LOJ&num;2170&period; 「POI2011」木棍 Sticks

    题目链接 题意就是给你一堆线段,然后线段有长度和颜色,让你选三条组成一个三角形,这三条线段颜色不能一样 题解: 做法:贪心 首先按照长度给这些线段排序一遍 然后贪心的去选,对于已经选出来同种颜色的,就 ...

  10. js中定时器2

    <!DOCTYPE html><html lang="en"><head> <meta charset="UTF-8" ...