并发编程(十三)—— Java 线程池 实现原理与源码深度解析 之 Executors(三)

时间:2022-08-26 20:10:34

前两篇文章讲了线程池的源码分析,再来看这篇文章就比较简单了, 本文主要讲解 Executors 这个工具类,看看长江创建线程池的几种方法。

newFixedThreadPool

  • 生成一个固定大小的线程池:
 public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

最大线程数设置为与核心线程数相等,则不会创建临时线程,创建的线程都是核心线程,线程也不会被回收。此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,*队列,所以FixedThreadPool永远不会拒绝, 即饱和策略失效。

过程分析:刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为 nThreads。

newSingleThreadExecutor

  • 生成只有一个线程的固定线程池
 public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

这个更简单,和上面的一样,只要设置线程数为 1 就可以了。

初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行。

由于使用了*队列, 所以SingleThreadPool永远不会拒绝, 即饱和策略失效。

由于newFixedThreadPool和SingleThreadPool都是使用的LinkedBlockingQueue,并且核心线程固定,如果此时并发有大量任务进行添加,线程处理速度过慢,将会全部添加到LinkedBlockingQueue中,此时会出现内存溢出。

newCachedThreadPool

  • 生成一个需要的时候就创建新的线程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue,所以创建的线程都是临时线程,都可以被回收。

这种线程池对于任务可以比较快速地完成的情况有比较好的性能。如果线程空闲了 60 秒都没有任务,那么将关闭此线程并从线程池中移除。所以如果线程池空闲了很长时间也不会有问题,因为随着所有的线程都会被关闭,整个线程池不会占用任何的系统资源。

 int c = ctl.get();
// corePoolSize 为 0,所以不会进到这个 if 分支
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// offer 如果有空闲线程刚好可以接收此任务,那么返回 true,否则返回 false
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);

过程分析:我把 execute 方法的主体粘贴过来,让大家看得明白些。鉴于 corePoolSize 是 0,那么提交任务的时候,直接将任务提交到队列中,由于采用了 SynchronousQueue,所以如果是第一个任务提交的时候,offer 方法肯定会返回 false,因为此时没有任何 worker 对这个任务进行接收,那么将进入到最后一个分支来创建第一个 worker,第一个worker执行完后就getTask()从队列中取任务。之后再提交任务的话,取决于是否有空闲下来的线程对任务进行接收,如果有,会进入到第二个 if 语句块中把当前任务给正在等待的worker,如果没有空闲的线程在等待取任务,就是和第一个任务一样,进到最后的 else if 分支创建worker。

我们来仔细分析下代码,第一次添加任务时,执行到第9行 workQueue.offer(command),我把以前文章里面的offer()代码贴过来,如果有感兴趣的可以去看看《并发编程(十)—— Java 并发队列 BlockingQueue 实现之 SynchronousQueue源码分析

 public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
 /**
* Puts or takes an item.
*/
Object transfer(Object e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed
boolean isData = (e != null); for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
//说明还没有初始化,则跳出继续循环,直至初始化完成
continue; // spin // 走到这里,说明已经初始化完成,但是初始化时head = h;tail = h;head和tail都是相同的空节点
// 如果h == t为false,则判断t.isData == isData,判断队尾节点和当前节点类型是否一致
// 队列空,或队列中节点类型和当前节点一致,
// 即我们说的第一种情况,将节点入队即可。读者要想着这块 if 里面方法其实就是入队
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
// t != tail 说明刚刚有节点入队,continue 即可
if (t != tail) // inconsistent read
continue;
// 有其他节点入队,但是 tail 还是指向原来的,此时设置 tail 即可
if (tn != null) { // lagging tail
// 这个方法就是:如果 tail 此时为 t 的话,设置为 tn
advanceTail(t, tn);
continue;
}
//
if (timed && nanos <= 0) // can't wait
return null;
// s == null,则创建一个新节点
if (s == null)
s = new QNode(e, isData);
// 将当前节点,插入到 tail 的后面
if (!t.casNext(null, s)) // failed to link in
continue; // 将当前节点设置为新的 tail
advanceTail(t, s); // swing tail and wait
// 看到这里,请读者先往下滑到这个方法,看完了以后再回来这里,思路也就不会断了
Object x = awaitFulfill(s, e, timed, nanos);
// 到这里,说明之前入队的线程被唤醒了,准备往下执行
// 若返回的x == s表示,当前线程已经超时或者中断,不然的话s == null或者是匹配的节点
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
// 若s节点被设置为取消
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? x : e; // 这里的 else 分支就是上面说的第二种情况,有相应的读或写相匹配的情况
} else { // complementary-mode
QNode m = h.next; // node to fulfill
// 不一致读,表明有其他线程修改了队列
if (t != tail || m == null || h != head)
continue; // inconsistent read Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
} advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? x : e;
}
}
} void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}

第一次offer(command)时,我们可以看到 transfer 方法中 第32行处 timed && nanos <= 0 成立,此时return null,则offer返回false,所以第一次添加任务时,就会执行最后的 else if (!addWorker(command, false)) 添加一个worker,如果这个worker执行完任务,在getTask()中从等待队列中取任务,这时如果有线程提交任务,则在 if (isRunning(c) && workQueue.offer(command)) 处给到空闲的线程;如果等待超过60秒,则关闭此线程;如果此时线程还在执行任务,还有线程提交任务,则还会执行到最后的 else if (!addWorker(command, false)) 添加一个worker。

SynchronousQueue 是一个比较特殊的 BlockingQueue,其本身不储存任何元素,它有一个虚拟队列(或虚拟栈),不管读操作还是写操作,如果当前队列中存储的是与当前操作相同模式的线程,那么当前操作也进入队列中等待;如果是相反模式,则配对成功,从当前队列中取队头节点。具体的信息,可以看我的另一篇关于 BlockingQueue 的文章。

第一次offer(command)时,如果此时没有相反操作的在getTask,这时添加队列并不会阻塞,直接返回false,然后创建一个worker,执行当前任务,当前worker在60秒内如果有其他线程offer,则会继续getTask执行任务,如果超时60秒,则会回收当前worker,如果并发很多同时提交任务,并且处理任务过慢,则会同时创建很多线程,因为没有空闲的线程等待getTask。如果在60秒内执行完任务,且又有任务来 ,则入队的线程直接将任务给空闲的线程

并发编程(十三)—— Java 线程池 实现原理与源码深度解析 之 Executors(三)的更多相关文章

  1. 并发编程(十二)—— Java 线程池 实现原理与源码深度解析 之 submit 方法 (二)

    在上一篇<并发编程(十一)—— Java 线程池 实现原理与源码深度解析(一)>中提到了线程池ThreadPoolExecutor的原理以及它的execute方法.这篇文章是接着上一篇文章 ...

  2. 并发编程(十一)—— Java 线程池 实现原理与源码深度解析(一)

    史上最清晰的线程池源码分析 鼎鼎大名的线程池.不需要多说!!!!! 这篇博客深入分析 Java 中线程池的实现. 总览 下图是 java 线程池几个相关类的继承结构:    先简单说说这个继承结构,E ...

  3. 并发编程(十五)——定时器 ScheduledThreadPoolExecutor 实现原理与源码深度解析

    在上一篇线程池的文章<并发编程(十一)—— Java 线程池 实现原理与源码深度解析(一)>中从ThreadPoolExecutor源码分析了其运行机制.限于篇幅,留下了Scheduled ...

  4. Java并发指南12:深度解读 java 线程池设计思想及源码实现

    ​深度解读 java 线程池设计思想及源码实现 转自 https://javadoop.com/2017/09/05/java-thread-pool/hmsr=toutiao.io&utm_ ...

  5. 【转载】深度解读 java 线程池设计思想及源码实现

    总览 开篇来一些废话.下图是 java 线程池几个相关类的继承结构: 先简单说说这个继承结构,Executor 位于最顶层,也是最简单的,就一个 execute(Runnable runnable) ...

  6. 线程池 ThreadPoolExecutor 原理及源码笔记

    前言 前面在学习 JUC 源码时,很多代码举例中都使用了线程池 ThreadPoolExecutor,并且在工作中也经常用到线程池,所以现在就一步一步看看,线程池的源码,了解其背后的核心原理. 公众号 ...

  7. Java并发编程:Java线程池

    转载自:http://www.cnblogs.com/dolphin0520/p/3932921.html 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题 ...

  8. Java并发编程:Java线程池核心ThreadPoolExecutor的使用和原理分析

    目录 引出线程池 Executor框架 ThreadPoolExecutor详解 构造函数 重要的变量 线程池执行流程 任务队列workQueue 任务拒绝策略 线程池的关闭 ThreadPoolEx ...

  9. 并发编程(十四)—— ScheduledThreadPoolExecutor 实现原理与源码深度解析 之 DelayedWorkQueue

    我们知道线程池运行时,会不断从任务队列中获取任务,然后执行任务.如果我们想实现延时或者定时执行任务,重要一点就是任务队列会根据任务延时时间的不同进行排序,延时时间越短地就排在队列的前面,先被获取执行. ...

随机推荐

  1. 【剑指offer】顺时针打印矩阵

    转载请注明出处:http://blog.csdn.net/ns_code/article/details/26053049 剑指offer上的第20题,九度OJ上測试通过. 题目描写叙述: 输入一个矩 ...

  2. Java开源项目(备查)

    转自:http://www.blogjava.net/Carter0618/archive/2008/08/11/221222.html Spring Framework  [Java开源 J2EE框 ...

  3. 让Cocos2D-X的示例程序运行起来

    没有整理好,现在先标记下 安装好环境后可能遇到的问题: 1.cocos2d-X 2.0版本后创建的Android项目提示org.cocos2dx.lib.Cocos2dxActivity找不到问题 解 ...

  4. PDFBox 介绍

    根据官网的介绍可知,PDFBox是一个用来处理PDF文档的开源的Java工具包.这个项目运行创建PDF文档.对已有文档进行操作并且能够从文档中提取内容.它也包含了几个命令行工具.还有一点很重要,它是开 ...

  5. Httphelper工具1

    [苏飞开发助手V1.0测试版]官方教程与升级报告导读部分------------------------------------------------------------------------ ...

  6. java 并发多线程 锁的分类概念介绍 多线程下篇(二)

    接下来对锁的概念再次进行深入的介绍 之前反复的提到锁,通常的理解就是,锁---互斥---同步---阻塞 其实这是常用的独占锁(排它锁)的概念,也是一种简单粗暴的解决方案 抗战电影中,经常出现为了阻止日 ...

  7. Error&colon;java&colon; 无效的目标发行版&colon; 1&period;8

    出现问题: Error:java: 无效的目标发行版: 1.8 解决方法: file-setting--

  8. React16&period;x特性剪辑

    本文整理了 React 16.x 出现的耳目一新的概念与 api 以及应用场景. 更多 React 系列文章可以订阅blog 16.0 Fiber 在 16 之前的版本的渲染过程可以想象成一次性潜水 ...

  9. jdbc连接sqlserver&comma;mysql&comma;oracle

    class xxx{ private static String port = "1433"; private static String ip = "192.168.2 ...

  10. 2101244 - FAQ&colon; SAP HANA Multitenant Database Containers &lpar;MDC&rpar;

    Symptom You face issues or have questions related to multitenant database containers in SAP HANA env ...