Java 线程池 Executor框架(1)ThreadPoolExecutor

时间:2022-10-29 18:33:36

Executor框架的结构

Executor框架主要包含三个部分:
任务:包括Runnable和Callable,其中Runnable表示一个可以异步执行的任务,而Callable表示一个会产生结果的任务
任务的执行:包括Executor框架的核心接口Executor以及其子接口ExecutorService。在Executor框架中有两个关键类ThreadPoolExecutor和ScheduledThreadPoolExecutor实现了ExecutorService接口。
异步计算的结果:包括接口Future和其实现类FutureTask。
Java 线程池 Executor框架(1)ThreadPoolExecutor
我们会一步一步详细解释每部分的作用与原理


(1)ThreadPoolExecutor

Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类,主要由下列4个组件构成。
 corePool:核心线程池的大小。
 maximumPool:最大线程池的大小。
 BlockingQueue:用来暂时保存任务的工作队列。
 RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和时(达到了最大线程池大小且工作队列已满),execute()方法将要调用的Handler。

通过Executor框架的工具类Executors,可以创建3种类型的ThreadPoolExecutor。
 FixedThreadPool
 SingleThreadExecutor
 CachedThreadPool


【1】FixedThreadPool
FixedThreadPool被称为可重用固定线程数的线程池。下面是FixedThreadPool的源代码实现。

public static ExecutorService newFixedThreadPool(int nThreads{
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

 FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时定的参数nThreads。
 当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的 最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止
Java 线程池 Executor框架(1)ThreadPoolExecutor
1)如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。
2)在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入 LinkedBlockingQueue。
3)线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。FixedThreadPool使用*队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。

FixedThreadPool是使用固定线程数的线程池,Executors提供的API有如下两个

1 public static ExecutorService newFixedThreadPool(int nThreads);
2 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory);

关于FixedThreadExecutor的demo

ExecutorService service = Executor.newFixedThreadExecutor(2);

service.submit(new Runnable(){
@Override
public void run(){
for(int i = 0; i<10; i++){
try{
Thread.sleep(1000);
System.out.println("Executor1:"+i);
}catch(Exception e){
e.printStackTrace();
}
}
}
});

service.submit(new Runnable(){
@Override
public void run(){
for(int i = 0; i<10; i++){
try{
Thread.sleep(1000);
System.out.println("Executor2:"+i);
}catch(Exception e){
e.printStackTrace();
}
}
}
})

输出结果:Executor1和Executor2并行打印出来
Java 线程池 Executor框架(1)ThreadPoolExecutor


【2】SingleThreadExecutor
正如其名,SingleThreadExecutor是单用单个worker线程的Executor。他和FixedThreadExcutor的设置差不多,只不过SingleThreadExecutor的corePoolSize是1而已。

public static ExecutorService newSingleThreadPool(int nThreads{
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecuto(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

Java 线程池 Executor框架(1)ThreadPoolExecutor
线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行。
SingleThreadExecutor使用单线程执行任务,Executors提供的API有如下两个

1.public static ExecutorService newSingleThreadExecutor();
2.public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory);

关于SingleThreadExecutor的demo

ExecutorService service = Executor.newSingleThreadExecutor();
service.submit(new Runnable(){
@Override
public void run(){
for(int i = 0;i<100;i++){
sleep(1000);
System.out.println("Execute"+i);
}
}
});

【3】CachedThreadExecutor
CachedThreadPool是一个会根据需要创建新线程的线程池。下面是创建CachedThreadPool的源代码。

public static ExecutorService newCachedThreadPool() { 
return new ThreadPoolExecutor(0,Integer.MAX_VALUE,
60L,TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

 CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为 Integer.MAX_VALUE,即maximumPool是*的。这里把keepAliveTime设置为60L,意味着 CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。
 FixedThreadPool和SingleThreadExecutor使用*队列LinkedBlockingQueue作为线程池的 工作队列。CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但 CachedThreadPool的maximumPool是*的。这意味着,如果主线程提交任务的速度高于 maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下, CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源.
Java 线程池 Executor框架(1)ThreadPoolExecutor
1)首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程 正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方 法执行完成;否则执行下面的步骤2)。

2)当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤1)将失败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成。

3)在步骤2)中新创建的线程将任务执行完后,会执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执 行步骤1)),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于 空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。

SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一 个线程的对应移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主线程提交的 任务传递给空闲线程执行。CachedThreadPool中任务传递的示意图如下图所示。
Java 线程池 Executor框架(1)ThreadPoolExecutor
CachedThreadPool是*线程池,Executors提供的API有如下两个

1.public static ExecutorService newCachedThreadPool();
2.public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory);

CachedThreadPool适用于执行很多短期异步任务的小程序,适用于负载较轻的服务器。