Spring整合Quartz定时任务并在集群、分布式系统中的应用

时间:2021-11-09 18:00:17

概述

虽然单个quartz实例能给予你很好的job调度能力,但它不能满足典型的企业需求,如可伸缩性、高可靠性满足。假如你需要故障转移的能力并能运行日益增多的 job,quartz集群势必成为你应用的一部分了。使用 quartz 的集群能力可以更好的支持你的业务需求,并且即使是其中一台机器在最糟的时间崩溃了也能确保所有的 job 得到执行。

quartz 中集群如何工作

一个 quartz 集群中的每个节点是一个独立的 quartz 应用,它又管理着其他的节点。意思是你必须对每个节点分别启动或停止。不像许多应用服务器的集群,独立的 quartz 节点并不与另一其的节点或是管理节点通信。quartz 应用是通过数据库表来感知到另一应用的。

图:表示了每个节点直接与数据库通信,若离开数据库将对其他节点一无所知

Spring整合Quartz定时任务并在集群、分布式系统中的应用

创建quartz数据库表

因为quartz 集群依赖于数据库,所以必须首先创建quartz数据库表。quartz 包括了所有被支持的数据库平台的 sql 脚本。在 <quartz_home>/docs/dbtables 目录下找到那些 sql 脚本,这里的 <quartz_home> 是解压 quartz 分发包后的目录。
这里采用的quartz 2.2.3版本,总共11张表,不同版本,表个数可能不同。数据库为mysql,用tables_mysql_innodb.sql创建数据库表。

Spring整合Quartz定时任务并在集群、分布式系统中的应用

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#
# in your quartz properties file, you'll need to set
# org.quartz.jobstore.driverdelegateclass = org.quartz.impl.jdbcjobstore.stdjdbcdelegate
#
#
# by: ron cordell - roncordell
# i didn't see this anywhere, so i thought i'd post it here. this is the script from quartz to create the tables in a mysql database, modified to use innodb instead of myisam.
 
drop table if exists qrtz_fired_triggers;
drop table if exists qrtz_paused_trigger_grps;
drop table if exists qrtz_scheduler_state;
drop table if exists qrtz_locks;
drop table if exists qrtz_simple_triggers;
drop table if exists qrtz_simprop_triggers;
drop table if exists qrtz_cron_triggers;
drop table if exists qrtz_blob_triggers;
drop table if exists qrtz_triggers;
drop table if exists qrtz_job_details;
drop table if exists qrtz_calendars;
 
create table qrtz_job_details(
sched_name varchar(120) not null,
job_name varchar(200) not null,
job_group varchar(200) not null,
description varchar(250) null,
job_class_name varchar(250) not null,
is_durable varchar(1) not null,
is_nonconcurrent varchar(1) not null,
is_update_data varchar(1) not null,
requests_recovery varchar(1) not null,
job_data blob null,
primary key (sched_name,job_name,job_group))
engine=innodb;
 
create table qrtz_triggers (
sched_name varchar(120) not null,
trigger_name varchar(200) not null,
trigger_group varchar(200) not null,
job_name varchar(200) not null,
job_group varchar(200) not null,
description varchar(250) null,
next_fire_time bigint(13) null,
prev_fire_time bigint(13) null,
priority integer null,
trigger_state varchar(16) not null,
trigger_type varchar(8) not null,
start_time bigint(13) not null,
end_time bigint(13) null,
calendar_name varchar(200) null,
misfire_instr smallint(2) null,
job_data blob null,
primary key (sched_name,trigger_name,trigger_group),
foreign key (sched_name,job_name,job_group)
references qrtz_job_details(sched_name,job_name,job_group))
engine=innodb;
 
create table qrtz_simple_triggers (
sched_name varchar(120) not null,
trigger_name varchar(200) not null,
trigger_group varchar(200) not null,
repeat_count bigint(7) not null,
repeat_interval bigint(12) not null,
times_triggered bigint(10) not null,
primary key (sched_name,trigger_name,trigger_group),
foreign key (sched_name,trigger_name,trigger_group)
references qrtz_triggers(sched_name,trigger_name,trigger_group))
engine=innodb;
 
create table qrtz_cron_triggers (
sched_name varchar(120) not null,
trigger_name varchar(200) not null,
trigger_group varchar(200) not null,
cron_expression varchar(120) not null,
time_zone_id varchar(80),
primary key (sched_name,trigger_name,trigger_group),
foreign key (sched_name,trigger_name,trigger_group)
references qrtz_triggers(sched_name,trigger_name,trigger_group))
engine=innodb;
 
create table qrtz_simprop_triggers
 (    
  sched_name varchar(120) not null,
  trigger_name varchar(200) not null,
  trigger_group varchar(200) not null,
  str_prop_1 varchar(512) null,
  str_prop_2 varchar(512) null,
  str_prop_3 varchar(512) null,
  int_prop_1 int null,
  int_prop_2 int null,
  long_prop_1 bigint null,
  long_prop_2 bigint null,
  dec_prop_1 numeric(13,4) null,
  dec_prop_2 numeric(13,4) null,
  bool_prop_1 varchar(1) null,
  bool_prop_2 varchar(1) null,
  primary key (sched_name,trigger_name,trigger_group),
  foreign key (sched_name,trigger_name,trigger_group)
  references qrtz_triggers(sched_name,trigger_name,trigger_group))
engine=innodb;
 
create table qrtz_blob_triggers (
sched_name varchar(120) not null,
trigger_name varchar(200) not null,
trigger_group varchar(200) not null,
blob_data blob null,
primary key (sched_name,trigger_name,trigger_group),
index (sched_name,trigger_name, trigger_group),
foreign key (sched_name,trigger_name,trigger_group)
references qrtz_triggers(sched_name,trigger_name,trigger_group))
engine=innodb;
 
create table qrtz_calendars (
sched_name varchar(120) not null,
calendar_name varchar(200) not null,
calendar blob not null,
primary key (sched_name,calendar_name))
engine=innodb;
 
create table qrtz_paused_trigger_grps (
sched_name varchar(120) not null,
trigger_group varchar(200) not null,
primary key (sched_name,trigger_group))
engine=innodb;
 
create table qrtz_fired_triggers (
sched_name varchar(120) not null,
entry_id varchar(95) not null,
trigger_name varchar(200) not null,
trigger_group varchar(200) not null,
instance_name varchar(200) not null,
fired_time bigint(13) not null,
sched_time bigint(13) not null,
priority integer not null,
state varchar(16) not null,
job_name varchar(200) null,
job_group varchar(200) null,
is_nonconcurrent varchar(1) null,
requests_recovery varchar(1) null,
primary key (sched_name,entry_id))
engine=innodb;
 
create table qrtz_scheduler_state (
sched_name varchar(120) not null,
instance_name varchar(200) not null,
last_checkin_time bigint(13) not null,
checkin_interval bigint(13) not null,
primary key (sched_name,instance_name))
engine=innodb;
 
create table qrtz_locks (
sched_name varchar(120) not null,
lock_name varchar(40) not null,
primary key (sched_name,lock_name))
engine=innodb;
 
create index idx_qrtz_j_req_recovery on qrtz_job_details(sched_name,requests_recovery);
create index idx_qrtz_j_grp on qrtz_job_details(sched_name,job_group);
 
create index idx_qrtz_t_j on qrtz_triggers(sched_name,job_name,job_group);
create index idx_qrtz_t_jg on qrtz_triggers(sched_name,job_group);
create index idx_qrtz_t_c on qrtz_triggers(sched_name,calendar_name);
create index idx_qrtz_t_g on qrtz_triggers(sched_name,trigger_group);
create index idx_qrtz_t_state on qrtz_triggers(sched_name,trigger_state);
create index idx_qrtz_t_n_state on qrtz_triggers(sched_name,trigger_name,trigger_group,trigger_state);
create index idx_qrtz_t_n_g_state on qrtz_triggers(sched_name,trigger_group,trigger_state);
create index idx_qrtz_t_next_fire_time on qrtz_triggers(sched_name,next_fire_time);
create index idx_qrtz_t_nft_st on qrtz_triggers(sched_name,trigger_state,next_fire_time);
create index idx_qrtz_t_nft_misfire on qrtz_triggers(sched_name,misfire_instr,next_fire_time);
create index idx_qrtz_t_nft_st_misfire on qrtz_triggers(sched_name,misfire_instr,next_fire_time,trigger_state);
create index idx_qrtz_t_nft_st_misfire_grp on qrtz_triggers(sched_name,misfire_instr,next_fire_time,trigger_group,trigger_state);
 
create index idx_qrtz_ft_trig_inst_name on qrtz_fired_triggers(sched_name,instance_name);
create index idx_qrtz_ft_inst_job_req_rcvry on qrtz_fired_triggers(sched_name,instance_name,requests_recovery);
create index idx_qrtz_ft_j_g on qrtz_fired_triggers(sched_name,job_name,job_group);
create index idx_qrtz_ft_jg on qrtz_fired_triggers(sched_name,job_group);
create index idx_qrtz_ft_t_g on qrtz_fired_triggers(sched_name,trigger_name,trigger_group);
create index idx_qrtz_ft_tg on qrtz_fired_triggers(sched_name,trigger_group);
 
commit;

配置数据库连接池

1.配置jdbc.properties文件

?
1
2
3
4
jdbc.url=jdbc\:mysql\://192.168.1.132\:3306/jiafuwei?useunicode\=true&characterencoding\=utf8&autoreconnect\=true
jdbc.username=root
jdbc.password=123456
jdbc.driverclassname=com.mysql.jdbc.driver

2.配置applicationcontext.xml文件

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:jdbc="http://www.springframework.org/schema/jdbc"
  xmlns:context="http://www.springframework.org/schema/context"
  xsi:schemalocation=" 
  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
  http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
  ">
 
  <context:component-scan base-package="com.sundoctor" />
 
  <!-- 属性文件读入 -->
  <bean id="propertyconfigurer"
    class="org.springframework.beans.factory.config.propertyplaceholderconfigurer">
    <property name="locations">
      <list>
        <value>classpath:jdbc.properties</value>
      </list>
    </property>
  </bean>
 
 
  <!-- 数据源定义,使用c3p0 连接池 -->
  <bean id="datasource" class="com.mchange.v2.c3p0.combopooleddatasource"
    destroy-method="close">
    <property name="driverclass" value="${jdbc.driverclassname}" />
    <property name="jdbcurl" value="${jdbc.url}" />
    <property name="user" value="${jdbc.username}" />
    <property name="password" value="${jdbc.password}" />
    <property name="initialpoolsize" value="2" />
    <property name="minpoolsize" value="10" />
    <property name="maxpoolsize" value="20" />
    <property name="acquireincrement" value="2" />
    <property name="maxidletime" value="1800" />
  </bean>
</beans>

创建job测试服务类

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.sundoctor.quartz.cluster.example;
 
import java.io.serializable;
 
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.stereotype.service;
 
 
@service("simpleservice")
public class simpleservice {
  
  private static final long serialversionuid = 122323233244334343l;
  private static final logger logger = loggerfactory.getlogger(simpleservice.class);
  
  public void testmethod1(){
    //这里执行定时调度业务
    logger.info("testmethod1.......1");
    system.out.println("2--testmethod1......."+system.currenttimemillis()/1000);
  }
  
  public void testmethod2(){
    logger.info("testmethod2.......2"); 
  }
}

创建一个job类myquartzjobbean1

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.sundoctor.quartz.cluster.example;
 
import org.quartz.disallowconcurrentexecution;
import org.quartz.jobexecutioncontext;
import org.quartz.jobexecutionexception;
import org.quartz.persistjobdataafterexecution;
import org.quartz.schedulerexception;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.context.applicationcontext;
import org.springframework.scheduling.quartz.quartzjobbean;
 
@persistjobdataafterexecution
@disallowconcurrentexecution// 不允许并发执行
public class myquartzjobbean1 extends quartzjobbean {
 
  private static final logger logger = loggerfactory.getlogger(myquartzjobbean1.class);
 
  @override
  protected void executeinternal(jobexecutioncontext jobexecutioncontext) throws jobexecutionexception {
 
    simpleservice simpleservice = getapplicationcontext(jobexecutioncontext).getbean("simpleservice",
        simpleservice.class);
    simpleservice.testmethod1();
 
  }
 
  private applicationcontext getapplicationcontext(final jobexecutioncontext jobexecutioncontext) {
    try {
      return (applicationcontext) jobexecutioncontext.getscheduler().getcontext().get("applicationcontextkey");
    } catch (schedulerexception e) {
      logger.error("jobexecutioncontext.getscheduler().getcontext() error!", e);
      throw new runtimeexception(e);
    }
  }
 
}

配置 quartz 使用集群

1.配置节点的 quartz.properties 文件

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
org.quartz.scheduler.instancename = testscheduler1 
org.quartz.scheduler.instanceid = auto
 
org.quartz.threadpool.class = org.quartz.simpl.simplethreadpool
org.quartz.threadpool.threadcount = 10
org.quartz.threadpool.threadpriority = 5
org.quartz.threadpool.threadsinheritcontextclassloaderofinitializingthread = true
 
org.quartz.jobstore.misfirethreshold = 60000
org.quartz.jobstore.class = org.quartz.impl.jdbcjobstore.jobstoretx
org.quartz.jobstore.driverdelegateclass=org.quartz.impl.jdbcjobstore.stdjdbcdelegate
org.quartz.jobstore.tableprefix = qrtz_
org.quartz.jobstore.maxmisfirestohandleatatime=10
org.quartz.jobstore.isclustered = true
org.quartz.jobstore.clustercheckininterval = 20000

org.quartz.scheduler.instancename属性可为任何值,用在 jdbc jobstore 中来唯一标识实例,但是所有集群节点中必须相同。

org.quartz.scheduler.instanceid 属性为 auto即可,基于主机名和时间戳来产生实例 id。

org.quartz.jobstore.class属性为 jobstoretx,将任务持久化到数据中。因为集群中节点依赖于数据库来传播 scheduler 实例的状态,你只能在使用 jdbc jobstore 时应用 quartz 集群。这意味着你必须使用 jobstoretx 或是 jobstorecmt 作为 job 存储;你不能在集群中使用 ramjobstore。

org.quartz.jobstore.isclustered 属性为 true,你就告诉了 scheduler 实例要它参与到一个集群当中。这一属性会贯穿于调度框架的始终,用于修改集群环境中操作的默认行为。

org.quartz.jobstore.clustercheckininterval 属性定义了scheduler 实例检入到数据库中的频率(单位:毫秒)。scheduler 检查是否其他的实例到了它们应当检入的时候未检入;这能指出一个失败的 scheduler 实例,且当前 scheduler 会以此来接管任何执行失败并可恢复的 job。通过检入操作,scheduler 也会更新自身的状态记录。clusterchedkininterval 越小,scheduler 节点检查失败的 scheduler 实例就越频繁。默认值是 15000 (即15 秒)。

2.配置applicationcontext-quartz.xml文件

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
  xsi:schemalocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
 
  <bean name="quartzscheduler"
    class="org.springframework.scheduling.quartz.schedulerfactorybean">
    <property name="datasource">
      <ref bean="datasource" />
    </property>
    <property name="applicationcontextschedulercontextkey" value="applicationcontextkey" />
    <property name="configlocation" value="classpath:quartz.properties" />   
    <property name="triggers">
      <list>
        <ref bean="trigger1" />
      </list>
    </property>
  </bean>
 
  <bean id="jobdetail1" class="org.springframework.scheduling.quartz.jobdetailfactorybean">
    <property name="jobclass">
      <value>com.sundoctor.quartz.cluster.example.myquartzjobbean1</value>
    </property> 
    <property name="durability" value="true" /> 
    <property name="requestsrecovery" value="true" />   
  </bean>
  <bean id="trigger1" class="org.springframework.scheduling.quartz.crontriggerfactorybean">
    <property name="jobdetail" ref="jobdetail1" />
    <property name="cronexpression" value="0/10 * * * * ?" />
  </bean>
</beans>

datasource:项目中用到的数据源,里面包含了quartz用到的11张数据库表;

applicationcontextschedulercontextkey: 是org.springframework.scheduling.quartz.schedulerfactorybean这个类中把spring上下 文以key/value的方式存放在了schedulercontext中了,可以用applicationcontextschedulercontextkey所 定义的key得到对应spring 的applicationcontext;

configlocation:用于指明quartz的配置文件的位置

requestsrecovery

requestsrecovery属性必须设置为 true,当quartz服务被中止后,再次启动或集群中其他机器接手任务时会尝试恢复执行之前未完成的所有任务。

运行quartz集群

在相同或不同的机器上运行com.sundoctor.quartz.cluster.example.test.maintest进行测试,在本例中只是简单打印一下日志。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.sundoctor.quartz.cluster.example.test;
 
import org.springframework.context.applicationcontext;
import org.springframework.context.support.classpathxmlapplicationcontext;
 
public class maintest {
 
  /**
   * @param args
   */
  public static void main(string[] args) {
    applicationcontext springcontext = new classpathxmlapplicationcontext(new string[]{"classpath:applicationcontext.xml","classpath:applicationcontext-quartz.xml"});
  }
 
}

quartz 实际并不关心你是在相同的还是不同的机器上运行节点。当集群是放置在不同的机器上时,通常称之为水平集群。节点是跑在同一台机器是,称之为垂直集群。对于垂直集群,存在着单点故障的问题。这对高可用性的应用来说是个坏消息,因为一旦机器崩溃了,所有的节点也就被有效的终止了。

当你运行水平集群时,时钟应当要同步,以免出现离奇且不可预知的行为。假如时钟没能够同步,scheduler 实例将对其他节点的状态产生混乱。有几种简单的方法来保证时钟何持同步,而且也没有理由不这么做。最简单的同步计算机时钟的方式是使用某一个 internet 时间服务器(internet time server its)。

没什么会阻止你在相同环境中使用集群的和非集群的 quartz 应用。唯一要注意的是这两个环境不要混用在相同的数据库表。意思是非集群环境不要使用与集群应用相同的一套数据库表;否则将得到希奇古怪的结果,集群和非集群的 job 都会遇到问题。

假如你让一个非集群的 quartz 应用与集群节点并行着运行,设法使用 jobinitializationplugin和 ramjobstore。

项目下载地址,点击下载。里面包含所需的jar。

在eclipse的两个工作空间同时开启这两个项目,连接同一个mysql数据库,发现只有一个定时任务在运行,停掉其中一个项目,另外一个项目的定时任务开启了。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://www.cnblogs.com/jiafuwei/p/6145280.html