Java线程池 ThreadPoolExecutor 深入解析 任务列队,拒绝策略,自定义线程池工厂,线程池扩展,线程核心最大列队数值大小定

时间:2022-10-11 14:55:14


目录

​相关文章​

​介绍     ​

​ThreadPoolExecutor 构造方法​

​构造函数的参数含义如下​

​各项介绍​

​workQueue任务队列​

​1.直接提交队列设置为SynchronousQueue队列​

​2.有界的任务队列ArrayBlockingQueue实现​

​3.*的任务队列 使用LinkedBlockingQueue实现​

​4.优先任务队列:优先任务队列通过PriorityBlockingQueue实现​

​拒绝策略​

​自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略,​

​ThreadFactory自定义线程创建​

​ThreadPoolExecutor扩展​

​线程池线程数量​

​任务类型 cpu 密集型 io密集型   ​

​CPU 密集型任务​

​IO 密集型任务​

​相关文章​


相关文章

上篇:java 线程锁 可重入锁 可中断锁 公平锁 非公平锁ReentrantLock synchronized,条件Condition,读写锁 ReentrantReadWriteLock写写互斥读读共享

关联上篇:java 多线程 实现 无返回值 有返回值 Runnable Thread Callable<T> Future<String> FutureTask<String> 线程


介绍     

jdk 中提供了 Executor 可以设置几种线程池:newFixedThreadPool()newSingleThreadExecutor()、newCachedThreadPool()等 比较比较局限了,所以这里介绍下 ThreadPoolExecutor

ThreadPoolExecutor 构造方法


public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

构造函数的参数含义如下

  • corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;
  • maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;
  • keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;
  • unit:keepAliveTime的单位
  • workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、*任务队列、优先任务队列几种;
  • threadFactory:线程工厂,用于创建线程,一般用默认即可;
  • handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务;

各项介绍

workQueue任务队列

1.直接提交队列设置为SynchronousQueue队列

它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。

private static ExecutorService pool;
//maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

2.有界的任务队列ArrayBlockingQueue实现

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

有任务执行时候,线程池创建新线程,知道核心线程池大小,剩下的加入到等待列队,等待列队满了,开始增加线程,直到最大线程池数

3.*的任务队列 使用LinkedBlockingQueue实现

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

当中最大线程数是无效的直到资源耗尽

4.优先任务队列:优先任务队列通过PriorityBlockingQueue实现

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

可以自定义优先级

public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//优先任务队列
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

for(int i=0;i<20;i++) {
pool.execute(new ThreadTask(i));
}
}
}

public class ThreadTask implements Runnable,Comparable<ThreadTask>{

private int priority;

public int getPriority() {
return priority;
}

public void setPriority(int priority) {
this.priority = priority;
}

public ThreadTask() {

}

public ThreadTask(int priority) {
this.priority = priority;
}

//当前对象和其他对象做比较,当前优先级大就返回-1,优先级小就返回1,值越小优先级越高
public int compareTo(ThreadTask o) {
return this.priority>o.priority?-1:1;
}

public void run() {
try {
//让线程阻塞,使后续任务进入缓存队列
Thread.sleep(1000);
System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}

日志

priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1

拒绝策略

一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况。ThreadPoolExecutor自带的拒绝策略如下:

  • 1、AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;
  • 2、CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;
  • 3、DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交;
  • 4、DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;

自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略,


public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//自定义拒绝策略
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString()+"执行了拒绝策略");

}
});

for(int i=0;i<10;i++) {
pool.execute(new ThreadTask());
}
}
}

public class ThreadTask implements Runnable{
public void run() {
try {
//让线程阻塞,使后续任务进入缓存队列
Thread.sleep(1000);
System.out.println("ThreadName:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}
  • 日志
supply.chain.bp.upload.service.sliceupload.init.ThreadTask@64616ca2执行了拒绝策略
supply.chain.bp.upload.service.sliceupload.init.ThreadTask@13fee20c执行了拒绝策略
supply.chain.bp.upload.service.sliceupload.init.ThreadTask@4e04a765执行了拒绝策略
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2

上边 设置的时间延迟 所以多了后执行了拒绝策略 ,我们自己定义的拒绝策略

ThreadFactory自定义线程创建

线程产生是通过线程工厂实现的,我们这里自定义线程工厂ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等,


public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//自定义线程工厂
pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
new ThreadFactory() {
public Thread newThread(Runnable r) {
System.out.println("线程"+r.hashCode()+"创建");
//线程命名
Thread th = new Thread(r,"threadPool"+r.hashCode());
return th;
}
}, new ThreadPoolExecutor.CallerRunsPolicy());

for(int i=0;i<10;i++) {
pool.execute(new ThreadTask());
}
}
}

public class ThreadTask implements Runnable{
public void run() {
//输出执行线程的名称
System.out.println("ThreadName:"+Thread.currentThread().getName());
}
}

日志

线程234698513创建
线程1595953398创建
ThreadName:threadPool234698513
线程998351292创建
ThreadName:threadPool234698513
ThreadName:threadPool1595953398
ThreadName:threadPool234698513
ThreadName:threadPool1595953398
ThreadName:threadPool998351292
ThreadName:threadPool234698513
ThreadName:threadPool998351292
ThreadName:threadPool1595953398
ThreadName:threadPool234698513

线程池中,每个线程的创建我们都进行了记录输出与命名。
 

ThreadPoolExecutor扩展

ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口实现的,

  • 1、beforeExecute:线程池中任务运行前执行
  • 2、afterExecute:线程池中任务运行完毕后执行
  • 3、terminated:线程池退出后执行

通过这三个接口我们可以监控每个任务的开始和结束时间,或者其他一些功能。



public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args ) throws InterruptedException
{
//实现自定义接口
pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
new ThreadFactory() {
public Thread newThread(Runnable r) {
System.out.println("线程"+r.hashCode()+"创建");
//线程命名
Thread th = new Thread(r,"threadPool"+r.hashCode());
return th;
}
}, new ThreadPoolExecutor.CallerRunsPolicy()) {

protected void beforeExecute(Thread t,Runnable r) {
System.out.println("准备执行:"+ ((ThreadTask)r).getTaskName());
}

protected void afterExecute(Runnable r,Throwable t) {
System.out.println("执行完毕:"+((ThreadTask)r).getTaskName());
}

protected void terminated() {
System.out.println("线程池退出");
}
};

for(int i=0;i<10;i++) {
pool.execute(new ThreadTask("Task"+i));
}
pool.shutdown();
}
}

public class ThreadTask implements Runnable{
private String taskName;
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public ThreadTask(String name) {
this.setTaskName(name);
}
public void run() {
//输出执行线程的名称
System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
}
}

日志


线程1121172875创建
线程998351292创建
线程1684106402创建
准备执行:Task1
TaskNameTask1---ThreadName:threadPool998351292
执行完毕:Task1
准备执行:Task0
线程335471116创建
TaskNameTask0---ThreadName:threadPool1121172875
准备执行:Task2
执行完毕:Task0
TaskNameTask2---ThreadName:threadPool998351292
准备执行:Task3
TaskNameTask3---ThreadName:threadPool1121172875
执行完毕:Task2
执行完毕:Task3
准备执行:Task7
准备执行:Task5
TaskNameTask5---ThreadName:threadPool1121172875
执行完毕:Task5
准备执行:Task4
TaskNameTask4---ThreadName:threadPool998351292
执行完毕:Task4
准备执行:Task6
TaskNameTask6---ThreadName:threadPool1121172875
准备执行:Task8
TaskNameTask8---ThreadName:threadPool335471116
TaskNameTask7---ThreadName:threadPool1684106402
执行完毕:Task8
执行完毕:Task6
准备执行:Task9
TaskNameTask9---ThreadName:threadPool998351292
执行完毕:Task9
执行完毕:Task7
线程池退出

进程已结束,退出代码为 0

线程池线程数量

任务类型 cpu 密集型 io密集型   

      
    CPU 密集型任务:加解密,压缩、计算等一系列需要大量耗费 CPU 资源的任务,大部分场景下都是纯 CPU 计算。     
    IO 密集型任务:比如像 MySQL 数据库、文件的读写、网络通信等任务,这类任务不会特别消耗 CPU 资源,但是 IO 操作比较耗时,会占用比较多时间。     
 

CPU 密集型任务

对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个 8 核的 CPU,每个核一个线程,理论上创建 8 个线程就可以了。

如果设置过多的线程数,实际上并不会起到很好的效果。此时假设我们设置的线程数量是 CPU 核心数的 2 倍,因为计算任务非常重,会占用大量的 CPU 资源,所以这时 CPU 的每个核心工作基本都是满负荷的,

而我们又设置了过多的线程,每个线程都想去利用 CPU 资源来执行自己的任务,这就会造成不必要的上下文切换,此时线程数的增多并没有让性能提升,反而由于线程数量过多会导致性能下降。

因此,对于 CPU 密集型的计算场景,理论上线程的数量 = CPU 核数就是最合适的,不过通常把线程的数量设置为CPU 核数 +1,会实现最优的利用率。

即使当密集型的线程由于偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费,从而保证 CPU 的利用率。         
 

IO 密集型任务

对于 IO 密集型任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。而如果我们设置更多的线程数,那么当一部分线程正在等待 IO 的时候,它们此时并不需要 CPU 来计算,那么另外的线程便可以利用 CPU 去执行其他的任务,互不影响,这样的话在任务队列中等待的任务就会减少,可以更好地利用资源。         
计算方法:         
      

  线程数 = CPU 核心数 * (1 + IO 耗时/ CPU 耗时) 

         
通过这个公式,我们可以计算出一个合理的线程数量,如果任务的平均等待时间长,线程数就随之增加,而如果平均工作时间长,也就是对于我们上面的 CPU 密集型任务,线程数就随之减少。

可以采用 APM 工具统计到每个方法的耗时,便于计算 IO 耗时和 CPU 耗时。         
         
         

相关文章

上篇:java 线程锁 可重入锁 可中断锁 公平锁 非公平锁ReentrantLock synchronized,条件Condition,读写锁 ReentrantReadWriteLock写写互斥读读共享

关联上篇:java 多线程 实现 无返回值 有返回值 Runnable Thread Callable<T> Future<String> FutureTask<String> 线程


ok

持续更新