Quartz源码阅读

时间:2021-07-22 09:23:19

基于Quartz1.8.5的源码解读

首先看一个demo

//简单的任务管理类
//QuartzManager.java package quartzPackage; import java.text.ParseException;
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory; /** *//**
* @Title:Quartz管理类
*
* @Description:
*
* @Copyright:
* @author zz 2008-10-8 14:19:01
* @version 1.00.000
*
*/
public class QuartzManager {
private static SchedulerFactory sf = new StdSchedulerFactory();
private static String JOB_GROUP_NAME = "group1";
private static String TRIGGER_GROUP_NAME = "trigger1"; /** *//**
* 添加一个定时任务,使用默认的任务组名,触发器名,触发器组名
* @param jobName 任务名
* @param job 任务
* @param time 时间设置,参考quartz说明文档
* @throws SchedulerException
* @throws ParseException
*/
public static void addJob(String jobName,Job job,String time)
throws SchedulerException, ParseException{
Scheduler sched = sf.getScheduler();
JobDetail jobDetail = new JobDetail(jobName, JOB_GROUP_NAME, job.getClass());//任务名,任务组,任务执行类
//触发器
CronTrigger trigger =
new CronTrigger(jobName, TRIGGER_GROUP_NAME);//触发器名,触发器组
trigger.setCronExpression(time);//触发器时间设定
sched.scheduleJob(jobDetail,trigger);
//启动
if(!sched.isShutdown())
sched.start();
} /** *//**
* 添加一个定时任务
* @param jobName 任务名
* @param jobGroupName 任务组名
* @param triggerName 触发器名
* @param triggerGroupName 触发器组名
* @param job 任务
* @param time 时间设置,参考quartz说明文档
* @throws SchedulerException
* @throws ParseException
*/
public static void addJob(String jobName,String jobGroupName,
String triggerName,String triggerGroupName,
Job job,String time)
throws SchedulerException, ParseException{
Scheduler sched = sf.getScheduler();
JobDetail jobDetail = new JobDetail(jobName, jobGroupName, job.getClass());//任务名,任务组,任务执行类
//触发器
CronTrigger trigger =
new CronTrigger(triggerName, triggerGroupName);//触发器名,触发器组
trigger.setCronExpression(time);//触发器时间设定
sched.scheduleJob(jobDetail,trigger);
if(!sched.isShutdown())
sched.start();
} /** *//**
* 修改一个任务的触发时间(使用默认的任务组名,触发器名,触发器组名)
* @param jobName
* @param time
* @throws SchedulerException
* @throws ParseException
*/
public static void modifyJobTime(String jobName,String time)
throws SchedulerException, ParseException{
Scheduler sched = sf.getScheduler();
Trigger trigger = sched.getTrigger(jobName,TRIGGER_GROUP_NAME);
if(trigger != null){
CronTrigger ct = (CronTrigger)trigger;
ct.setCronExpression(time);
sched.resumeTrigger(jobName,TRIGGER_GROUP_NAME);
}
} /** *//**
* 修改一个任务的触发时间
* @param triggerName
* @param triggerGroupName
* @param time
* @throws SchedulerException
* @throws ParseException
*/
public static void modifyJobTime(String triggerName,String triggerGroupName,
String time)
throws SchedulerException, ParseException{
Scheduler sched = sf.getScheduler();
Trigger trigger = sched.getTrigger(triggerName,triggerGroupName);
if(trigger != null){
CronTrigger ct = (CronTrigger)trigger;
//修改时间
ct.setCronExpression(time);
//重启触发器
sched.resumeTrigger(triggerName,triggerGroupName);
}
} /** *//**
* 移除一个任务(使用默认的任务组名,触发器名,触发器组名)
* @param jobName
* @throws SchedulerException
*/
public static void removeJob(String jobName)
throws SchedulerException{
Scheduler sched = sf.getScheduler();
sched.pauseTrigger(jobName,TRIGGER_GROUP_NAME);//停止触发器
sched.unscheduleJob(jobName,TRIGGER_GROUP_NAME);//移除触发器
sched.deleteJob(jobName,JOB_GROUP_NAME);//删除任务
} /** *//**
* 移除一个任务
* @param jobName
* @param jobGroupName
* @param triggerName
* @param triggerGroupName
* @throws SchedulerException
*/
public static void removeJob(String jobName,String jobGroupName,
String triggerName,String triggerGroupName)
throws SchedulerException{
Scheduler sched = sf.getScheduler();
sched.pauseTrigger(triggerName,triggerGroupName);//停止触发器
sched.unscheduleJob(triggerName,triggerGroupName);//移除触发器
sched.deleteJob(jobName,jobGroupName);//删除任务
}
}
//测试main函数
//QuartzTest.java
package quartzPackage; import java.text.SimpleDateFormat;
import java.util.Date; public class QuartzTest { /** *//**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
SimpleDateFormat DateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
Date d = new Date();
String returnstr = DateFormat.format(d); TestJob job = new TestJob();
String job_name ="11";
try {
System.out.println(returnstr+ "【系统启动】");
QuartzManager.addJob(job_name,job,"0/2 * * * * ?"); //每2秒钟执行一次 // Thread.sleep(10000);
// System.out.println("【修改时间】");
// QuartzManager.modifyJobTime(job_name,"0/10 * * * * ?");
// Thread.sleep(20000);
// System.out.println("【移除定时】");
// QuartzManager.removeJob(job_name);
// Thread.sleep(10000);
//
// System.out.println("/n【添加定时任务】");
// QuartzManager.addJob(job_name,job,"0/5 * * * * ?"); } catch (Exception e) {
e.printStackTrace();
}
}
}

 

//测试工作类
//TestJob.java package quartzPackage; import java.text.SimpleDateFormat;
import java.util.Date; import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException; public class TestJob implements Job {
SimpleDateFormat DateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date d = new Date();
String returnstr = DateFormat.format(d); public void execute(JobExecutionContext arg0) throws JobExecutionException {
// TODO Auto-generated method stub
System.out.println(returnstr+"★★★★★★★★★★★");
} }

先说明一下几个重要的Quartz组件:

1.scheduler是一个计划调度器容器(总部),容器里面可以盛放众多的JobDetail和trigger,当容器启动后,里面的每个JobDetail都会根据trigger按部就班自动去执行。

①Scheduler对象的产生过程 Scheduler对象是通过SchedulerFactory对象的getScheduler方法产生的,org.quartz.SchedulerFactory工厂接口有两个具体实现,一个是org.quartz.impl.StdSchedulerFactory对象, 一个是org.quartz.impl.DirectSchedulerFactory对象,二者的区别是什么?暂时没有分析

private static SchedulerFactory sf = new StdSchedulerFactory();

我们栗子当中使用的是是org.quartz.impl.StdSchedulerFactory对象,我们来看一下StdSchedulerFactory对象是如何产生Scheduler 对象的。

/**
* <p>
* Returns a handle to the Scheduler produced by this factory.
* </p>
*
* <p>
* If one of the <code>initialize</code> methods has not be previously
* called, then the default (no-arg) <code>initialize()</code> method
* will be called by this method.
* </p>
*/
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
initialize();
} SchedulerRepository schedRep = SchedulerRepository.getInstance(); Scheduler sched = schedRep.lookup(getSchedulerName()); if (sched != null) {
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
return sched;
}
} sched = instantiate(); return sched;
}

  

跟进初始化调度器方法sched = instantiate();发现是一个700多行的初始化方法,涉及到

读取配置资源,
生成QuartzScheduler对象,
创建该对象的运行线程,并启动线程;
初始化JobStore,QuartzScheduler,DBConnectionManager等重要组件,
至此,调度器的初始化工作已完成,初始化工作中quratz读取了数据库中存放的对应当前调度器的锁信息,对应CRM中的表QRTZ2_LOCKS,中的STATE_ACCESS,TRIGGER_ACCESS两个LOCK_NAME.
public void initialize() throws SchedulerException {
// short-circuit if already initialized
if (cfg != null) {
return;
}
if (initException != null) {
throw initException;
} String requestedFile = System.getProperty(PROPERTIES_FILE);
String propFileName = requestedFile != null ? requestedFile
: "quartz.properties";
File propFile = new File(propFileName); Properties props = new Properties(); InputStream in = null; try {
if (propFile.exists()) {
try {
if (requestedFile != null) {
propSrc = "specified file: '" + requestedFile + "'";
} else {
propSrc = "default file in current working dir: 'quartz.properties'";
} in = new BufferedInputStream(new FileInputStream(propFileName));
props.load(in); } catch (IOException ioe) {
initException = new SchedulerException("Properties file: '"
+ propFileName + "' could not be read.", ioe);
throw initException;
}
} else if (requestedFile != null) {
in =
Thread.currentThread().getContextClassLoader().getResourceAsStream(requestedFile); if(in == null) {
initException = new SchedulerException("Properties file: '"
+ requestedFile + "' could not be found.");
throw initException;
} propSrc = "specified file: '" + requestedFile + "' in the class resource path."; in = new BufferedInputStream(in);
try {
props.load(in);
} catch (IOException ioe) {
initException = new SchedulerException("Properties file: '"
+ requestedFile + "' could not be read.", ioe);
throw initException;
} } else {
propSrc = "default resource file in Quartz package: 'quartz.properties'"; ClassLoader cl = getClass().getClassLoader();
if(cl == null)
cl = findClassloader();
if(cl == null)
throw new SchedulerConfigException("Unable to find a class loader on the current thread or class."); in = cl.getResourceAsStream(
"quartz.properties"); if (in == null) {
in = cl.getResourceAsStream(
"/quartz.properties");
}
if (in == null) {
in = cl.getResourceAsStream(
"org/quartz/quartz.properties");
}
if (in == null) {
initException = new SchedulerException(
"Default quartz.properties not found in class path");
throw initException;
}
try {
props.load(in);
} catch (IOException ioe) {
initException = new SchedulerException(
"Resource properties file: 'org/quartz/quartz.properties' "
+ "could not be read from the classpath.", ioe);
throw initException;
}
}
} finally {
if(in != null) {
try { in.close(); } catch(IOException ignore) { /* ignore */ }
}
} initialize(overrideWithSysProps(props));
}

这里牵涉到读取配置文件,由于程序没有配置文件,将读取quartz内置的默认文件来配置

Quartz源码阅读

这个默认配置文件内容如下

# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
# org.quartz.scheduler.instanceName = DefaultQuartzScheduler
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 10
org.quartz.threadPool.threadPriority = 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true org.quartz.jobStore.misfireThreshold = 60000 org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore

2.Trigger对象

Trigger代表一个调度参数的配置,什么时候去调,Job的Name 和group将唯一标识一个Trigger,该Trigger对象将会和JobDetail对象一起保存到相应的执行任务的缓存或者数据库中,显然我们的程序没有使用到数据库配置,我们的所有执行任务都被缓存到

org.quartz.simpl.RAMJobStore 这个类当中 ,该类实现接口org.quartz.spi.JobStore类,

事实上Trigger对象和Job对象一一对象,Job对象被封装到JobDetail对象当中,全部被保存到RAMJobStore对象当中,看到其中有这个缓存对象

 protected TreeSet timeTriggers = new TreeSet(new TriggerComparator());

  

class TriggerComparator implements Comparator {

    public int compare(Object obj1, Object obj2) {
TriggerWrapper trig1 = (TriggerWrapper) obj1;
TriggerWrapper trig2 = (TriggerWrapper) obj2; int comp = trig1.trigger.compareTo(trig2.trigger);
if (comp != 0) {
return comp;
} comp = trig2.trigger.getPriority() - trig1.trigger.getPriority();
if (comp != 0) {
return comp;
} return trig1.trigger.getFullName().compareTo(trig2.trigger.getFullName());
} public boolean equals(Object obj) {
return (obj instanceof TriggerComparator);
}
}

该比较顺序的方法将Trigger对象的执行时间->优先级信息->名字信息比较排序,每次要获取要被执行的Trigger对象的时候都拿到TreeMap对象的头部对象peek对象。

我们看下Job对象和Trigger对象是如何加入到RAMJobStore对象当中去的

Scheduler sched = sf.getScheduler();

sched.scheduleJob(jobDetail,trigger);

 /**
* <p>
* Calls the equivalent method on the 'proxied' <code>QuartzScheduler</code>,
* passing the <code>SchedulingContext</code> associated with this
* instance.
* </p>
*/
public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
throws SchedulerException {
return sched.scheduleJob(schedCtxt, jobDetail, trigger);
}

在org.quartz.core.QuartzScheduler对象当中的方法scheduleJob方法

 public Date scheduleJob(SchedulingContext ctxt, JobDetail jobDetail,
Trigger trigger) throws SchedulerException {
validateState(); if (jobDetail == null) {
throw new SchedulerException("JobDetail cannot be null",
SchedulerException.ERR_CLIENT_ERROR);
} if (trigger == null) {
throw new SchedulerException("Trigger cannot be null",
SchedulerException.ERR_CLIENT_ERROR);
} jobDetail.validate(); if (trigger.getJobName() == null) {
trigger.setJobName(jobDetail.getName());
trigger.setJobGroup(jobDetail.getGroup());
} else if (trigger.getJobName() != null
&& !trigger.getJobName().equals(jobDetail.getName())) {
throw new SchedulerException(
"Trigger does not reference given job!",
SchedulerException.ERR_CLIENT_ERROR);
} else if (trigger.getJobGroup() != null
&& !trigger.getJobGroup().equals(jobDetail.getGroup())) {
throw new SchedulerException(
"Trigger does not reference given job!",
SchedulerException.ERR_CLIENT_ERROR);
} trigger.validate(); Calendar cal = null;
if (trigger.getCalendarName() != null) {
cal = resources.getJobStore().retrieveCalendar(ctxt,
trigger.getCalendarName());
}
Date ft = trigger.computeFirstFireTime(cal); if (ft == null) {
throw new SchedulerException(
"Based on configured schedule, the given trigger will never fire.",
SchedulerException.ERR_CLIENT_ERROR);
} resources.getJobStore().storeJobAndTrigger(ctxt, jobDetail, trigger);
notifySchedulerListenersJobAdded(jobDetail);
notifySchedulerThread(trigger.getNextFireTime().getTime());
notifySchedulerListenersSchduled(trigger); return ft;
}

处理jobDetail对象和trigger对象的方法 resources.getJobStore().storeJobAndTrigger(ctxt, jobDetail, trigger);

可以看到将会调用RAMJobStore对象的方法

/**
* <p>
* Store the given <code>{@link org.quartz.JobDetail}</code> and <code>{@link org.quartz.Trigger}</code>.
* </p>
*
* @param newJob
* The <code>JobDetail</code> to be stored.
* @param newTrigger
* The <code>Trigger</code> to be stored.
* @throws ObjectAlreadyExistsException
* if a <code>Job</code> with the same name/group already
* exists.
*/
public void storeJobAndTrigger(SchedulingContext ctxt, JobDetail newJob,
Trigger newTrigger) throws JobPersistenceException {
storeJob(ctxt, newJob, false);
storeTrigger(ctxt, newTrigger, false);
}

  

看到storeJob方法

 public void storeJob(SchedulingContext ctxt, JobDetail newJob,
boolean replaceExisting) throws ObjectAlreadyExistsException {
JobWrapper jw = new JobWrapper((JobDetail)newJob.clone()); boolean repl = false; synchronized (lock) {
if (jobsByFQN.get(jw.key) != null) {
if (!replaceExisting) {
throw new ObjectAlreadyExistsException(newJob);
}
repl = true;
} if (!repl) {
// get job group
HashMap grpMap = (HashMap) jobsByGroup.get(newJob.getGroup());
if (grpMap == null) {
grpMap = new HashMap(100);
jobsByGroup.put(newJob.getGroup(), grpMap);
}
// add to jobs by group
grpMap.put(newJob.getName(), jw);
// add to jobs by FQN map
jobsByFQN.put(jw.key, jw);
} else {
// update job detail
JobWrapper orig = (JobWrapper) jobsByFQN.get(jw.key);
orig.jobDetail = jw.jobDetail; // already cloned
}
}
}

我们看到JobWrapper对象,将JobDetail对象包装起来

class JobWrapper {

    public String key;

    public JobDetail jobDetail;

    JobWrapper(JobDetail jobDetail) {
this.jobDetail = jobDetail;
key = getJobNameKey(jobDetail);
} JobWrapper(JobDetail jobDetail, String key) {
this.jobDetail = jobDetail;
this.key = key;
} static String getJobNameKey(JobDetail jobDetail) {
return jobDetail.getGroup() + "_$x$x$_" + jobDetail.getName();
} static String getJobNameKey(String jobName, String groupName) {
return groupName + "_$x$x$_" + jobName;
} public boolean equals(Object obj) {
if (obj instanceof JobWrapper) {
JobWrapper jw = (JobWrapper) obj;
if (jw.key.equals(this.key)) {
return true;
}
} return false;
} public int hashCode() {
return key.hashCode();
} }

key是有jobDetail的组名和jobDetail的name组合在一起的

QuartzManager.addJob("11",job,"0/2 * * * * ?"); //每2秒钟执行一次

  private static String JOB_GROUP_NAME = "group1";
private static String TRIGGER_GROUP_NAME = "trigger1"; /** *//**
* 添加一个定时任务,使用默认的任务组名,触发器名,触发器组名
* @param jobName 任务名
* @param job 任务
* @param time 时间设置,参考quartz说明文档
* @throws SchedulerException
* @throws ParseException
*/
public static void addJob(String jobName,Job job,String time)
throws SchedulerException, ParseException{
Scheduler sched = sf.getScheduler();
JobDetail jobDetail = new JobDetail(jobName, JOB_GROUP_NAME, job.getClass());//任务名,任务组,任务执行类
//触发器
CronTrigger trigger =
new CronTrigger(jobName, TRIGGER_GROUP_NAME);//触发器名,触发器组
trigger.setCronExpression(time);//触发器时间设定
sched.scheduleJob(jobDetail,trigger);
//启动
if(!sched.isShutdown())
sched.start();
}

JobDetail的名字和组名都是我们确定的,Trigger的名字和组名也是我们确定的。

storeTrigger方法

public void storeTrigger(SchedulingContext ctxt, Trigger newTrigger,
boolean replaceExisting) throws JobPersistenceException {
TriggerWrapper tw = new TriggerWrapper((Trigger)newTrigger.clone()); synchronized (lock) {
if (triggersByFQN.get(tw.key) != null) {
if (!replaceExisting) {
throw new ObjectAlreadyExistsException(newTrigger);
} removeTrigger(ctxt, newTrigger.getName(), newTrigger.getGroup(), false);
} if (retrieveJob(ctxt, newTrigger.getJobName(), newTrigger.getJobGroup()) == null) {
throw new JobPersistenceException("The job ("
+ newTrigger.getFullJobName()
+ ") referenced by the trigger does not exist.");
} // add to triggers array
triggers.add(tw);
// add to triggers by group
HashMap grpMap = (HashMap) triggersByGroup.get(newTrigger
.getGroup());
if (grpMap == null) {
grpMap = new HashMap(100);
triggersByGroup.put(newTrigger.getGroup(), grpMap);
}
grpMap.put(newTrigger.getName(), tw);
// add to triggers by FQN map
triggersByFQN.put(tw.key, tw); if (pausedTriggerGroups.contains(newTrigger.getGroup())
|| pausedJobGroups.contains(newTrigger.getJobGroup())) {
tw.state = TriggerWrapper.STATE_PAUSED;
if (blockedJobs.contains(tw.jobKey)) {
tw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
}
} else if (blockedJobs.contains(tw.jobKey)) {
tw.state = TriggerWrapper.STATE_BLOCKED;
} else {
timeTriggers.add(tw);
}
}
}

我们在配置job的时间的时候采用的是字符串形式

字段允许值允许的特殊字符
秒 0-59 , - * /
分 0-59 , - * /
小时 0-23 , - * /
日期 1-31 , - * ? / L W C
月份 1-12 或者 JAN-DEC , - * /
星期 1-7 或者 SUN-SAT , - * ? / L C #
年(可选)留空, 1970-2099 , - * /
表达式意义
"0 0 12 * * ?" 每天中午12点触发
"0 15 10 ? * *" 每天上午10:15触发
"0 15 10 * * ?" 每天上午10:15触发
"0 15 10 * * ? *" 每天上午10:15触发
"0 15 10 * * ? 2005" 2005年的每天上午10:15触发
"0 * 14 * * ?" 在每天下午2点到下午2:59期间的每1分钟触发
"0 0/5 14 * * ?" 在每天下午2点到下午2:55期间的每5分钟触发
"0 0/5 14,18 * * ?" 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发
"0 0-5 14 * * ?" 在每天下午2点到下午2:05期间的每1分钟触发
"0 10,44 14 ? 3 WED" 每年三月的星期三的下午2:10和2:44触发
"0 15 10 ? * MON-FRI" 周一至周五的上午10:15触发
"0 15 10 15 * ?" 每月15日上午10:15触发
"0 15 10 L * ?" 每月最后一日的上午10:15触发
"0 15 10 ? * 6L" 每月的最后一个星期五上午10:15触发
"0 15 10 ? * 6L 2002-2005" 2002年至2005年的每月的最后一个星期五上午10:15触发
"0 15 10 ? * 6#3" 每月的第三个星期五上午10:15触发
每天早上6点
0 6 * * *
每两个小时
0 */2 * * *
晚上11点到早上7点之间每两个小时,早上八点
0 23-7/2,8 * * *
每个月的4号和每个礼拜的礼拜一到礼拜三的早上11点
0 11 4 * 1-3
1月1日早上4点
0 4 1 1 *

 

这个时间具体是怎么解析的呢?

//触发器
CronTrigger trigger =
new CronTrigger(jobName, TRIGGER_GROUP_NAME);//触发器名,触发器组
trigger.setCronExpression(time);//触发器时间设定

重点在trigger的setCronExpression(time)上,这里面对string的时间串进行了解析,将标准的下次执行时间进行了设置。

org.quartz.CronTrigger类当中

 public void setCronExpression(String cronExpression) throws ParseException {
TimeZone origTz = getTimeZone();
this.cronEx = new CronExpression(cronExpression);
this.cronEx.setTimeZone(origTz);
}

在org.quartz.CronExpression类当中

   public CronExpression(String cronExpression) throws ParseException {
if (cronExpression == null) {
throw new IllegalArgumentException("cronExpression cannot be null");
} this.cronExpression = cronExpression.toUpperCase(Locale.US); buildExpression(this.cronExpression);
}
protected void buildExpression(String expression) throws ParseException {
expressionParsed = true; try { if (seconds == null) {
seconds = new TreeSet();
}
if (minutes == null) {
minutes = new TreeSet();
}
if (hours == null) {
hours = new TreeSet();
}
if (daysOfMonth == null) {
daysOfMonth = new TreeSet();
}
if (months == null) {
months = new TreeSet();
}
if (daysOfWeek == null) {
daysOfWeek = new TreeSet();
}
if (years == null) {
years = new TreeSet();
} int exprOn = SECOND; StringTokenizer exprsTok = new StringTokenizer(expression, " \t",
false); while (exprsTok.hasMoreTokens() && exprOn <= YEAR) {
String expr = exprsTok.nextToken().trim(); // throw an exception if L is used with other days of the month
if(exprOn == DAY_OF_MONTH && expr.indexOf('L') != -1 && expr.length() > 1 && expr.indexOf(",") >= 0) {
throw new ParseException("Support for specifying 'L' and 'LW' with other days of the month is not implemented", -1);
}
// throw an exception if L is used with other days of the week
if(exprOn == DAY_OF_WEEK && expr.indexOf('L') != -1 && expr.length() > 1 && expr.indexOf(",") >= 0) {
throw new ParseException("Support for specifying 'L' with other days of the week is not implemented", -1);
} StringTokenizer vTok = new StringTokenizer(expr, ",");
while (vTok.hasMoreTokens()) {
String v = vTok.nextToken();
storeExpressionVals(0, v, exprOn);
} exprOn++;
} if (exprOn <= DAY_OF_WEEK) {
throw new ParseException("Unexpected end of expression.",
expression.length());
} if (exprOn <= YEAR) {
storeExpressionVals(0, "*", YEAR);
} TreeSet dow = getSet(DAY_OF_WEEK);
TreeSet dom = getSet(DAY_OF_MONTH); // Copying the logic from the UnsupportedOperationException below
boolean dayOfMSpec = !dom.contains(NO_SPEC);
boolean dayOfWSpec = !dow.contains(NO_SPEC); if (dayOfMSpec && !dayOfWSpec) {
// skip
} else if (dayOfWSpec && !dayOfMSpec) {
// skip
} else {
throw new ParseException(
"Support for specifying both a day-of-week AND a day-of-month parameter is not implemented.", 0);
}
} catch (ParseException pe) {
throw pe;
} catch (Exception e) {
throw new ParseException("Illegal cron expression format ("
+ e.toString() + ")", 0);
}
}

-------------------------------------------------------------------------------------------------------------------------------------

当我们把JobDetail和Trigger对象都保存到相应的位置之后,是谁来触发他们的呢?是通过的org.quartz.core.QuartzSchedulerThread这个处理主线程来实现的 ,可以这么认为,这个线程不断地额扫描RAMJobStore当中的内容,发现有需要执行的Job的时候,就把任务拿出来执行,并且是异步到线程池中去执行,大多数定时器架构都是需要一个不断刷新和派发任务的的线程,一个执行任务的线程池,一个保存定时任务的的对象或者数据库。

那这个线程是什么时候启动的呢?是在我们获取Scheduler对象的时候内部创建并启动的,现在我们看看这个过程。

看我们的

  public static void addJob(String jobName,Job job,String time)
throws SchedulerException, ParseException{
Scheduler sched = sf.getScheduler();
JobDetail jobDetail = new JobDetail(jobName, JOB_GROUP_NAME, job.getClass());//任务名,任务组,任务执行类
//触发器
CronTrigger trigger =
new CronTrigger(jobName, TRIGGER_GROUP_NAME);//触发器名,触发器组
trigger.setCronExpression(time);//触发器时间设定
sched.scheduleJob(jobDetail,trigger);
//启动
if(!sched.isShutdown())
sched.start();
}

跟进StdSchedulerFactory.getScheduler()方法内部

public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
initialize();
} SchedulerRepository schedRep = SchedulerRepository.getInstance(); Scheduler sched = schedRep.lookup(getSchedulerName()); if (sched != null) {
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
return sched;
}
} sched = instantiate(); return sched;
}

  Quartz源码阅读

在这个调用栈当中,我们看到通过org.quartz.impl.StdSchedulerFactory类当中的getScheduer()方法最终调用到了org.quartz.core.QuartzSchedulerThread对象的构造方法,而在构造方法当中,我们看到进行了自启动。

    QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs,
SchedulingContext ctxt, boolean setDaemon, int threadPrio) {
super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
this.qs = qs;
this.qsRsrcs = qsRsrcs;
this.ctxt = ctxt;
this.setDaemon(setDaemon);
if(qsRsrcs.isThreadsInheritInitializersClassLoadContext()) {
log.info("QuartzSchedulerThread Inheriting ContextClassLoader of thread: " + Thread.currentThread().getName());
this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
} this.setPriority(threadPrio); // start the underlying thread, but put this object into the 'paused'
// state
// so processing doesn't start yet...
paused = true;
halted = new AtomicBoolean(false);
this.start();
}

  初始化自身,并且this.start()启动线程。

对于线程的run方法即如何派发任务也是一个看点,该处理动态的采用Object.wait 和notify方法,避免了无效的CPU空转,尤其需要注意的是在wait期间,可以通过加入紧急任务来nitifyAll从而完成任务的调动。

Quartz源码阅读

看到QuartzScheduler的addJob方法加入了一个紧急的Job任务

public void run() {
boolean lastAcquireFailed = false; while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
} if (halted.get()) {
break;
}
} int availTreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availTreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads... Trigger trigger = null; long now = System.currentTimeMillis(); clearSignaledSchedulingChange();
try {
trigger = qsRsrcs.getJobStore().acquireNextTrigger(
ctxt, now + idleWaitTime);
lastAcquireFailed = false;
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occured while scanning for the next trigger to fire.",
jpe);
}
lastAcquireFailed = true;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
} if (trigger != null) { now = System.currentTimeMillis();
long triggerTime = trigger.getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized(sigLock) {
if(!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(trigger, triggerTime)) {
trigger = null;
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
if(trigger == null)
continue; // set trigger to 'executing'
TriggerFiredBundle bndle = null; boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
bndle = qsRsrcs.getJobStore().triggerFired(ctxt,
trigger);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occured while firing trigger '"
+ trigger.getFullName() + "'", se);
} catch (RuntimeException e) {
getLog().error(
"RuntimeException while firing trigger " +
trigger.getFullName(), e);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
}
} // it's possible to get 'null' if the trigger was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
try {
qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,
trigger);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occured while releasing trigger '"
+ trigger.getFullName() + "'", se);
// db connection must have failed... keep retrying
// until it's up...
releaseTriggerRetryLoop(trigger);
}
continue;
} // TODO: improvements:
//
// 2- make sure we can get a job runshell before firing trigger, or
// don't let that throw an exception (right now it never does,
// but the signature says it can).
// 3- acquire more triggers at a time (based on num threads available?) JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
shell.initialize(qs, bndle);
} catch (SchedulerException se) {
try {
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
errorTriggerRetryLoop(bndle);
}
continue;
} if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
try {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
releaseTriggerRetryLoop(trigger);
}
} continue;
}
} else { // if(availTreadCount > 0)
continue; // should never happen, if threadPool.blockForAvailableThreads() follows contract
} long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
sigLock.wait(timeUntilContinue);
} catch (InterruptedException ignore) {
}
} } catch(RuntimeException re) {
getLog().error("Runtime error occured in main trigger firing loop.", re);
}
} // loop... // drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}

我们下面来分析下run方法是如何找到要执行的任务,并且派发出去,如何进行wait和notify,如何进行循环任务的处理

下面是org.quartz.simpl.RAMJobStore当中的获取需要调用的Trigger的方法

 public Trigger acquireNextTrigger(SchedulingContext ctxt, long noLaterThan) {
TriggerWrapper tw = null; synchronized (lock) { while (tw == null) {
try {
tw = (TriggerWrapper) timeTriggers.first();
} catch (java.util.NoSuchElementException nsee) {
return null;
} if (tw == null) {
return null;
} if (tw.trigger.getNextFireTime() == null) {
timeTriggers.remove(tw);
tw = null;
continue;
} timeTriggers.remove(tw); if (applyMisfire(tw)) {
if (tw.trigger.getNextFireTime() != null) {
timeTriggers.add(tw);
}
tw = null;
continue;
} if(tw.trigger.getNextFireTime().getTime() > noLaterThan) {
timeTriggers.add(tw);
return null;
} tw.state = TriggerWrapper.STATE_ACQUIRED; tw.trigger.setFireInstanceId(getFiredTriggerRecordId());
Trigger trig = (Trigger) tw.trigger.clone();
return trig;
}
} return null;
}

  

Quartz源码阅读的更多相关文章

  1. 【原】SDWebImage源码阅读(四)

    [原]SDWebImage源码阅读(四) 本文转载请注明出处 —— polobymulberry-博客园 1. 前言 SDWebImage中主要实现了NSURLConnectionDataDelega ...

  2. &lpar;转&rpar; Spring源码阅读 之 Spring整体架构

    标签(空格分隔): Spring 声明:本文系转载,原地地址:spring framework 4 源码阅读 Spring骨架 Spring的骨架,也是Spring的核心包.主要包含三个内容 cont ...

  3. 【原】FMDB源码阅读(三)

    [原]FMDB源码阅读(三) 本文转载请注明出处 —— polobymulberry-博客园 1. 前言 FMDB比较优秀的地方就在于对多线程的处理.所以这一篇主要是研究FMDB的多线程处理的实现.而 ...

  4. 【原】FMDB源码阅读(二)

    [原]FMDB源码阅读(二) 本文转载请注明出处 -- polobymulberry-博客园 1. 前言 上一篇只是简单地过了一下FMDB一个简单例子的基本流程,并没有涉及到FMDB的所有方方面面,比 ...

  5. 【原】FMDB源码阅读(一)

    [原]FMDB源码阅读(一) 本文转载请注明出处 —— polobymulberry-博客园 1. 前言 说实话,之前的SDWebImage和AFNetworking这两个组件我还是使用过的,但是对于 ...

  6. 【原】AFNetworking源码阅读(六)

    [原]AFNetworking源码阅读(六) 本文转载请注明出处 —— polobymulberry-博客园 1. 前言 这一篇的想讲的,一个就是分析一下AFSecurityPolicy文件,看看AF ...

  7. 【原】AFNetworking源码阅读(五)

    [原]AFNetworking源码阅读(五) 本文转载请注明出处 —— polobymulberry-博客园 1. 前言 上一篇中提及到了Multipart Request的构建方法- [AFHTTP ...

  8. 【原】AFNetworking源码阅读(四)

    [原]AFNetworking源码阅读(四) 本文转载请注明出处 —— polobymulberry-博客园 1. 前言 上一篇还遗留了很多问题,包括AFURLSessionManagerTaskDe ...

  9. 【原】AFNetworking源码阅读(三)

    [原]AFNetworking源码阅读(三) 本文转载请注明出处 —— polobymulberry-博客园 1. 前言 上一篇的话,主要是讲了如何通过构建一个request来生成一个data tas ...

随机推荐

  1. angular2系列教程(八)In-memory web api、HTTP服务、依赖注入、Observable

    大家好,今天我们要讲是angular2的http功能模块,这个功能模块的代码不在angular2里面,需要我们另外引入: index.html <script src="lib/htt ...

  2. java 获取请求客户端的真实IP地址

    转载自:http://leiyongping88.iteye.com/blog/1545930 用request.getRemoteAddr();方法获取的IP地址是:127.0.0.1或192.16 ...

  3. pthread属性设置

    Posix线程中的线程属性pthread_attr_t主要包括scope属性.detach属性.堆栈地址.堆栈大小.优先级.在pthread_create中,把第二个参数设置为NULL的话,将采用默认 ...

  4. &lpar;转&rpar;19个必须知道的Visual Studio快捷键

    本文将为大家列出在 Visual Studio 中常用的快捷键,正确熟练地使用快捷键,将大大提高你的编程工作效率. 项目相关的快捷键 Ctrl + Shift + B = 生成项目 Ctrl + Al ...

  5. iPhone mobile safari fixed 元素滚动慢的问题处理

    最近做一个手机阅读应用,抓取网站数据,做格式化后,适合手机浏览器以及电脑上阅读,不显示任何其他内容无关元素. Site:http://cbread.duapp.com/ 固定左侧边栏时,使用的CSS如 ...

  6. JQuery 方法合集(懒人备记)

    原创文章,转载请私信.谢谢~ PS:请将jquery的引用文件放在head的标签内 语法:$(selector).action() $(document).ready(function(){ // 开 ...

  7. docker-solr 使用host模式修改默认的8983端口号

    1.使用root账号进入docker-solr docker exec -it 127627ab6247 -u root /bin/bash 2.安装vim apt-get update apt-ge ...

  8. git初学

    git在团队合作开发时是很有用的,SVN是集中式的代表,而git是分布式的代表,它分为代码区.暂存区.和本地库.在同一个团队中开发时,在代码存储中心(例如,码云.github)上创建一个库,用于储存和 ...

  9. Matches Game

    Matches Game http://poj.org/problem?id=2234 Time Limit: 1000MS   Memory Limit: 65536K Total Submissi ...

  10. 3D转弯保护区长啥样?

    3D转弯保护区长啥样? 2015-12-06 刘崇军 风螺旋线 在课本中.规范中看到的转弯保护区一直是平面化的样子.我们知道副区是由主区外扩而成,但具体怎样精确外扩无从知晓:我们知道主区边界至副区边界 ...