Java Executor并发框架(十三)Executor框架线程池关于异常的处理

时间:2022-12-02 18:36:02

一、介绍

关于为什么要写这篇文章,是因为我对Executor线程池的两种提交任务的方式的不同产生的好奇,我们知道,可以通过execute和submit两种方式往线程池提交我们的任务,但是这两种任务提交的方式到底有什么区别呢?通过execute方式提交的任务,我们不能获取任务执行后的返回值,而通过submit提交任务的方式,我们是可以获取任务执行结束后产生的结果。那么另一个区别就是关于异常的问题了,请看下文。

二、问题的研究

ok, 我们写一个类,继承ThreadPoolExecutor,命名为MyThreadPoolExecutor,并重写然后使用MyThreadPoolExecutor创建一个线程池。

MyThreadPoolExecutor代码:

package com.npf;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
System.out.println("MyThreadPoolExecutor.afterExecute()");
}

}

一看afterExecute这个方法,想当然的以为如果线程池中出了问题,异常自然回在第二个参数 t 中传过来。也许的确是这样的,但是这里有一个区别。我们知道ExecutorServcie中执行一个Runnable有三个方法,分别是:
第一种,Executor接口中定义的execute方法:
Java Executor并发框架(十三)Executor框架线程池关于异常的处理

第二种,AbstractExecutorService实现的submit(Runnable task)方法。
Java Executor并发框架(十三)Executor框架线程池关于异常的处理

第三种,AbstractExecutorService实现的submit(Runnable task,T result)方法。
Java Executor并发框架(十三)Executor框架线程池关于异常的处理

后面两种提交runnable任务的方式,内部都调用了newTaskFor(Callable<T> callable)方法:
Java Executor并发框架(十三)Executor框架线程池关于异常的处理

通过上面,我们知道,后面两种提交runnable任务的方式,内部都是将runnable任务封装成了FutureTask,然后再执行execute方法,我们知道execute方法是定义在Executor接口中的方法,它接收的参数是Runnable类型,为什么我们的FutureTask也符合execute方法的参数定义呢?
请看FutureTask的定义:
Java Executor并发框架(十三)Executor框架线程池关于异常的处理


FutureTask实现了RunnableFuture,而RunnableFuture继承了Runnable, Future<V>接口。
Java Executor并发框架(十三)Executor框架线程池关于异常的处理

综上我们知道,到最后都是会调用execute方法。execute方法对进来的Runnable又包装成了worker然后进入runWorker
runWorker
方法中有这么几行:
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}

好了,到了最关键的afterExecute这个步骤,我满心以为这里所有的异常都会通过thrown传递进来,看来我还是太年轻了,之前我们分析过,这个Runnable已经被submit封装成了FutureTask,那么这个task.run()除了我们自己定义的run任务之外,到底还干了啥呢?
因为如果我们通过submit的方式提交Runnable的对象被封装成了FutureTask, 那么这里的task.run()这句代码的task对象实际上就是FutureTask对象,那么这个run也就是FutureTask的方法,下面我们贴出FutureTask的run方法的源码:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
实际我们自己定义的任务已经变成了Callable对象,那么这里的Callable对象的实现类到底是谁呢?
也就是Callable<V> c = callable; 这个callable的具体类型到底是什么?请看下面的分析。

我们知道,通过submit的方式提交Runnable的对象被封装成了FutureTask的关键地方就是:
Java Executor并发框架(十三)Executor框架线程池关于异常的处理

那么我们进入到FutureTask的这个构造方法里面去看:
Java Executor并发框架(十三)Executor框架线程池关于异常的处理

再进入到Executors.callable(runnable, result)方法里面去看:
Java Executor并发框架(十三)Executor框架线程池关于异常的处理

再进入到RunnableAdapter里面去看,很简单的发现,这就是一个典型的适配器模式:
Java Executor并发框架(十三)Executor框架线程池关于异常的处理
到这里已经很清楚了,这个callable对象就是RunnableAdapter。

也就是说,如果我们使用submit的方式提交Runnable的对象,那么这个callable对象就是RunnableAdapter,然后程序下面的result = c.call(); 的时候,实际就是执行RunnableAdapter的call()方法, 在这个call方面里面,实际调用了我们提交的Runnable对象的run方法。我们写的任务全部在这句代码里面执行完毕了,看看外面都wrap了啥? OK,我们所有的Throwable全部已经被setException吃掉了,怎么还会抛出到外面那层的execute中呢?所以在submit中提交任务无论任务怎么抛异常,在afterExecute中的第二个参数是取不到的,原因就在这。

如果我们需要获取这个异常怎么办呢?通过FutureTask的get方法,能把刚刚setException中的异常给抛出来,这样我们就能真的拿到这些异常了。
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Future<?> f = (Future<?>) r;
try {
f.get();
} catch (InterruptedException e) {
logger.error("线程池中发现异常,被中断", e);
} catch (ExecutionException e) {
logger.error("线程池中发现异常,被中断", e);
}

}

如果我们直接通过execute方法提交任务,那么上面刚才我们分析的代码片段中:
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
task.run()中,这个task就是实际的Runnable对象,那么如果这里发生异常的话,afterExecute这个方法的第二个参数thrown就会有异常信息了。

三、结论

如果我们关心线程池执行的结果,则需要使用submit来提交task,那么在afterExecute中对异常的处理也需要通过Future接口调用get方法去取结果,才能拿到异常,如果我们不关心这个任务的结果,可以直接使用ExecutorService中的execute方法(实际是继承Executor接口)来直接去执行任务,这样的话,我们的Runnable没有经过多余的封装,在runWorker中得到的异常也直接能在afterExecute中捕捉。




参考文献:
1. Java ExecutorService线程池中的小坑——关于线程池中抛出的异常处理