Executor执行框架源代码分析(四)——ScheduledThreadPoolExecutor

时间:2022-04-03 15:46:38

    在前面一篇Executor执行框架源代码分析(三)中,已经详细介绍了ThreadPoolExecutor的实现方式。ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,其扩展了ThreadPoolExecutor的功能,扩展的功能主要是用来执行周期性任务的。ScheduledThreadPoolExecutor的任务分为三类:

       1、单一延时任务。设定一个时间,10s。任务将会在提交10秒钟后运行。

       2、固定周期任务。提交一个任务,设定好周期period时间,如,3秒。任务将会每3秒被调用一次,如果任务的执行时间超过了3秒,那么将会在任务执行完之后立即进行再次调用,不会进行并发执行。

       3、周期延时任务。提交一个任务,设置后固定延时时间,如,3秒。任务将会在第一次任务结束之后,延时3秒钟再次调用任务任务第二次。依次类推,第二次调用结束后,延时3秒钟,进行第三次调用。

先看一个demo,感受一下ScheduledThreadPoolExecutor的使用:

public class TestMain02 {

public static void main(String[] args) throws Exception{
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); //创建一个ScheduleThreadPool

//周期延时任务,以固定的延时运行任务。参数分别是:
//task对象,第一次调用的延时时间(2秒),后续任务调用的延时时间(3秒),延时时间的单位:秒
executor.scheduleWithFixedDelay(new Task(), 2,3, TimeUnit.SECONDS);
}
}

public class Task implements Runnable{
@Override
public void run() {
try {
System.out.println(this+" 被线程 "+Thread.currentThread().getName()+" 执行!!,开始时间:"+new Date());
Thread.sleep(4000);
System.out.println(this+" 被线程 "+Thread.currentThread().getName()+" 执行!!,结束时间: "+new Date());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

//执行结果
//com.thread.blog.Task@534f5d93 被线程 pool-1-thread-1 执行!!,开始时间:Mon Mar 12 10:46:48 CST 2018
//com.thread.blog.Task@534f5d93 被线程 pool-1-thread-1 执行!!,结束时间: Mon Mar 12 10:46:52 CST 2018 (第一次调用结束)
//com.thread.blog.Task@534f5d93 被线程 pool-1-thread-1 执行!!,开始时间:Mon Mar 12 10:46:55 CST 2018 (3秒钟之后,第二次调用开始)
//com.thread.blog.Task@534f5d93 被线程 pool-1-thread-1 执行!!,结束时间: Mon Mar 12 10:46:59 CST 2018
//如果不关闭executor执行器,会一直按照上面的形式进行输出

上面调用的是scheduleWithFixedDelay方法,这个方法将会以固定的延时时间执行任务。类似的方法还有:

1、schedule(Runnable command,long delay, TimeUnit unit),提交后延迟执行,延迟时间:delay;

2、scheduleAtFixedRate(Runnable command, long initialDelay,long delay, TimeUnit unit), 提交后延迟执行,第一次延迟时间:initialDelay。后续任务将会以{initialDelay + delay},{initialDelay + delay*2}……进行执行。即上文所说的“固定周期任务”。


一、schedule方法的实现

schedule方法源代码如下:

public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {	if (command == null || unit == null)		throw new NullPointerException();		//调用decorateTask方法包装command	RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));		//延迟执行command	delayedExecute(t);	return t;}

在上文中有提到,ScheduledThreadPoolExecutor会将提交的task包装成ScheduleFutureTask,再进行执行。schedule方法做的代码中的decorateTask方法的第二个参数就是一个ScheduleFutureTask对象。下文再详细讨论ScheduleFutureTask。

二、scheduleAtFixedRate方法发的实现

源代码如下:

 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {	if (command == null || unit == null)		throw new NullPointerException();	if (period <= 0)		throw new IllegalArgumentException();			//创建ScheduleFutureTask	ScheduledFutureTask<Void> sft = 		new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));		//装饰ScheduleFutureTask	RunnableScheduledFuture<Void> t = decorateTask(command, sft);	sft.outerTask = t;		//延时执行	delayedExecute(t);	return t;}

不做过多的解释,与schedule方法的实现是一样的。

三、scheduleWithFixedDelay方法的实现

源代码如下:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {	if (command == null || unit == null)		throw new NullPointerException();	if (delay <= 0)		throw new IllegalArgumentException();			//创建ScheduleFutureTask		ScheduledFutureTask<Void> sft =		new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));			//装饰ScheduleFutureTask		RunnableScheduledFuture<Void> t = decorateTask(command, sft);	sft.outerTask = t;		//延时执行	delayedExecute(t);	return t;}

不多做解释,又是一样的。

从上面三个方法的实现来看,三个方法的实现都是一样的。都是创建了ScheduleFutureTask,然后调用delayedExecutor方法进行执行。所以整个ScheduleThreadPoolExecutor的实现核心是ScheduleFutureTask 和 delayedExectuor。下文将详细分析ScheduleFutureTask  和  delayedExecutor。


四、delayedExecutor方法的实现

源代码如下:

private void delayedExecute(RunnableScheduledFuture<?> task) {	if (isShutdown()) //如果ScheduleThreadPoolExecutor已经关闭,则拒绝提交task		reject(task);	else {		super.getQueue().add(task); //将任务加入队列				//第二次判断,判断当前任务是否可以运行		//如果可以运行,则直接运行		//反之,则取消任务		if (isShutdown() &&			!canRunInCurrentRunState(task.isPeriodic()) &&			remove(task))			task.cancel(false); //取消任务		else			//启动线程,执行任务			//这个方法可以保证至少有一个线程在运行,是ThreadPoolExecutor中的方法			//这里就不多做介绍了			ensurePrestart(); 	}}

上面的源代码说明中,delayedExecute方法在执行任务之前,会先判断当前任务是否可以运行。整个判断过程如下:

1、执行器(ScheduleThreadPoolExecutor)是否已经关闭,因为有多线程修改,所以需要多次判断。

2、当task被放入等待队列的瞬间,再次检查执行器的运行状态,如果执行器已经shutdown,则执行canRunInCurrentRunState方法。canRunInCurrentRunState方法的运行状态如下:

boolean canRunInCurrentRunState(boolean periodic) {      //这里根据periodic的值,决定isRunningOrShutdown方法的参数      //continueExistingPeriodicTasksAfterShutdown是一个bollean值      //表示:执行器已经shutdown,是否应该执行前一刻刚刚加入队列的周期任务      //        //executeExistingDelayedTasksAfterShutdown是一个boolean值,      //表示:执行器已经shutdown,是否应该执行前一刻刚刚加入队列的延时任务                  //isRunningOrShutdown返回false,表示不再执行刚加入的task;反之,则执行      return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown :executeExistingDelayedTasksAfterShutdown);  }  

五、ScheduleFutureTask 类的实现

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {        private final long sequenceNumber; //提交序列号,会影响getDelay的返回值。先提交的值小于后提交的值          private final long period; //如果为0,表示不是周期任务;如果不为0,则是周期任务,任务执行周期就是period的值        RunnableScheduledFuture<V> outerTask = this; //如果是周期性任务,在每次执行完之后,outerTask会被再次插入队列        int heapIndex; //等待队列中的索引值,用于在等待队列中快速定位当前对象        ScheduledFutureTask(Runnable r, V result, long ns) {          super(r, result);          this.time = ns;          this.period = 0; //非周期任务          //sequencer是维护在其外层类(ScheduledThreadPoolExecutor)中的一个属性          this.sequenceNumber = sequencer.getAndIncrement();       }        ScheduledFutureTask(Runnable r, V result, long ns, long period) {          super(r, result);          this.time = ns;          this.period = period;          this.sequenceNumber = sequencer.getAndIncrement();      }        ScheduledFutureTask(Callable<V> callable, long ns) {          super(callable);          this.time = ns;          this.period = 0;          this.sequenceNumber = sequencer.getAndIncrement();      }        public boolean isPeriodic() {          return period != 0;      }        private void setNextRunTime() {          long p = period;          if (p > 0)              time += p;          else              time = triggerTime(-p);      }        public void run() {          boolean periodic = isPeriodic();          if (!canRunInCurrentRunState(periodic)) //判断当前任务是否还可以执行              cancel(false);          else if (!periodic)//不是周期任务,直接调用run方法              ScheduledFutureTask.super.run();          else if (ScheduledFutureTask.super.runAndReset()) { //runAndSet方法会直接调用task的run方法,完成周期任务的执行              setNextRunTime(); //设置下一次执行的时间              reExecutePeriodic(outerTask);//这个方法会再次将当前对象放入等待队列,以备下次执行。          }      }  }  

上面的代码注释已经完整的说明了ScheduleFutureTask 的结构,这里就不赘述。


       这里有一个很关键的问题,周期任务是如何触发的,以及是如何保证周期任务按照设定的period执行的呢?

六、周期任务的触发

要解答这个问题,需要将关注点放在【delayedExecute】方法上,在delayedExecute的实现中有两个很重要的步骤:

1、将task加入等待队列;【代码:super.getQueue().add(task);】。

2、调用ensurePrestart方法,启动执行。

关于将task加入队列这个操作主要是让线程去队列中获取task,然后执行。这步操作的具体实现在ThreadPoolExecutor的分析中已经说过,这里不再赘述。我们直接讨论ensurePrestart方法,方法实现如下:

void ensurePrestart() {      int wc = workerCountOf(ctl.get());      if (wc < corePoolSize)          addWorker(null, true);      else if (wc == 0)          addWorker(null, false);  }  

这个方法调用了addWork方法,这个方法是ThreadPoolExecutor中的实现。主要是新建一个线程,用新建的线程执行提交的task,如果task是null,则从等待队列中获取任务进行执行。

      如上所述,delayedExecute先将task放入队列,然后在调用addWork,传入null值,从队列中获取task执行。之所以要先放入队列,然后再获取;是因为延时任务并不是马上就需要执行的,延时任务都有一个延时时间。


七、如何保证周期任务按照设定的period执行

       保证task按照设定的周期执行主要是通过Schedulefuturetask中的一个time属性实现的。这time属性会记录task下一次运行的时间,在每次运行完之后都会更新这个时间。然后,task在放入等待队列中,这个等待队列会以task的time属性进行排序,time值越小,排序越靠前。执行时依次取出,这样就达到了周期执行的目的。如这段代码:

 //类:ScheduledFutureTask中的run方法   public void run() {      boolean periodic = isPeriodic();      if (!canRunInCurrentRunState(periodic))          cancel(false);      else if (!periodic)          ScheduledFutureTask.super.run();      else if (ScheduledFutureTask.super.runAndReset()) {          setNextRunTime(); //设置下一次执行的时间          reExecutePeriodic(outerTask); //加入队列,以备下一次执行      }  }    private void setNextRunTime() {      long p = period; //根据周期设置时间      if (p > 0)          time += p;      else          time = triggerTime(-p);  }    void reExecutePeriodic(RunnableScheduledFuture<?> task) {      if (canRunInCurrentRunState(true)) {          super.getQueue().add(task); //加入队列          if (!canRunInCurrentRunState(true) && remove(task))              task.cancel(false);          else              ensurePrestart(); //又再次回到了ensurePrestart方法      }  }  

八、自定义队列DelayedWorkQueue的实现

       上面提到,执行器会将task放入等待队列,并且按照time值的大小进行排序。这里简要的说明一下这个自定义队列的实现,以便可以进一步的理解这个周期调度的实现方式。

1、DelayedWorkQueue中维护了一个数组,默认长度16。当插入的元素超过16时,自动扩容,每次扩容的长度是50%;

2、对于这个数组的增、删、改、查,采用了一种二叉树的方式进行的。如下图:

Executor执行框架源代码分析(四)——ScheduledThreadPoolExecutor


九、ScheduleThreadPoolExecutor的扩展

     默认情况下,ScheduleThreadPoolExecutor中运行的是一个futureTask类的实例。如果需要修改其运行类型,实现自定义的调度任务时,可以通过下面的形式进行扩展,而不是直接覆盖原有的FutureTask。

public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {         static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }         protected <V> RunnableScheduledFuture<V> decorateTask(                   Runnable r, RunnableScheduledFuture<V> task) {          return new CustomTask<V>(r, task); //重写decorateTask方法,实现自定义的调度任务      }         protected <V> RunnableScheduledFuture<V> decorateTask(                   Callable<V> c, RunnableScheduledFuture<V> task) {          return new CustomTask<V>(c, task);//重写decorateTask方法,实现自定义的调度任务      }      // ... add constructors, etc.  }}  

总结

       虽然ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,但其实现方式有很大的不同,主要体现在两个方面:

1、运行的目标对象不同,虽然Task都实现了Runnable接口,但是ScheduleThreadPoolExecutor会将Task包装成一个ScheduledFutureTask,再进行调度执行。

2、采用的是*队列,ThreadPoolExecutor可以自己定义队列策略,而ScheduleThreadPoolExecutor则没有这个特权,ScheduleThreadPoolExecutor只能采用*队列,那也就是说ThreadPoolExecutor中的maxmumPoolSize将不会生效。

3、ScheduleThreadPoolExecutor覆写了所有的executor和submit方法,也就是说ScheduleThreadPoolExecutor几乎已经完全剥离了ThreadPoolSize,继承的ThreadPoolSize仅仅只是充当一个线程池的角色,已经没有了执行功能。

4、不能设置allowCoreThreadTimeOut属性值为true,这回导致task没有现成去执行