Java并发包线程池之ThreadPoolExecutor

时间:2023-03-08 16:02:24

参数详解

ExecutorService的最通用的线程池实现,ThreadPoolExecutor是一个支持通过配置一些参数达到满足不同使用场景的线程池实现,通常通过Executors的工厂方法进行配置,比如Executors.newFixedThreadPool,Executors.newSingleThreadExecutor和Executors.newCachedThreadPool都是基于ThreadPoolExecutor并通过设置不同的参数实现的,因此,ThreadPoolExecutor有很多可调配的参数(主要有下面几个):

corePoolSize 和 maximumPoolSize

即线程池中线程的数量通常也称之为线程池的大小,其中corePoolSize表示核心线程的数量,而maximumPoolSize则是线程池中允许的最大线程数。这两个参数可说是线程池ThreadPoolExecutor的核心参数,有这样的规则(一定要牢记):当向线程池提交一个任务时,如果当前线程数 小于 corePoolSize则不论那些线程是空闲还是繁忙都将立即创建新线程来处理该任务;如果线程数大于等于corePoolSize,则尝试将任务添加到任务队列中去;如果队列满了并且当前线程数 小于 maximumPoolSize,则创建新线程来执行任务;如果队列满了,线程数也到达了最大值maximumPoolSize,那么就要走线程池的拒绝策略了。这条规则其实也是ThreadPoolExecutor的实现逻辑体现。通过将corePoolSize和maximumPoolSize设置为相同的值,可以创建一个固定大小的线程池;通过将maximumPoolSize设置为一个本质上*的值例如Integer.MAX_VALUE,将允许线程池创建无限个线程(在资源耗尽之前)执行任务。

workQueue

即任务队列,它是BlockingQueue的实现类,上面提到当线程数超过corePoolSize,新提交的任务将尝试加入等待队列。不同的队列也会体现不同的线程池策略,从而适用于不同的使用场景,目前JDK在该线程池中主要用到了以下几种队列:

  1. 有界队列:例如ArrayBlockingQueue,通常与有限的maximumPoolSize结合使用,这有助于防止资源耗尽,但是队列的大小与maximumPoolSize的大小很难调控到一个有效的折衷:大队列与小型池的组合可以最大限度地降低CPU使用率、操作系统资源和线程上下文切换,但是可能会降低吞吐量。而小队列与大型池的组合则提高了CPU利用率但大量的上下文切换依然可能导致降低吞吐量。
  2. *队列:例如不指定容量的LinkedBlockingQueue,这将导致在所有 corePoolSize 线程都忙时新任务将被加入队列中等待,而不会创建多于corePoolSize数量的线程,maximumPoolSize将变得无效。*队列适用于当每个任务完全独立于其他任务,即任务执行互不影响的情况,例如web请求。如果任务的提交频率超过了任务的平均处理速率,将导致队列积压大量未处理的任务,最终系统资源耗尽,服务器宕机,因此必须要有效的控制任务的提交频率。。
  3. 0容量队列:例如SynchronousQueue, 这是一个用于传递的0容量队列,通常和本质上*的maximumPoolSize(例如Integer.MAX_VALUE)结合使用,以避免出现拒绝任务的情况,这将导致所有提交的任务都将立即创建线程执行。当任务的提交频率超过了任务的平均处理速率,将导致创建越来越多的线程以处理到达的任务,因此也有资源耗尽的潜在风险,必须要有效的控制任务的提交频率。

RejectedExecutionHandler拒绝策略

上面提到当队列满了,并且线程池中的线程数也达到了最大值maximumPoolSize,再次提交的任务怎么办?这就是拒绝策略要应对的情况,同时也是当线程池使用shutdown关闭之后再提交任务的应对策略,当出现这两种情况时,线程池将调用RejectedExecutionHandler的rejectedExecution(Runnable, ThreadPoolExecutor)方法,ThreadPoolExecutor预定义了以下四种处理策略:

  1. ThreadPoolExecutor.AbortPolicy策略,这是默认策略,将直接抛出RejectedExecutionException运行时异常。
  2. ThreadPoolExecutor.CallerRunsPolicy策略,如果线程池尚未关闭,直接用提交任务的线程来执行任务(相当于变成同步执行)。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
  3. ThreadPoolExecutor.DiscardPolicy策略,直接丢弃任务。
  4. ThreadPoolExecutor.DiscardOldestPolicy策略,如果线程池尚未关闭,则丢弃工作队列最头部的任务(也就是等的最久的那个),然后重试执行(如果再次失败,则重复此过程)。

除了以上预定义的策略,也可以通过实现RejectedExecutionHandler接口自定义策略,但这样做需要非常小心,尤其是当策略仅用于特定容量或特殊任务队列时。

keepAliveTime

如果线程池中的线程数超过corePoolSize,则这些多出的线程(称之为非核心线程)将在空闲时间超过keepAliveTime时被销毁,以减少不必要的资源消耗。当然之后任务渐渐多起来的话,又会根据maximumPoolSize创建多的线程。可以通过setKeepAliveTime方法动态的改变这个参数,使用Long.MAX_VALUE这样的本质上无限长的时间数字可以取消这样的特性。默认情况下,该参数仅仅针对多于corePoolSize的非核心线程,但只要keepAliveTime非0,也可以通过allowCoreThreadTimeOut(boolean) 方法将其应用到所有的线程(即包括核心线程)。注意在构造方法中,虽然只是不允许keepAliveTime小于0,但是等于0的值也是无意义的,意味着那些非核心线程创建处理之后立即就会被销毁。

没有被引用并且没有工作线程的线程池将被自动终结(shutdown),但通常由于keepAliveTime只应用于非核心线程,所以就算不再有任何程序引用线程池,其工作线程的数量也一般不会为0,从而无法自动被关闭,因此如果希望确保没有被引用的线程池即使在使用者忘记了调用shutdown()的情况下也能自动关闭的话,那么必须通过设置适当的keepAliveTime,0个核心线程数的下限(即corePoolSize为0)或者使用allowCoreThreadTimeOut将keepAliveTime应用到核心线程,但这种办法也有一定的问题,那就是当任务队列从空变成非空时可能将没有任何线程可以用来处理这些任务而不得不立即创建线程,这又与线程池的设计目的背道而驰。

threadFactory

线程工厂,即提供创建线程功能的线程工厂,没指定的时候默认使用Executors.defaultThreadFactory()在相同的线程组中创建具有相同的优先级(Thread.NORM_PRIORITY)和非守护进程状态的线程。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。注意:如果ThreadFactory调用newThread返回null时则创建线程失败,线程池将继续执行,但可能无法执行任何任务。

ThreadPoolExecutor的线程池声明周期

以上参数决定了线程池ThreadPoolExecutor的整个运作机制。ThreadPoolExecutor还对线程池的声明周期进行了管理,该线程池主要有五种状态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED:

RUNNING:处于RUNNING状态的线程池能够接受新任务,并且会处理进入队列排队的任务。

SHUTDOWN:处于SHUTDOWN状态的线程池不再接收新任务,但是会处理已经进入队列排队的任务。

STOP:处于STOP状态的线程池不接收新任务,不处理已经进入队列排队的任务,并且还会中断正在处理的任务。

TIDYING:当所有(包括队列中的)的任务都已经终止(或处理结束),线程池的工作线程数为0时,线程池的状态就会过渡到TIDYING状态,此状态下的线程池会调用钩子函数terminated()。

TERMINATED:当TIDYING状态下的线程池执行钩子函数terminated()结束,线程池的状态就转变为TERMINATED,表示线程池彻底终止。

Java并发包线程池之ThreadPoolExecutor

线程池的钩子函数

ThreadPoolExecutor提供了四个可以被重写的钩子方法beforeExecute(Thread, Runnable) , afterExecute(Runnable, Throwable) ,onShutdown以及terminated() 方法,前两个方法分别会在执行每个任务之前和之后调用,它们可用于操纵执行环境,例如,重新初始化 ThreadLocal、搜集统计信息或添加日志信息。onShutdown会在执行shutdown将线程池状态更新成SHUTDOWN之后调用,而terminated()方法根据上面的声明周期,其会在线程池转换到TERMINATED状态之前调用,可用于在线程池彻底终结之前完成一些特殊的处理,例如回收资源。需要注意的是,如果这些钩子函数抛出了未被捕获的异常将导致线程池的工作线程处理任务失败并消亡,因此在重写的时候一定要将异常捕获。

任务队列监控与维护

ThreadPoolExecutor提供了getQueue()方法,允许出于监控和调试的目的而访问工作队列,并强烈反对将其用于其它目的。remove(Runnable) 和 purge() 这两种方法可用于在取消大量已排队任务时帮助进行存储回收。

源码分析

在分析源码之前,有必要先知道,ThreadPoolExecutor内部使用了一个AtomicInteger类型的原子变量ctl来控制线程池的状态,在实现的时候,ThreadPoolExecutor将该变量分成了两部分来使用,32位中的高3位用于存储线程池的五种状态即runState,其余的低29位存储线程池中工作线程的数量即workerCount,所以ThreadPoolExecutor线程池最多可以创建的工作线程的数量是2^29)-1(大约5亿)个线程。

 //状态控制字段
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; //29,移位辅助
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //低29位全是1,理论上可以创建的线程的最大数量 // runState 即线程池状态存储在ctl高3位
private static final int RUNNING = -1 << COUNT_BITS; //对应的高3位值是111。 -536870912
private static final int SHUTDOWN = 0 << COUNT_BITS; //对应的高3位值是000。 0
private static final int STOP = 1 << COUNT_BITS; //对应的高3位值是001。 536870912
private static final int TIDYING = 2 << COUNT_BITS; //对应的高3位值是010。 1073741824
private static final int TERMINATED = 3 << COUNT_BITS; //对应的高3位值是011。 1610612736 //ctl的装箱与拆箱
private static int runStateOf(int c) { return c & ~CAPACITY; } //得到运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } //得到工作线程数
private static int ctlOf(int rs, int wc) { return rs | wc; } private static boolean runStateLessThan(int c, int s) {
return c < s;
} private static boolean runStateAtLeast(int c, int s) {
return c >= s;
} private static boolean isRunning(int c) {
return c < SHUTDOWN;
} /**
* CAS工作线程数加1
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
} /**
* CAS工作线程数减1
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
} /**
* 工作线程数减1,这只在线程异常结束时调用。
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

由代码可见,高3位表示的线程池的状态是从小到大的:RUNNING < SHUTDOWN(等于0)  < STOP  < TIDYING  < TERMINATED。并且可以通过位运算对runState和workerCount进行存取操作。

然后是一些成员变量,

 //任务队列
private final BlockingQueue<Runnable> workQueue; //可重入锁
private final ReentrantLock mainLock = new ReentrantLock(); //工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>(); //条件等待
private final Condition termination = mainLock.newCondition(); //记录最大工作线程数的最高峰值,即workers.size()的最大值
private int largestPoolSize;
//在线程终止是记录所有线程完成的任务总数
private long completedTaskCount; //下面是一些用户可控制的参数,用volatile修饰可以保证最新 private volatile ThreadFactory threadFactory;//线程工厂 private volatile RejectedExecutionHandler handler;//拒绝策略处理器 private volatile long keepAliveTime;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile boolean allowCoreThreadTimeOut; private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy(); //默认拒绝策略处理器 //调用shutdown和shutdownNow的许可证。
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); private final AccessControlContext acc; //线程池终结者的上下文

大部分的参数都在上面的可控参数中说过了,其中最值得注意的是两个集合workQueue和HashSet<Worker> workers,前者保存无法被核心线程立即处理的任务,后者其实就保存创建的每一个工作线程。另外ThreadPoolExecutor使用了可重入锁ReentrantLock来进行内部的状态实现。另一个重要的内部类Worker:

 //线程包装类,因此叫者工作者,该类持有创建的线程
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
//该类不会被序列号,仅仅是为了消除警告
private static final long serialVersionUID = 6138294804551838833L; final Thread thread; //正在执行此任务的线程,如果线程创建失败则为null //要运行的初始任务。可能是null,为null时表示从任务队列中获取任务,不为null时先执行该任务,再从任务队列取任务执行
Runnable firstTask; volatile long completedTasks; //每个线程完成的任务计数器 //使用ThreadFactory为给定的第一个任务创建线程。
Worker(Runnable firstTask) {
setState(-1); // 防止在线程没有真正启动之前被中断,直到运行runWorker时state才会变成0
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); //创建执行该任务的新线程
} /** 调用runWorker去提取任务执行 */
public void run() {
runWorker(this);
} // 下面是AQS的锁实现方法
//
// 值0表示锁释放状态
// 值1表示锁定状态。
protected boolean isHeldExclusively() {
return getState() != 0;
} protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
} protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
} public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); } //中断当前线程仅当线程启动之后
void interruptIfStarted() {
Thread t;
//state >=0 表示线程已经启动,该值在Worker被实例化之后为-1,直到运行runWorker时state才会变成0
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

内部类Worker主要用于维护运行任务的线程的中断控制状态,以及其他次要的记帐功能,它实现了Runnable接口,它的run方法才是线程真正的运行时代码块,当线程运行时,由它的run方法调用runWorker去调度执行提交的任务。该类实现了AQS,以简化获取和释放执行每一个任务时的锁,但是它被实现成了一个不可重入锁,因为我们不希望工作任务在调用诸如setCorePoolSize之类的线程池控制方法时能够重新获得锁。Worker持有一个firstTask,它表示线程执行时需要执行的非任务队列中的任务,例如核心线程被创建出来之后执行的任务就没有进入队列。

构造方法

 //使用给定的初始参数创建一个新的ThreadPoolExecutor。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || //corePoolSize不能小于0
maximumPoolSize <= 0 || //maximumPoolSize不能小于等于0
maximumPoolSize < corePoolSize ||
keepAliveTime < 0) //keepAliveTime不能小于0
throw new IllegalArgumentException(); //任务队列,线程工厂,拒绝策略处理器都不能为null
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException(); this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext(); this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

ThreadPoolExecutor提供了四个构造方法,这里只列举了参数最齐全的一个,其余的构造方法仅仅是将线程工厂和拒绝策略处理类设置成默认的Executors.defaultThreadFactory()和AbortPolicy策略。可见,构造方法其实就是针对corePoolSize,maximumPoolSize,keepAliveTime,workQueue,threadFactory,RejectedExecutionHandler这些上面介绍的参数进行设置。值得注意的是,corePoolSize和keepAliveTime可以为0,这为核心线程的自动回收提供了支持(见上面参数介绍)。另外可以看到创建线程池实例的时候并没有立即创建线程,这里使用了按需创建线程的策略,即只在新任务被提交过来时才会创建线程。当然,你可以调用prestartCoreThread或者prestartAllCoreThreads在任务被提交执行之前,预先启动一个或所有核心线程待命。

任务提交 --- execute(Runnable)

通过线程池概述章节,我们知道ThreadPoolExecutor继承了AbstractExecutorService,其任务提交方法submit、invokeAny和invokeAll方法都依赖未实现的execute(Runnable command)接口方法去提交任务,因此execute方法就是ThreadPoolExecutor的核心方法。

 //提交任务的最基本实现方法,提交的任务可能由新线程执行,也可能由线程池中已经存在的线程执行。
//如果因为线程池已经关闭或者工作量已经饱和导致任务无法被提交,将执行拒绝策略。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 分三步进行:
*
* 1. 如果运行的线程数小于corePoolSize,则尝试用给定的任务作为第一个任务启动一个新线程。
*
* 2. 任务入队,成功之后要进行double-check,发现线程池关闭则走拒绝策略,发现工作线程都被回收了,立即创建非核心线程执行队列任务。
*
* 3. 入队失败(队列满了),尝试创建新线程执行该任务。如果失败表示线程池关闭或者工作量饱和了,走拒绝策略
*/
int c = ctl.get();
//线程数小于corePoolSize 新建核心线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return; //成功返回
c = ctl.get();//核心线程数已经饱和,重新读取状态进入下一步尝试加入任务队列
}
//尝试将任务加入等待队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//入队之后再次确认状态,即double-check
if (! isRunning(recheck) && remove(command)) //线程池关闭了,拒绝任务
reject(command);
else if (workerCountOf(recheck) == 0) //工作线程数为0,新建非核心线程执行任务队列
addWorker(null, false);
}
//队列满,尝试创建新线程执行该任务。如果失败表示线程池关闭或者工作量饱和了,走拒绝策略
else if (!addWorker(command, false))
reject(command);
}

execute的逻辑比较简单,其逻辑基本上就是上面红字所阐述的规则:①工作线程数小于corePoolSize,则尝试用给定的任务作为其第一个任务创建一个新的核心线程,成功返回,失败说明核心线程已经饱和则重新读取状态之后进入下一步。②否则尝试将任务加入任务队列workQueue,入队成功之后需要进行状态二次确认(double-check),发现线程池关闭了,走拒绝策略,发现所有线程都被回收了,创建一个非核心线程执行任务队列,否则就在任务队列老实待着等待被其他线程调度执行。③入队失败,表示队列满了,则尝试创建新的非核心线程执行该任务,如果失败表示线程池关闭或者工作量饱和了,走拒绝策略。

其中第二步中发现线程数为0之后,创建的是非核心线程去队列中获取任务执行,是因为任务已经入队了,创建核心线程的话则需要重新把任务从队列中取走,这增加了开销。第三步中其实在线程池关闭、线程池工作线程的数量大于maximumPoolSize的时候也会调用addWorker,所以该方法里面肯定会做相应的检查。

从execute的逻辑可见,其创建执行任务或任务队列的线程逻辑都由addWorker方法实现,addWorker才是真正的关键,并且execute调用addWorker时,如果firstTask==null表示已经进入了任务队列,否则表示还没有进入任务队列,可以看着是一个新任务:

 //根据当前线程池的状态和线程数量的限制决定是否可以创建新的线程,并增加线程数计数,并执行第一个任务。
//如果线程池已经关闭,或即将关闭,或线程工厂创建线程失败,此返回返回false。
//如果线程工厂创建线程返回null,或者出现异常,例如线程启动内存溢出,需要进行回滚
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) { //自旋,决定是否需要创建新线程
int c = ctl.get();
int rs = runStateOf(c); //仅在线程池处于SHUTDOWN,并且打算入队新任务才对队列进行检测
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && //STOP状态不接受任何任务,
firstTask == null && //SHUTDOWN状态但是不接受新任务
! workQueue.isEmpty())) //SHUTDOWN状态但是队列为空
return false; //不需要创建线程,返回false for (;;) {//自旋
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false; //达到线程数限制了,返回false
if (compareAndIncrementWorkerCount(c))
break retry; //增加线程数,跳出最外层循环
c = ctl.get(); // 有其它线程提交任务增加了线程,重读状态
if (runStateOf(c) != rs)
continue retry; //状态有变化,重试
// else CAS failed due to workerCount change; retry inner loop
}
} //到这里,说明可以创建新线程,并且已经增加了线程计数
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //用给定的第一个任务创建新线程实例
final Thread t = w.thread;
if (t != null) { //创建线程成功
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //加可重入锁
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if shut down before lock acquired.
int rs = runStateOf(ctl.get()); //线程池处于RUNNING状态,或者是已经入队的SHUTDOWN状态的线程池
//只有在这种情况才加新创建的线程加入工作线程集合HashSet<Worker> workers
if (rs < SHUTDOWN || //线程池处于RUNNING状态
(rs == SHUTDOWN && firstTask == null)) { //SHUTDOWN状态但任务已经入队
if (t.isAlive()) // 检测该线程是否处于可运行状态
throw new IllegalThreadStateException();
workers.add(w); // 加入工作线程集合workers
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; //largestPoolSize记录工作线程的最大数量
workerAdded = true;
}
} finally {
mainLock.unlock();
} //工作线程已经入队workers,启动线程
if (workerAdded) {
t.start(); //启动创建的线程
workerStarted = true;
}
}
} finally {
//不论线程有没有创建成功,只要线程没能成功启动都需要进行清理
if (! workerStarted)
addWorkerFailed(w); //创建工作线程失败清理
}
return workerStarted;
} //回滚创建的工作线程,已经增加的工作线程数减1,
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w); //从工作线程集合中移除工作线程
decrementWorkerCount(); //工作线程数减1
tryTerminate(); //尝试线程池终结
} finally {
mainLock.unlock();
}
}

addWorker的逻辑也很简单,它主要分两个部分:一、根据线程池状态决定是否需要创建线程,如果需要增加ctl中记录的工作线程workerCount的数量;二、如果需要创建线程,则创建线程,加入工作线程集合workers,并启动线程。只有线程成功启动才会返回true。

第一部分中,首先是状态检查,如果rs!=SHUTDOWN 则说明是STOP这种状态的线程池不处理任何任务,或者rs==SHUTDOWN但firstTask != null说明是还没入队的新任务但这种状态的线程池不接受新任务,或者是SHUTDOWN, firstTask == null但是队列为空则说明只是为了创建一个线程去处理队列中的任务但此时队列已经空了也没必要创建线程了,所以这三种情况都立即返回false。然后是线程池中已经存在的线程数限制校验。线程数超过了允许的最大容量,直接返回false。然后是工作线程数的限制校验,超过了最大限制CAPACITY或者maximumPoolSize(非核心线程)或者corePoolSize(核心线程)也直接返回false。

第二部分中,只有在线程池处于RUNNING状态(接收新任务,也会处理任务队列workQueue中的线程),或者SHUTDOWN状态(不接受新任务,但是会处理已经在任务队列中的任务)但是任务已经入队(firstTask == null)的情况下才创建线程,并将线程加入工作线程集合workers,然后启动。

execute, addWorker方法在运行时都没有捕获异常,所有的异常都将直接抛给提交任务的调用者。

addWorker方法总结起来就是看要不要创建线程,如果要创建线程则创建并放入集合workers,然后启动线程。在finally中,还会进行清理,例如在线程没有成功启动,需要回退已经加1的线程计数器,还要从线程集合中移除该线程引用,最后还会使用tryTerminate,使线程池在需要关闭的时候自动转换至TERMINATED状态,tryTerminate后面再分析。

任务执行---runWorker(Worker)

任务的提交过程中通过addWorker创建并启动了线程,其线程的运行期代码块是实现了Runnable的内部类Worker的run方法,该方法调用了runWorker(Worker)方法来真正调度执行任务:

 //工作线程启动之后,调用的方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 将worker对象的锁的state从-1更新成0,标识线程已经开始执行任务,可以被中断了
boolean completedAbruptly = true; //记录任务是否是异常结束
try {
//如果firstTask不为null,就调用getTask去任务队列中的任务,直到没有任务可处理
while (task != null || (task = getTask()) != null) {
w.lock(); //不可重入锁
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt //在线程池状态大于等于STOP的情况下确保线程被中断,否则清除掉线程的中断标记
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && //清除中断标识
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); //中断该线程,将使阻塞take、poll方法抛出中断异常 //开始准备执行任务
try {
beforeExecute(wt, task); //执行前置钩子函数,异常将导致任务不会执行
Throwable thrown = null;
try {
task.run(); //执行任务
} catch (RuntimeException x) { //记录异常并抛出异常
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //执行后置钩子函数
}
} finally {
task = null;
w.completedTasks++; //线程完成的任务数加1,不论是否发生异常
w.unlock();
}
}
completedAbruptly = false; //线程是正常结束
} finally {
processWorkerExit(w, completedAbruptly); //处理工作线程的退出
}
} //执行无限期或者超时等待一个任务,或者如果出现如下四种情况都都将返回null,使该工作线程结束: // 1. 工作线程数超过了maximumPoolSize(由于调用setMaximumPoolSize改变了该值)
// 2. 线程池已经STOP,不接受执行任何任务
// 3. 线程池SHUTDOWN,并且任务队列为空
// 4. 该工作线程等待任务超时,并且该工作线程有需要被终止的可能(即根据keepAliveTime策略,allowCoreThreadTimeOut为真,或workerCount > corePoolSize 时线程的存活期满了),如果任务队列不为空,那么当前工作线程不是线程池中的最后一个线程。 //从队列中获取任务,该方法返回null将导致线程结束
private Runnable getTask() {
boolean timedOut = false; // 记录上一次poll是否超时 for (;;) { //自旋
int c = ctl.get();
int rs = runStateOf(c); // 如果已经STOP(不处理任何任务),或者是SHUTDOWN(不接受新任务但会处理任务队列中的任务)但是队列为空(队列中没有可处理的任务),则直接返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); //工作线程数减1
return null;
} int wc = workerCountOf(c); // Are workers subject to culling? 若满足线程存活限制,该工作线程需要被淘汰
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //工作线程存在存活时间限制 if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//满足上面说的那几种情况,表示线程需要被终止,先将工作线程数减1,在返回null
if (compareAndDecrementWorkerCount(c))
return null;
continue; //这里如果CAS修改线程数减1的操作失败,则表示已经有其他线程改变了工作线程的数量,当前线程可能又不需要终止了,所以需要重试。
} try {
//若允许线程存活超时,使用超时版本的poll,否则使用永久等待的take获取队列中的任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true; //poll超时返回,记录超时
} catch (InterruptedException retry) {
timedOut = false;
}
}
} //处理工作线程的退出
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 线程异常结束,工作线程数减1
decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//在线程终结之前,将完成的任务数累加到completedTaskCount成员变量上
completedTaskCount += w.completedTasks;
workers.remove(w); //移除该工作线程
} finally {
mainLock.unlock();
} tryTerminate(); //尝试终结线程池 int c = ctl.get();
if (runStateLessThan(c, STOP)) { //线程池状态还未到达STOP
if (!completedAbruptly) { //线程是正常结束
int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //允许长期存活的最小线程数
if (min == 0 && ! workQueue.isEmpty()) //任务队列不为空,至少保留一个线程
min = 1;
if (workerCountOf(c) >= min) //当前工作线程的数量不少于了允许的最少线程数,
return; // 不需要创建新线程
}
addWorker(null, false); //新建非核心工作线程
}
}

runWorker的实现,就是如果firstTask不为空则先执行它,然后再不断从任务队列中取出任务执行。

runWorker方法里面有两个地方容易产生疑惑,其一是在开始执行任务之前调用了worker对象的解锁方法w.unlock()这里是为了将state复位到0,让线程可以在这之后被中断,因为创建Worker实例的构造方法中将state的初始值设为了-1,而中断线程的方法interruptWokers()方法执行的interruptIfStarted()方法只有在state>=0的时候才能中断线程。

其二是在while循环中开始执行任务之前有中断操作,这里是为了在线程池状态大于等于STOP的情况下确保线程被中断,否则清除掉线程的中断标记(通过Thread.interrupted方法),但在清除掉中断标记之后还要进行二次线程池状态检查Double-check,如果这时候发现线程池的状态大于等于STOP又要重新确保线程被中断,真是煞费苦心的样子。

在任务执行前后分别会调用钩子函数beforeExecute,afterExecute。值得注意的是,beforeExecute如果有异常将导致真正的任务不会被执行,afterExecute也不会被执行;若beforeExecute没有抛出异常,则不论任务执行过程中是否有抛出异常,afterExecute都会被执行。值得一提的是,ThreadPoolExecutor继承的AbstractExecutorService类中将提交的任务都封装成了FutureTask,而根据线程池概述,FutureTask在执行Runnable、Callable任务时将其可能抛出的异常都捕获了,因此ThreadPoolExecutor这里的task.run并不会抛出任何异常,因此只要beforeExecute,afterExecute没有抛出异常,completedAbruptly始终是false。

getTask方法用于从任务队列中获取任务,若它返回null将导致runWorker退出while循环,从而使工作线程运行结束被回收,getTask会根据线程池的状态以及keepAliveTime指示的线程存活时间限制的规则在适当的时候返回null,从而使线程退出,线程如果有存活超时限制,会使用超时版本的poll方法从任务队列中获取任务,否则使用take方法无限期的等待任务使线程永久存活。这里这种take无限期阻塞等待任务的行为就是线程可以一直重用执行任务队列的主要原因。

在线程退出之前,会执行processWorkerExit方法为即将终结的线程做清理和完成的任务记录。如果任务异常结束,或者当前线程池中的线程数 小于线程池允许长期存活的线程数,或者任务队列不为空但工作线程数为0这三种情况下,都将可能会创建新的非核心线程补充进线程池。线程终结之前会将其完成的任务(包括异常结束的任务,真正由于beforeExecute异常导致根本没有被执行的任务也算)汇总到completedTaskCount变量上去。并且该方法还会调用tryTerminate尝试在适当的情况下自动终结线程池。

线程池的终结---shutdown

 //启动有序关闭,先执行完队列中的任务,但不接收新任务,如果已经关闭,调用不会产生额外的效果。
//此方法不会等待以前提交的任务完成执行。使用awaitTermination来做这件事。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); //将线程池的状态转换到TIDYING、SHUTDOWN
interruptIdleWorkers(); //中断所有空闲的线程
onShutdown(); // 钩子函数,主要被ScheduledThreadPoolExecutor使用
} finally {
mainLock.unlock();
}
tryTerminate(); //线程池转换到TERMINATED状态。
} //如果线程池处于SHUTDOWN并且线程集合、任务队列为空,或者STOP状态并且线程集合为空时转换线程池到TERMINATED状态。
//如果满足关闭的条件,但是workerCount记录的工作线程数不为0,则需要中断一个空闲的工作线程以传播线程池的TERMINATED状态。
//必须在任何可能导致终止线程池的操作完成之后调用此方法,例如减少工作线程数量workerCount,或者在shutdown过程中从任务队列中移除任务。
//此方法不是private的,它允许被ScheduledThreadPoolExecutor调用。
final void tryTerminate() {
for (;;) { //自旋
int c = ctl.get();
if (isRunning(c) || //处于运行状态
runStateAtLeast(c, TIDYING) || //至少是TIDYING状态,即已经showdown
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //是SHUTDOWN,但是任务队列不为空
return; //不需要结束线程池 //到这里说明,线程池的状态是STOP或者是SHUTDOWN但任务队列为空
if (workerCountOf(c) != 0) { // 但工作线程数不为0
interruptIdleWorkers(ONLY_ONE); //中断一个空闲线程,传播TERMINATED状态,即工作线程从take、poll阻塞操作中退出,它们又会来调用tryTerminate
return;
} //到这里说明,线程池的状态是STOP或者是SHUTDOWN但任务队列为空,并且工作线程数为0(即所有工作线程都结束了)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //转换到TIDYING状态
try {
terminated(); //调用状态转换成TERMINATED之前的钩子函数
} finally {
ctl.set(ctlOf(TERMINATED, 0)); //不论如何,转换到TERMINATED状态
termination.signalAll(); //唤醒通过awaitTermination等待所有任务都结束的线程
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
} //中断正在等待任务的空闲线程,让它们从take、poll阻塞操作中退出,然后发现线程池状态的变化,或者配置信息的变化。
//忽略SecurityExceptions异常(在这种情况下,一些线程可能并不会被中断) //onlyOne为true,最多只中断一个工作线程。
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//这里的空闲状态通过获取不可重入锁来判断,获得了该不可重入锁表示线程处于空闲
if (!t.isInterrupted() && w.tryLock()) {//如果该线程未被中断,并且处于空闲状态
try {
t.interrupt(); //中断线程
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

shutdown的逻辑比较清晰,就是先将线程池的状态转到SHUTDOWN,将所有空闲线程中断退出,执行钩子函数onShutdown,如果这些都没抛异常,则执行tryTerminate试着看能不能进一步将线程池的状态转到TIDYING、TERMINATED。

由此可见,shudown除了将线程池的状态改变到SHUTDOWN,还会在适当的情况下,通过tryTerminate使线程池走向终结。而tryTerminate只会在①线程池是STOP状态并且工作线程数为0,或者是②SHUTDOWN并且任务队列为空工作线程数为0,这两种情况下将线程池先转到TIDYING状态,然后调用钩子函数terminated,并且不论terminated有没有抛出异常都会在其结束后,将线程池转到TERMINATED状态。

值得注意的是,当发现除了工作线程不为0,其他情况都满足需要将线程池终结的情况时,会一个一个的中断那些线程,使它们从等待任务的take、poll操作中返回,然后发现线程池的状态从而退出。

线程池的终结---shutdownNow

 //尝试停止所有真正执行的任务,以及等待被执行的任务。并返回等待被执行的任务列表。从该方法返回时,将从任务队列中删除这些等待执行的任务。
//此方法不会等待以前提交的任务完成执行。使用awaitTermination来做这件事。
//只能尽最大努力停止正在执行的任务,不能保证一定会成功。因为实现通过Thread.interrupt取消任务,任务如果不能响应中断则将不会被终止。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); //将线程池的状态转到STOP
interruptWorkers(); //中断所有的工作线程
tasks = drainQueue(); //拿走还没被执行的任务
} finally {
mainLock.unlock();
}
tryTerminate(); //尝试将线程池转换到TIDYING、TERMINATED状态。
return tasks;
} //将任务队列的任务移除转移到新的集合
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) { //drainTo失败(比如延迟队列或其他实现),只能一个一个转移了
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}

shutdownNow比shutdown更激进一点,它直接将线程池的状态转到STOP, 并且它会尝试通过Thread.interrupt中断那些正在执行的任务,但若任务的执行过程不能响应中断操作(即没有可以抛出中断异常的方法)那么任务就根本中断不了,所以能不能中断正在执行的任务还得看程序怎么写。同时还会将任务队列中还没被取走执行的等待任务从任务队列中清除,并返回给shutdownNow的调用者,告知它有哪些任务没有被执行。

shutdown、shutdownNow在完成各自的任务之后都会执行tryTerminate试着看能不能进一步将线程池的状态转到TIDYING、TERMINATED。tryTerminate在shutdown中已经分析,这里就不再重复了。

等待线程池被真正终结---awaitTermination

 public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) { //自旋
if (runStateAtLeast(ctl.get(), TERMINATED)) //线程池状态已经是TERMINATED 返回ture
return true; //否则表示线程池处于运行状态
if (nanos <= 0)
return false; //不允许超时时间小于等于0,否则立即返回false
nanos = termination.awaitNanos(nanos); //等待线程池终结被唤醒
}
} finally {
mainLock.unlock();
}
}

前面的shutdown、shutdownNow在tryTerminate都只会尝试将线程池终结,但是如果发现有工作线程还存活,则只会只中断一个工作线程就立即返回了(然后由那些退出的线程在回调执行tryTerminate去中断其他工作线程,像接力一样直到将所有工作线程中断),而不会等所有工作线程都结束然后线程池状态转到TERMINATED,而awaitTermination则相当于在tryTerminate中设置了一种通知,当所有工作线程都结束了并且线程池状态转到TERMINATED的时候,通过Condition的唤醒机制通知调用awaitTermination以等待线程池被真正终结(线程池被设置成TERMINATED状态)的线程。这时候其实也可以看作是所有的工作线程都允许结束的时刻。

因此awaitTermination就是在给定的超时时间之内等待所有工作线程都结束(包括异常结束),线程池的状态已经是TERMINATED的时刻。当然超时时间不能小于等于0,不能无限期的等待。

其他方法

getCompletedTaskCount() 返回线程池已经完成的任务数估计值,包括由于beforeExecute异常导致根本没有被执行的任务和任务执行抛出异常的异常任务。该值是一个瞬态值。

getTaskCount() 返回线程池已经完成的任务数(getCompletedTaskCount的返回值) + 正在执行的任务数 + 任务队列中未被执行的任务数的总和。

getLargestPoolSize() 其实就是返回largestPoolSize,该值表示线程池运行期间同时存在的线程数的最高峰值,即在那一刻线程池中存在的工作线程的数量是任何时刻最多的时候。

getActiveCount() 返回当前时刻正在执行任务的线程数,该值也是一个瞬态值。

getPoolSize() 当前时刻线程池的工作线程集合的大小,即当前时刻线程池拥有的工作线程数量,包括核心线程非核心线程,空闲的,繁忙的。如果线程池状态是TIDYING或TERMINATED,该方法返回0.

purge() 尝试从任务队列中清除掉已经取消(通过Future.cancel() )的任务,可以用与当有大量任务取消时,快速释放任务队列之用。因为被取消的任务一直存在于任务队列中直到工作线程将其从队列中取出来准备执行的时候才会判断其是否已经取消,如果是则不执行,继续取下一个任务执行。但是如果存在其他线程干扰的时候,该方法可能会失败。

remove(Runnable task) 从任务队列中移除该任务,如果存在的话;如果该任务还没被执行,则就不会执行了。该方法有一定的局限性,因为线程池在内部对任务进行了封装,比如这里其实将Runnable 的任务封装成了FutureTask,因此再使用该方法的话,任务队列中存放的是封装过的FutureTask,而不是原生的Runnable ,根本找不到对应的任务,因此也不会成功。

getQueue() 该方法将返回线程池内部的任务队列的引用,该方法仅用于监控和调试。不推荐用于其他目的。

getKeepAliveTime(TimeUnit) 按指定的单位返回keepAliveTime的值,其意义见上面的参数解释。

setKeepAliveTime(...)  修改通过构造方法设置的keepAliveTime的值,不允许小于等于0,其意义见上面的参数解释。

getMaximumPoolSize() 返回maximumPoolSize,其意义见上面的参数解释。

setMaximumPoolSize(...) 修改通过构造方法设置的maximumPoolSize的值,其意义见上面的参数解释。

allowCoreThreadTimeOut(boolean) 设置是否将keepAliveTime应用到核心线程,其意义见上面的参数解释。

prestartAllCoreThreads() 用于在线程池建立之后,任务还没到来之前,预启动所有核心线程,使它们处于空闲等待任务的状态,返回创建的核心线程数量,见上面的解释。

prestartCoreThread(), 用于在线程池建立之后,任务还没到来之前,预启动一个核心线程。使其处于空闲状态等待任务的提交,如果所有核心线程都已启动,该方法将返回false。

ensurePrestart() 语义与prestartCoreThread一样,不同的是在即使corePoolSize为0,也至少启动了一个线程。

getCorePoolSize() 返回corePoolSize的值,其意义见上面的参数解释。

setCorePoolSize(...) 修改通过构造方法设置的corePoolSize的值,如果新值小于当前值,则多余的线程将在下次空闲时终止。其意义见上面的参数解释。

getRejectedExecutionHandler、setRejectedExecutionHandler针对拒绝策略RejectedExecutionHandler的getter,settrt方法。

getThreadFactory、setThreadFactory针对拒绝策略线程工厂threadFactory的getter,settrt方法。

finalize() 该方法仅仅是调用了shutdown,参加shutdown方法。

isTerminated() 线程池状态为TERMINATED返回true,isTerminating() 线程池状态处于SHUTDOWN、STOP 、TIDYING 这三种状态时返回true。isShutdown() 线程池状态是SHUTDOWN、STOP 、TIDYING、TERMINATED时返回true。

总结

ThreadPoolExecutor是最基础的线程池实现,它可以根据构造方法或者实例方法动态的对线程池的参数进行调整,影响线程池的最重要的三个参数是corePoolSize,maximumPoolSize,和用于存放任务队列的workQueue,三者之间的关系也体现了该线程池的实现特征:当向线程池提交一个任务时,如果当前线程数 小于 corePoolSize则不论那些线程是空闲还是繁忙都将立即创建新线程来处理该任务;如果线程数大于等于corePoolSize,则尝试将任务添加到任务队列中去;如果队列满了并且当前线程数 小于 maximumPoolSize,则创建新线程来执行任务;如果队列满了,线程数也到达了最大值maximumPoolSize,那么就要走线程池的拒绝策略了。 其中的拒绝策略就是另一个参数了RejectedExecutionHandler,它指定了当任务不能被执行时,采取的处置方式。keepAliveTime参数则是为了有效的减少线程池在业务量较少时减少不必要的开销而设计用来使线程自动被回收的,keepAliveTime和corePoolSize的配合还可以让那些用户忘记了执行shutdown的线程池自动销毁。

线程池的实现使用一个构造方法传人的阻塞队列维护来不及处理的任务,又用了一个Set集合维护当前所有的工作线程,通过工作线程使用阻塞队列的take,poll方法达到线程可以一直等待任务而不退出或者定时等待以达到使线程在空闲超过KeepAliveTime时间后退出销毁的目得。

总的来说,ThreadPoolExecutor的实现让人理解有困难的地方就是状态的转换,但对外部使用者来说,一个线程池只要在使用完之后调用shutdown或shutdownNow来延迟或者立即关闭线程池,使线程池处于SHUTDOWN或者STOP状态就足够了,后续的线程池销毁过程完全将由线程池自发完成,不需要使用者关心。