从源码看JDK提供的线程池(ThreadPoolExecutor)

时间:2023-12-20 09:20:44

一丶什么是线程池

(1)博主在听到线程池三个字的时候第一个想法就是数据库连接池,回忆一下,我们在学JavaWeb的时候怎么理解数据库连接池的,数据库创建连接和关闭连接是一个比较耗费资源的事情,对于那些数量多且时间短暂的任务,会导致频繁获取和释放连接,这样使得处理事务的效率大大降低,多以我们创建一个连接池,里面放了指定数目的连接,当应用需要数据库连接的时候去里面获取,使用完毕后再放到连接池里,这样就避免了重复的获取连接和释放连接,至于要获取什么样的连接池我们可以根据应用的特征,设置参数来决定。

(2)线程池和连接池很相似,线程池的产生是为了避免重复的创建线程和回收线程。本着存在即合理,存在即有优点的理念(这个说法不普遍适用),线程池有如下三个优点:

①降低资源消耗。通过重复利用已创建的线程降低线程创建、销毁线程造成的消耗。

②提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

③提高线程的可管理性。线程是稀缺资源,如果入限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。


二丶ThreadPoolExecutor的使用

ThreadPoolExecutor是线程池的最核心的一个类,所以要了解线程池我们先来看看ThreadPoolExecutor类的实现。

本着先学开车后学修车的理念,我们先通过范例来学习一下ThreadPoolExecutor的使用(以后对JDK源码框架的学习都会本着这个原则)。


public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadPoolExecutorTest task = new ThreadPoolExecutorTest();
//corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 20, 300, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(4));
for(int i=0;i < 15;i++){
threadPool.execute(task.new MyTask(i));
}
threadPool.shutdown();
System.out.println("end");
} public class MyTask implements Runnable {
private int taskNo;
public MyTask(int taskNO){
this.taskNo = taskNO;
}
public void run(){
System.out.println("任务:"+taskNo+"正在执行");
try {
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务:"+taskNo+"执行执行结束");
}
}
}

输出:

任务:3正在执行
任务:10正在执行
任务:1正在执行
任务:9正在执行
任务:0正在执行
任务:4正在执行
任务:2正在执行
任务:11正在执行
任务:13正在执行
任务:12正在执行
任务:14正在执行
end
任务:1执行执行结束
任务:5正在执行
任务:9执行执行结束
任务:6正在执行
任务:11执行执行结束
任务:7正在执行
任务:3执行执行结束
任务:8正在执行
任务:10执行执行结束
任务:12执行执行结束
任务:2执行执行结束
任务:13执行执行结束
任务:14执行执行结束
任务:0执行执行结束
任务:4执行执行结束
任务:6执行执行结束
任务:5执行执行结束
任务:8执行执行结束
任务:7执行执行结束

以上的范例就是ThreadPoolExecutor的简单应用,首先需要创建一个任务类MyTask ,其次在主方法里创建ThreadPoolExecutor对象,接着用for循环来模拟运行多个线程,然后execute方法执行,最后调用shutdown方法结束。

上面代码的实现和我们往常实现多线程有些区别,我们往常使用:

Thread threadA = new Thread();
thread.start();

来创建一个线程执行任务,在应用ThreadPoolExecutor时,我们不再自己创建,而是使用线程池为我们创建的线程。

在创建线程池ThreadPoolExecutor对象时,有很多个构造参数,通过注释我们可以了解到,这些参数就是用来设置线程池的特征的。


三丶从源码来看ThreadPoolExecutor

1)ThreadPoolExecutor结构:

public class ThreadPoolExecutor extends AbstractExecutorService {
...
} public abstract class AbstractExecutorService implements ExecutorService {
...
} public interface ExecutorService extends Executor{
...
} public interface Executor {
...
}
//ForkJoinPool 也继承自AbstractExecutorService
public class ForkJoinPool extends AbstractExecutorService {
...
}

2)线程池处理任务处流程:

知道了ThreadPoolExecutor的继承关系后我们来了解一下ThreadPoolExecutor的设计结构和思想,这对我们后面理解ThreadPoolExecutor的源码有很大的帮助:

从源码看JDK提供的线程池(ThreadPoolExecutor)

(对队列的判断应该在线程池内部)

上面的这张图应该就能大概的描述ThreadPoolExecutor的实现了,同时也能够理解ThreadPoolExecutor的构造参数了。

线程池在创建的时候会设置CorePoolSize,maximumPoolSize,workQueue等几个重要参数,CorePoolSize指的是核心线程池的大小,maximumPoolSize指的是线程池的线程数最大值,workQueue为线程池指定的阻塞队列。

处理流程:

  1. 主线程执行execute方法,提交任务到线程池,线程池判断核心线程池中的线程是否都在工作,如果不是则创建一个线程来执行新任务,如果都在工作,进入下一步。
  2. 判断工作队列是否已满,如果不满,则将新任务加入到阻塞队列中,如果满了进入下一步。
  3. 判断线程池中线程数是否小于maximumPoolSize,如果小于,创建新的线程来处理新任务,否则交给饱和策略。

3)源码:

接下来我们跟着方法的执行流程来跟源码:

源码从哪里开始跟?当然是从execute方法开始啦,毕竟这个是执行的开端呀(博主跟源码还是喜欢这样,这样慢慢跟下去,不太喜欢直接看构造,字段,方法,等看到了字段属性再跟下去看)。


3.1 execute:

public void execute(Runnable command) {
//判断任务有效性
if (command == null)
throw new NullPointerException();
//ctl是一个AtomicInteger类型数据
//private final AtomicInteger ctl =
//new AtomicInteger(ctlOf(RUNNING, 0));
//ctlOf方法下面是Runing代表的值和0的或操作
//private static int ctlOf(int rs, int wc)
// { return rs | wc; }
//private static final int RUNNING = -1 << COUNT_BITS;
//COUNT_BITS = Integer.SIZE - 3;
//所以这个c就是RUNNING值的句柄,额,大动干戈了...
int c = ctl.get();
//如果运行的线程数小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
//如果线程池成功为command任务创建或分配新的线程
//addWorker方法boolean参数用来判断是否在核心池加任务
if (addWorker(command, true))
//退出程序
return;
//更新Runing值
c = ctl.get();
}
//private static boolean isRunning(int c)
//{return c < SHUTDOWN;}
//SHUTDOWN值为0,如果小于这个值,表示运行停止
//offer用来判断任务是否成功入队
if (isRunning(c) && workQueue.offer(command)) {
//再次获取RUNNING值
int recheck = ctl.get();
//如果command在任务队列中,remove方法将其移除
if (! isRunning(recheck) && remove(command))
//将command任务交给饱和策略
reject(command);
//如果程序遭到shutdown或shutdownNow方法停止,
//那么这时会检测到无线程运行,这个时候不要添加任务处理
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果入队失败,那么交给饱和策略
else if (!addWorker(command, false))
reject(command);
}

相信这个方法已经不要我再多作什么赘述了吧(捂嘴笑.jpg)。

这里我们再将ThreadPoolExecutor的一些字段列一下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS; // 这几个方法我也将它们看做字段了
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//总之,为了效率,源码的位运算常见的跟喝茶似的

如果没有对照源码看博文的小伙伴可能会有很多疑问,相信你打开源码再看我的方法注释会更好的理解!


3.2 addWorker:

看了上面的execute方法,相信我们可以看出来addWorker方法也算是核心了,addWorker方法担任了所有的将任务交给线程的操作:

   //addWorker方法两个参数,第一个参数不用说,第二个之前我们说过
//它是用来区分任务是将送达的地方(是否是核心线程池)
private boolean addWorker(Runnable firstTask, boolean core) {
/*说明:其实retry就是一个标记,标记程序跳出循环的时候从哪里开始执行,
*功能类似于goto。retry一般都是跟随者for循环出现,第一个retry的下面
*一行就是for循环,而且第二个retry的前面一般是 continue或是 break。*/
retry:
for (;;) {
//获取RUNNING
int c = ctl.get();
//运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false; for (;;) {
int wc = workerCountOf(c);
//下面可以看出Boolean类型参数core 作用了
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//这里有个疑问,如果上面一直死循环,即使通过break跳出循环,那么根据
//Retry特性,岂不是还要执行死循环,然后往复循环?
//哦,知道了,当不满足上面的各种条件的时候
//不就不用执行上面的代码了吗,这个...思维僵化了... boolean workerStarted = false;
boolean workerAdded = false; Worker w = null;
try {
//从这里我们可以看出对线程的包装
w = new Worker(firstTask);
//这里创建了线程
final Thread t = w.thread;
//有效性判断
if (t != null) {
//这里要加锁了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//workers为HashSet
//是用来存放被包装过的工作线程
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//当添加成功后就要启动了
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

从上面的代码我们可以发现,当任务交给线程执行的时候并不是直接的交给线程,线程池创建线程后会将线程封装成工作线程Worker,Worker工作完后还会继续去工作队列中获取任务来执行。


3.3 Worker类:

 private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
...
}

我们可以从下面这段Worker类的Run方法中窥到这一点:

jdk1.8和1.7有很大的改动,有兴趣的朋友可以去对比一下

 public void run() {
runWorker(this);
}
   final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//下面这行是重点
//task不为空或者getTask(获取队列中任务)不为空的时候
//对这个任务加锁进行处理
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//这里运行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

3.4 submit:

既然讲到了execute方法,怎么能少了submit方法,当我们执行一个任务的时候,有的时候需要返回值,这个时候我们就需要用到submit方法了。

其实我们通过源码可以发现submit方法内部也是调用execute方法,当调用submit方法的时候我们可以收到一个Future对象,我们可以调用Future对象的get方法来获得它的返回值。

关于Future的知识,可以参考:

https://www.cnblogs.com/cz123/p/7693064.html

注:我在ThreadPoolExecutor中找submit方法的时候没找到,然后才发现ThreadPoolExecutor是直接继承他的父类AbstractExecutorService的。

 public Future<?> submit(Runnable task) {
//验证任务有效性
if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

从上面的代码我们可以看出将任务包装成一个RunnableFuture对象,然后将这个对象用execute执行。

如果看过我上面推荐的博文,读者应该知道Callable和和Runnable的区别了:是否有返回值。

关于这个Future的结构我们来理一下:

FutureTask→(实现)RunnableFuture→(继承)Runnable,Future

我们可以看看newTaskFor方法:

 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

它的底层其实就是new一个FutureTask对象而已,所以FutureTask才是真正的实现类哦!

关于FutureTask的实现,我们会专门抽出时间去整理!


3.5shutdown和shutdownNow:

我们可以通过shutdown和shutdownNow方法来关闭线程池,shutdown方法通过遍历工作线程HashSet,将运行状态(ctl)这设置为SHUTDOWN并调用interrupt方法中断所有线程,shutdownNow同样遍历所有线程,将将运行状态(ctl)这设置为STOP,并调用interrupt方法中断所有线程。

   public void shutdown() {
//需要加锁中断
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//保证线程可中断
checkShutdownAccess();
//更改运行状态,底层为原子操作
advanceRunState(SHUTDOWN);
//这个方法会对全局变量workers(HashSet)进行遍历
//对这个里的所有工作线程调用interrupt方法
interruptIdleWorkers();
//一个空方法
//官方是这样说的:
//used by ScheduledThreadPoolExecutor
//to cancel delayed tasks.
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//保证线程可中断
checkShutdownAccess();
//ctl设置为STOP
advanceRunState(STOP);
interruptWorkers();
//这个就是两个方法区别
//这个方法将工作队列中的任务(还未执行)
//取出放到list中
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

3.6 RejectedExecutionHandler:

最后我们来看看这个饱和策略,当线程和队列都满了过后,表明这个线程池处于饱和的状态,那么我们必须要采取一定的措施来处理这些任务,在默认的情况下我们会执行AbortPolicy,表示无法处理任务,抛出异常。

同时JDK提供了以下的几种策略:

  • AbortPolicy:直接抛出异常
  • CallerRunsPolicy:用调用者的线程来执行任务
  • DiscardOldestPolicy:丢弃队列里最近的任务,并执行这个任务
  • DiscardPolicy:丢弃,不处理

我们也可以实现RejectedExecutionHandler接口进行自定义操作,例如有的时候我们需要将这种异常记录到日志当中,这个时候我们就需要自定义了!

总结:通过对线程池的学习,自己又了解到不少自己不知道的知识,例如Future接口等,也通过学习,发现一些容器和锁真的很常见,自己平时应用的时候很少碰见,当然,大师的编程水平也值得我们这些小民学习!