ForkJoinPool源码简单解析

时间:2022-12-07 22:46:47

ForkJoin框架之ForkJoinTask

 阅读约 62 分钟

前言

在前面的文章"CompletableFuture和响应式编程"中提到了ForkJoinTask和ForkJoinPool,后者毫无疑问是一个线程池,前者则是一个类似FutureTask经典定义的概念.

官方有一个非常无语的解释:ForkJoinTask就是运行在ForkJoinPool的一个任务抽象,ForkJoinPool就是运行ForkJoinTask的线程池.

ForkJoin框架包含ForkJoinTask,ForkJoinWorkerThread,ForkJoinPool和若干ForkJoinTask的子类,它的核心在于分治和工作窍取,最大程度利用线程池中的工作线程,避免忙的忙死,饿的饿死.

ForkJoinTask可以理解为类线程但比线程轻量的实体,在ForkJoinPool中运行的少量ForkJoinWorkerThread可以持有大量的ForkJoinTask和它的子任务.ForkJoinTask同时也是一个轻量的Future,使用时应避免较长阻塞和io.

ForkJoinTask在JAVA8中应用广泛,但它是一个抽象类,它的子类派生了各种用途,如后续计划单独介绍的CountedCompleter,以及若干JAVA8中stream api定义的与并行流有关的各种操作(ops).

源码

首先看ForkJoinTask的签名.

public abstract class ForkJoinTask<V> implements Future<V>, Serializable

从签名上看,ForkJoinTask实现了future,也可以序列化,但它不是一个Runnable或Callable.

ForkJoinTask虽然可以序列化,但它只对运行前和后敏感,对于执行过程中不敏感.

先来看task的运行字段:

//volatie修饰的任务状态值,由ForkJoinPool或工作线程修改.
volatile int status;
static final int DONE_MASK = 0xf0000000;//用于屏蔽完成状态位.
static final int NORMAL = 0xf0000000;//表示正常完成,是负值.
static final int CANCELLED = 0xc0000000;//表示被取消,负值,且小于NORMAL
static final int EXCEPTIONAL = 0x80000000;//异常完成,负值,且小于CANCELLED
static final int SIGNAL = 0x00010000;//用于signal,必须不小于1<<16,默认为1<<16.
static final int SMASK = 0x0000ffff;//后十六位的task标签

很显然,DONE_MASK能够过滤掉所有非NORMAL,非CANCELLED,非EXCEPTIONAL的状态,字段的含义也很直白,后面的SIGNAL和SMASK还不明确,后面再看.

//标记当前task的completion状态,同时根据情况唤醒等待该task的线程.
private int setCompletion(int completion) {
for (int s;;) {
//开启一个循环,如果当前task的status已经是各种完成(小于0),则直接返回status,这个status可能是某一次循环前被其他线程完成.
if ((s = status) < 0)
return s;
//尝试将原来的status设置为它与completion按位或的结果.
if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
if ((s >>> 16) != 0)
//此处体现了SIGNAL的标记作用,很明显,只要task完成(包含取消或异常),或completion传入的值不小于1<<16,
//就可以起到唤醒其他线程的作用.
synchronized (this) { notifyAll(); }
//cas成功,返回参数中的completion.
return completion;
}
}
}

前面用注释解释了这个方法的逻辑,显然该方法是阻塞的,如果传入的参数不能将status设置为负值会如何?

显然,可能会有至多一次的成功cas,并且若满足唤醒的条件,会尝试去唤醒线程,甚至可能因为为了唤醒其他线程而被阻塞在synchonized代码块外;也可能没有一次成功的cas,直到其他线程成功将status置为完成.

//final修饰,运行ForkJoinTask的核心方法.
final int doExec() {
int s; boolean completed;
//仅未完成的任务会运行,其他情况会忽略.
if ((s = status) >= 0) {
try {
//调用exec
completed = exec();
} catch (Throwable rex) {
//发生异常,用setExceptionalCompletion设置结果
return setExceptionalCompletion(rex);
}
if (completed)
//正常完成,调用前面说过的setCompletion,参数为normal,并将返回值作为结果s.
s = setCompletion(NORMAL);
}
//返回s
return s;
} //记录异常并且在符合条件时传播异常行为
private int setExceptionalCompletion(Throwable ex) {
//首先记录异常信息到结果
int s = recordExceptionalCompletion(ex);
if ((s & DONE_MASK) == EXCEPTIONAL)
//status去除非完成态标志位(只保留前4位),等于EXCEPTIONAL.内部传播异常
internalPropagateException(ex);
return s;
}
//internalPropagateException方法是一个空方法,留给子类实现,可用于completer之间的异常传递
void internalPropagateException(Throwable ex) {}
//记录异常完成
final int recordExceptionalCompletion(Throwable ex) {
int s;
if ((s = status) >= 0) {
//只能是异常态的status可以记录.
//hash值禁止重写,不使用子类的hashcode函数.
int h = System.identityHashCode(this);
final ReentrantLock lock = exceptionTableLock;
//异常锁,加锁
lock.lock();
try {
//抹除脏异常,后面叙述
expungeStaleExceptions();
//异常表数组.ExceptionNode后面叙述.
ExceptionNode[] t = exceptionTable;//exceptionTable是一个全局的静态常量,后面叙述
//用hash值和数组长度进行与运算求一个初始的索引
int i = h & (t.length - 1);
for (ExceptionNode e = t[i]; ; e = e.next) {
//找到空的索引位,就创建一个新的ExceptionNode,保存this,异常对象并退出循环
if (e == null) {
t[i] = new ExceptionNode(this, ex, t[i]);//(1)
break;
}
if (e.get() == this) //已设置在相同的索引位置的链表中,退出循环.//2
break;
//否则e指向t[i]的next,进入下个循环,直到发现判断包装this这个ForkJoinTask的ExceptionNode已经出现在t[i]这个链表并break(2),
//或者直到e是null,意味着t[i]出发开始的链表并无包装this的ExceptionNode,则将构建一个新的ExceptionNode并置换t[i],
//将原t[i]置为它的next(1).整个遍历判断和置换过程处在锁中进行.
}
} finally {
lock.unlock();
}
//记录成功,将当前task设置为异常完成.
s = setCompletion(EXCEPTIONAL);
}
return s;
} //exceptionTable声明
private static final ExceptionNode[] exceptionTable;//全局异常node表
private static final ReentrantLock exceptionTableLock;//上面用到的锁,就是一个普通的可重入锁.
private static final ReferenceQueue<Object> exceptionTableRefQueue;//变量表引用队列,后面详述.
private static final int EXCEPTION_MAP_CAPACITY = 32;//异常表的固定容量,不大,只有32而且是全局的. //初始化在一个静态代码块.
static {
exceptionTableLock = new ReentrantLock();
exceptionTableRefQueue = new ReferenceQueue<Object>();
exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];//容量
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ForkJoinTask.class;
STATUS = U.objectFieldOffset
(k.getDeclaredField("status"));
} catch (Exception e) {
throw new Error(e);
}
} //先来看ExceptionNode内部类的实现
//签名,实现了一个ForkJoinTask的弱引用.
static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
final Throwable ex;
ExceptionNode next;
final long thrower; // use id not ref to avoid weak cycles
final int hashCode; // store task hashCode before weak ref disappears
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
super(task, exceptionTableRefQueue);//指向弱引用的构造函数,保存引用为task,队列为全局的exceptionTableRefQueue.
this.ex = ex;//抛出的异常的引用
this.next = next;//数组中的ExceptionNode以链表形式存在,前面分析过,先入者为后入者的next
this.thrower = Thread.currentThread().getId();//保存抛出异常的线程id(严格来说是创建了this的线程)
this.hashCode = System.identityHashCode(task);//哈希码保存关联task的哈希值.
}
}
//清除掉异常表中的脏数据,仅在持有全局锁时才可使用.前面看到在记录新的异常信息时要进行一次清除尝试
private static void expungeStaleExceptions() {
//循环条件,全局exceptionTableRefQueue队列不为空,前面说过ExceptionNode是弱引用,当它被回收时会被放入此队列.
for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
//从队首依次取出元素.
if (x instanceof ExceptionNode) {
//计算在全局exceptionTable中的索引.
int hashCode = ((ExceptionNode)x).hashCode;
ExceptionNode[] t = exceptionTable;
int i = hashCode & (t.length - 1);
//取出node
ExceptionNode e = t[i];
ExceptionNode pred = null;
//不停遍历,直到e是null为止.
while (e != null) {
//e的next
ExceptionNode next = e.next;//2
//x是队首出队的元素.它与e相等说明找到
if (e == x) {
//e是一个链表的元素,pred表示它是否有前置元素
if (pred == null)
//无前置元素,说明e在链表首部,直接将首部元素指向next即可.
t[i] = next;
else
//有前置元素,说明循环过若干次,将当前e出链表
pred.next = next;
//在链表中发现x即break掉内循环,继续从exceptionTableRefQueue的队首弹出新的元素.
break;
}
//只要发现当前e不是x,准备下一次循环,pred指向e.e指向next,进行下一个元素的比较.
pred = e;
e = next;
}
}
}
}

到此doExec(也是每个ForkJoinTask的执行核心过程)就此结束.

很明显,ForkJoinTask的doExec负责了核心的执行,它留下了exec方法给子类实现,而重点负责了后面出现异常情况的处理.处理的逻辑前面已论述,在产生异常时尝试将异常存放在全局的execptionTable中,存放的结构为数组+链表,按哈希值指定索引,每次存放新的异常时,顺便清理上一次已被gc回收的ExceptionNode.所有ForkJoinTask共享了一个exceptionTable,因此必然在有关的几个环节要进行及时的清理.除了刚刚论述的过程,还有如下的几处:

ForkJoinPool源码简单解析

前面论述了recordExceptionalCompletion,一共有四处使用了expungeStaleException,将已回收的ExceptionNode从引用队列中清除.

clearExceptionalCompletion在对一个ForkJoinTask重新初始化时使用,我们在前面提到序列化时说过,ForkJoinTask的序列化结果只保留了两种情况:运行前,运行结束.重新初始化一个ForkJoinTask,就要去除任何中间状态,包含自身产出的已被回收的异常node,而expungeStaleExceptions显然也顺便帮助其他task清除.

getThrowableException是查询task运行结果时调用,如一些get/join方法,很明显,记录这个异常的作用就在于返回给get/join,在这一块顺便清理已被回收的node,尤其是将自己运行时生成的node清除.

helpExpungeStaleExceptions是提供给ForkJoinPool在卸载worker时使用,顺便帮助清理全局异常表.

使用它们的方法稍后再论述,先来继续看ForkJoinTask的源码.

//内部等待任务完成,直到完成或超时.
final void internalWait(long timeout) {
int s;
//status小于0代表已完成,直接忽略wait.
//未完成,则试着加上SIGNAL的标记,令完成任务的线程唤醒这个等待.
if ((s = status) >= 0 &&
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
//加锁,只有一个线程可以进入.
synchronized (this) {
//再次判断未完成.等待timeout,且忽略扰动异常.
if (status >= 0)
try { wait(timeout); } catch (InterruptedException ie) { }
else
//已完成则响醒其他等待者.
notifyAll();
}
}
}

internalWait方法逻辑很简单,首先判断是否未完成,满足未完成,则将标记位加上SIGNAL(可能已有别的线程做过),随后加锁double check status,还未完成则等待并释放锁,若发现已完成,或在后续被唤醒后发现已完成,则唤醒其他等待线程.通过notifyAll的方式避免了通知丢失.

同时,它的使用方法目前只有一个ForkJoinPool::awaitJoin,在该方法中使用循环的方式进行internalWait,满足了每次按截止时间或周期进行等待,同时也顺便解决了虚假唤醒.

继续看externalAwaitDone函数.它体现了ForkJoin框架的一个核心:外部帮助.

//外部线程等待一个common池中的任务完成.
private int externalAwaitDone() {
int s = ((this instanceof CountedCompleter) ?
//当前task是一个CountedCompleter,尝试使用common ForkJoinPool去外部帮助完成,并将完成状态返回.
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
//当前task不是CountedCompleter,则调用common pool尝试外部弹出该任务并进行执行,
//status赋值doExec函数的结果,若弹出失败(其他线程先行弹出)赋0.
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
if (s >= 0 && (s = status) >= 0) {
//检查上一步的结果,即外部使用common池弹出并执行的结果(不是CountedCompleter的情况),或外部尝试帮助CountedCompleter完成的结果
//status大于0表示尝试帮助完成失败.
//扰动标识,初值false
boolean interrupted = false;
do {
//循环尝试,先给status标记SIGNAL标识,便于后续唤醒操作.
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
//CAS成功,进同步块发现double check未完成,则等待.
wait(0L);
} catch (InterruptedException ie) {
//若在等待过程中发生了扰动,不停止等待,标记扰动.
interrupted = true;
}
}
else
//进同步块发现已完成,则唤醒所有等待线程.
notifyAll();
}
}
} while ((s = status) >= 0);//循环条件,task未完成.
if (interrupted)
//循环结束,若循环中间曾有扰动,则中断当前线程.
Thread.currentThread().interrupt();
}
//返回status
return s;
}

externalAwaitDone的逻辑不复杂,在当前task为ForkJoinPool.common的情况下可以在外部进行等待和尝试帮助完成.方法会首先根据ForkJoinTask的类型进行尝试帮助,并返回当前的status,若发现未完成,则进入下面的等待唤醒逻辑.该方法的调用者为非worker线程.

相似的方法:externalInterruptibleAwaitDone

private int externalInterruptibleAwaitDone() throws InterruptedException {
int s;
//不同于externalAwaitDone,入口处发现当前线程已中断,则立即抛出中断异常.
if (Thread.interrupted())
throw new InterruptedException();
if ((s = status) >= 0 &&
(s = ((this instanceof CountedCompleter) ?
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
0)) >= 0) {
while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0)
//wait时也不catch中断异常,发生即抛出.
wait(0L);
else
notifyAll();
}
}
}
}
return s;
}

externalInterruptibleAwaitDone的逻辑与externalAwaitDone相似,只是对中断异常的态度为抛,后者为catch.

它们的使用点,externalAwaitDone为doJoin或doInvoke方法调用,externalInterruptibleAwaitDone为get方法调用,很明显,join操作不可扰动,get则可以扰动.

下面来看看doJoin和doInvoke

//join的核心方法
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
//已完成,返回status,未完成再尝试后续
return (s = status) < 0 ? s :
//未完成,当前线程是ForkJoinWorkerThread,从该线程中取出workQueue,并尝试将
//当前task出队然后执行,执行的结果是完成则返回状态,否则使用当线程池所在的ForkJoinPool的awaitJoin方法等待.
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
//当前线程不是ForkJoinWorkerThread,调用前面说的externalAwaitDone方法.
externalAwaitDone();
} //invoke的核心方法
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
//先尝试本线程执行,不成功才走后续流程
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
//与上一个方法基本相同,但在当前线程是ForkJoinWorkerThread时不尝试将该task移除栈并执行,而是等
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}

到此终于可以看一些公有对外方法了.有了前面的基础,再看get,join,invoke等方法非常简单.

//get方法还有get(long time)的变种.
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
//当前线程是ForkJoinWorkerThread则调用前面提过的doJoin方法.
//否则调用前述externalInterruptibleAwaitDone
doJoin() : externalInterruptibleAwaitDone();
Throwable ex;
if ((s &= DONE_MASK) == CANCELLED)
//异常处理,取消的任务,抛出CancellationException.
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
//异常处理,调用getThrowableException获取异常,封进ExecutionException.
throw new ExecutionException(ex);
//无异常处理,返回原始结果.
return getRawResult();
}
//getRawResult默认为一个抽象实现,在ForkJoinTask中,并未保存该结果的字段.
public abstract V getRawResult(); //getThrowableException方法
private Throwable getThrowableException() {
//不是异常标识,直接返回null,从方法名的字面意思看,要返回一个可抛出的异常.
if ((status & DONE_MASK) != EXCEPTIONAL)
return null;
//系统哈希码来定位ExceptionNode
int h = System.identityHashCode(this);
ExceptionNode e;
final ReentrantLock lock = exceptionTableLock;
//加异常表全局锁
lock.lock();
try {
//先清理已被回收的异常node,前面已述.
expungeStaleExceptions();
ExceptionNode[] t = exceptionTable;
e = t[h & (t.length - 1)];
//循环找出this匹配的异常node
while (e != null && e.get() != this)
e = e.next;
} finally {
lock.unlock();
}
Throwable ex;
//前面找不出异常node或异常node中存放的异常为null,则返回null
if (e == null || (ex = e.ex) == null)
return null;
if (e.thrower != Thread.currentThread().getId()) {
//不是当前线程抛出的异常.
Class<? extends Throwable> ec = ex.getClass();
try {
Constructor<?> noArgCtor = null;//该异常的无参构造器
Constructor<?>[] cs = ec.getConstructors();//该异常类公有构造器
for (int i = 0; i < cs.length; ++i) {
Constructor<?> c = cs[i];
Class<?>[] ps = c.getParameterTypes();
if (ps.length == 0)
//构建器参数列表长度0说明存在无参构造器,存放.
noArgCtor = c;
else if (ps.length == 1 && ps[0] == Throwable.class) {
//发现有参构造器且参数长度1且第一个参数类型是Throwable,说明可以存放cause.
//反射将前面取出的ex作为参数,反射调用该构造器创建一个要抛出的Throwable.
Throwable wx = (Throwable)c.newInstance(ex);
//反射失败,异常会被catch,返回ex,否则返回wx.
return (wx == null) ? ex : wx;
}
}
if (noArgCtor != null) {
//在尝试了寻找有参无参构造器,并发现只存在无参构造器的情况,用无参构造器初始化异常.
Throwable wx = (Throwable)(noArgCtor.newInstance());
if (wx != null) {
//将ex设置为它的cause并返回它的实例.
wx.initCause(ex);
return wx;
}
}
} catch (Exception ignore) {
//此方法不可抛出异常,一定要成功返回.
}
}
//有参无参均未成功,返回找到的异常.
return ex;
} //join公有方法
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
//调用doJoin方法阻塞等待的结果不是NORMAL,说明有异常或取消.报告异常.
reportException(s);
//等于NORMAL,正常执行完毕,返回原始结果.
return getRawResult();
}
//报告异常,可在前一步判断执行status是否为异常态,然后获取并重抛异常.
private void reportException(int s) {
//参数s必须用DONE_MASK处理掉前4位以后的位.
if (s == CANCELLED)
//传入的状态码等于取消,抛出取消异常.
throw new CancellationException();
if (s == EXCEPTIONAL)
//使用前面的getThrowableException方法获取异常并重新抛出.
rethrow(getThrowableException());
} //invoke公有方法.
public final V invoke() {
int s;
//先尝试执行
if ((s = doInvoke() & DONE_MASK) != NORMAL)
//doInvoke方法的结果status只保留完成态位表示非NORMAL,则报告异常.
reportException(s);
//正常完成,返回原始结果.
return getRawResult();
}

终于,读到此处的读者将关键的方法线串了起来,前述的所有内部方法,常量和变量与公有接口的关系已经明了.

很显然,ForkJoinTask是个抽象类,且它并未保存任务的完成结果,也不负责这个结果的处理,但声明并约束了返回结果的抽象方法getRawResult供子类实现.

因此,ForkJoinTask的自身关注任务的完成/异常/未完成,子类关注这个结果的处理.

每当获取到任务的执行状态时,ForkJoinTask可根据status来判断是否是异常/正常完成,并进入相应的处理逻辑,最终使用子类实现的方法完成一个闭环.

如果理解为将ForkJoinTask和子类的有关代码合并起来,在结果/完成状态/异常信息这一块,相当于同时有三个part在合作.

第一个part:status字段,它同时表示了未完成/正常完成/取消/异常完成等状态,也同时告诉有关等待线程是否要唤醒其他线程(每个线程等待前会设置SIGNAL),同时留出了后面16位对付其他情况.

第二个part:result,在ForkJoinTask见不到它,也没有相应的字段,子类也未必需要提供这个result字段,前面提到的CountedCompleter就没有提供这个result,它的getRawResult会固定返回null.但是CountedCompleter可以继承子类并实现这个result的保存与返回(道格大神在注释中举出了若干典型代码例子),在JAVA8中,stream api中的并行流也会保存每一步的计算结果,并对结果进行合并.

第三个part:异常.在ForkJoinTask中已经完成了所有异常处理流程和执行流程的定义,重点在于异常的存放,它是由ForkJoinTask的类变量进行存放的,结构为数组+链表,且元素利用了弱引用,借gc帮助清除掉已经被回收的ExceptionNode,显然在gc之前必须得到使用.而异常随时可以发生并进行record入列,但相应的能消费掉这个异常的只有相应的外部的get,join,invoke等方法或者内部扩展了exec()等方式,得到其他线程执行的task异常结果的情况.巧妙的是,只有外部调用者调用(get,invoke,join)时,这个异常信息才足够重要,需要rethrow出去并保存关键的堆栈信息;而内部线程在访问一些非自身执行的任务时,往往只需要status判断是否异常即可,在exec()中fork新任务的,也往往必须立即join这些新的子任务,这就保证了能够及时得到子任务中的异常堆栈(即使拿不到堆栈也知道它失败了).

经过前面的论述,ForkJoinTask的执行和异常处理已经基本论结,但是,一个ForkJoinTask在创建之后是如何运行的?显然,它不是一个Runnable,也不是Callable,不能直接submit或execute到普通的线程池.

临时切换到ForkJoinPool的代码,前面提到过,ForkJoinTask的官方定义就是可以运行在ForkJoinPool中的task.

//ForkJoinPool代码,submit一个ForkJoinTask到ForkJoinPool,并将该task自身返回.
//拿到返回的task,我们就可以进行前述的get方法了.
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}
//execute,不返回.类似普通线程池提交一个runnable的行为.
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}

显然,若要使用一个自建的ForkJoinPool,可以使用execute或submit函数提交入池,然后用前述的get方法和变种方法进行.这是一种运行task的方式.

前面论述过的invoke方法会先去先去尝试本地执行,然后才去等待,故我们自己new一个ForkJoinTask,一样可以通过invoke直接执行,这是第二种运行task的方式.

前面论述的join方法在某种情况下也是一种task的运行方式,在当前线程是ForkJoinWorkerThread时,会去尝试将task出队并doExec,也就是会先用本线程执行一次,不成功才干等,非ForkJoinWorkerThread则直接干等了.显然我们可以自己构建一个ForkJoinWorkerThread并去join,这时会将任务出队并执行(但存在一个问题:什么时候入队).且出队后若未执行成功,则awaitJoin(参考ForkJoinPool::awaitJoin),此时因任务已出队,不会被窃取或帮助(在awaitJoin中会有helpStealer,但其实任务是当前线程自己"偷走"了),似乎完全要靠自己了.但并不表示ForkJoinTask子类无法获取这个已出队的任务,比如CountedCompleter使用时,可以在compute中新生成的Completer时,将源CountedCompleter(ForkJoinTask的子类)作为新生成的CountedCountedCompleter的completer(该子类中的一个字段),这样,若有一个ForkJoinWorkerThread窃取了这个新生成的CountedCompleter,可以通过completer链表找到先前被出队的CountedCompleter(ForkJoinTask).关于CountedCompleter单独文章详述.

除此之外呢?包含前面提到的使用join操作不是ForkJoinWorkerThread调用的情况,不使用ForkJoinPool的submit execute入池,如何能让一个ForkJoinTask在将来执行?我们来看后面的方法.

//fork方法,将当前任务入池.
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
//如果当前线程是ForkJoinWorkerThread,将任务压入该线程的任务队列.
((ForkJoinWorkerThread)t).workQueue.push(this);
else
//否则调用common池的externalPush方法入队.
ForkJoinPool.common.externalPush(this);
return this;
}

显然,我们还可以通过对一个ForkJoinTask进行fork方法入池,入哪个池完全取决于当前线程的类型.这是第四种让任务能被运行的方式.

同样,我们也看到了第五种方式,ForkJoinPool.common其实就是一个常量保存的ForkJoinPool,它能够调用externalPush,我们自然也可以直接new一个ForkJoinPool,然后将当前task进行externalPush,字面意思外部压入.这种办法,非ForkJoinWorkerThread也能将任务提交到非common的ForkJoinPool.

从名字来看,ForkJoinTask似乎已经说明了一切,按照官方的注释也是如此.对一个task,先Fork压队,再Join等待执行结果,这是一个ForkJoinTask的执行周期闭环(但不要简单理解为生命周期,前面提到过,任务可以被重新初始化,而且重新初始化时还会清空ExceptionNode数组上的已回收成员).

到此为止,ForkJoinTask的核心函数和api已经基本了然,其它同类型的方法以及周边的方法均不难理解,如invokeAll的各种变种.下面来看一些"周边"类型的函数.有前述的基础,它们很好理解.

//取消一个任务的执行,直接将status设置成CANCELLED,设置后判断该status 是否为CANCELLED,是则true否则false.
public boolean cancel(boolean mayInterruptIfRunning) {
return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
} //判断是否完成,status小于0代表正常完成/异常完成/取消,很好理解.
public final boolean isDone() {
return status < 0;
} //判断当前任务是否取消.
public final boolean isCancelled() {
//status前4位
return (status & DONE_MASK) == CANCELLED;
}
public final boolean isCompletedAbnormally() {
//是否为异常完成,前面说过,CANCELLED和EXCEPTIONAL均小于NORMAL
return status < NORMAL;
}
//是否正常完成.
public final boolean isCompletedNormally() {
//完成态位等于NORMAL
return (status & DONE_MASK) == NORMAL;
}
//获取异常.
public final Throwable getException() {
int s = status & DONE_MASK;
//当为正常完成或未完成时,返回null.
return ((s >= NORMAL) ? null :
//是取消时,新建一个取消异常.
(s == CANCELLED) ? new CancellationException() :
//不是取消,参考前面提到的getThrowableException.
getThrowableException());
}
//使用异常完成任务.
public void completeExceptionally(Throwable ex) {
//参考前述的setExceptionalCompletion,
//ex已经是运行时异常或者Error,直接使用ex完成,若是受检异常,包装成运行时异常.
setExceptionalCompletion((ex instanceof RuntimeException) ||
(ex instanceof Error) ? ex :
new RuntimeException(ex));
}
//使用value完成任务.
public void complete(V value) {
try {
//设置原始结果,它是一个空方法.前面说过ForkJoinTask没有维护result之类的结果字段,子类可自行发挥.
setRawResult(value);
} catch (Throwable rex) {
//前述步骤出现异常,就用异常方式完成.
setExceptionalCompletion(rex);
return;
}
//前面的结果执行完,标记当前为完成.
setCompletion(NORMAL);
}
//安静完成任务.直接用NORMAL setCompletion,没什么好说的.
public final void quietlyComplete() {
setCompletion(NORMAL);
} //安静join,它不会返回result也不会抛出异常.处理集合任务时,如果需要所有任务都被执行而不是一个执行出错(取消)其他也跟着出错的情况下,
//很明显适用,这不同于invokeAll,静态方法invokeAll或invoke(ForkJoinTask,ForkJoinTask)会在任何一个任务出现异常后取消执行并抛出.
public final void quietlyJoin() {
doJoin();
} //安静执行一次,不返回结果不抛出异常,没什么好说的.
public final void quietlyInvoke() {
doInvoke();
}
//重新初台化当前task
public void reinitialize() {
if ((status & DONE_MASK) == EXCEPTIONAL)
//如果当前任务是异常完成的,清除异常.该方法参考前面的论述.
clearExceptionalCompletion();
else
//否则重置status为0.
status = 0;
}
//反fork.
public boolean tryUnfork() {
Thread t;
//当前线程是ForkJoinWorkerThread,从它的队列尝试移除.
return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
//当前线程不是ForkJoinWorkerThread,用common池外部移除.
ForkJoinPool.common.tryExternalUnpush(this));
}

上面是一些简单的周边方法,大多并不需要再论述了,unfork方法很明显在某些场景下不会成功,显然,当一个任务刚刚入队并未进行后续操作时,很可能成功.按前面所述,当对一个任务进行join时,可能会成功的弹出当前任务并执行,此时不可能再次弹出;当一个任务被其他线程窃取或被它本身执行的也不会弹出.

再来看一些老朋友,在前面的文章"CompletableFuture和响应式编程"一文中,作者曾着重强调过它将每个要执行的动作进行压栈(未能立即执行的情况),而栈中的元素Completion即是ForkJoinTask的子类,而标记该Completion是否被claim的方法和周边方法如下:

//获取ForkJoinTask的标记,返回结果为short型
public final short getForkJoinTaskTag() {
//status的后16位
return (short)status;
} //原子设置任务的标记位.
public final short setForkJoinTaskTag(short tag) {
for (int s;;) {
//不停循环地尝试将status的后16位设置为tag.
if (U.compareAndSwapInt(this, STATUS, s = status,
//替换的结果,前16位为原status的前16位,后16位为tag.
(s & ~SMASK) | (tag & SMASK)))
//返回被换掉的status的后16位.
return (short)s;
}
} //循环尝试原子设置标记位为tag,前提是原来的标记位等于e,成功true失败false
public final boolean compareAndSetForkJoinTaskTag(short e, short tag) {
for (int s;;) {
if ((short)(s = status) != e)
//如果某一次循环的原标记位不是e,则返回false.
return false;
//同上个方法
if (U.compareAndSwapInt(this, STATUS, s,
(s & ~SMASK) | (tag & SMASK)))
return true;
}
}

还记得CompletableFuture在异步执行Completion时要先claim吗?claim方法中,会尝试设置这个标记位.这是截止jdk8中CompletableFuture使用到ForkJoinTask的功能.

目前来看,在CompletableFuture的内部实现Completion还没有使用到ForkJoinTask的其他属性,比如放入一个ForkJoinPool执行(没有任何前面总结的调用,比如用ForkJoinPool的push,execute,submit等,也没有fork到common池).但是很明显,道格大神令它继承自ForkJoinTask不可能纯粹只为了使用区区一个标记位,试想一下,在如此友好支持响应式编程的CompletableFuture中传入的每一个action都可以生成若干新的action,那么CompletableFuture负责将这些action封装成Completion放入ForkJoinPool执行,将最大化利用到ForkJoin框架的工作窃取和外部帮助的功效,强力结合分治思想,这将是多么优雅的设计.或者在jdk9-12中已经出现了相应的Completion实现(尽管作者写过JAVA9-12,遗憾的是也没有去翻它们的源码).

另外,尽管Completion的众多子类也没有result之类的表示结果的字段,但它的一些子类通过封装,实际上间接地将这个Completion所引用的dep的result作为了自己的"result",当然,getRawResult依旧是null,但是理念却是相通的.

以上是ForkJoinTask的部分核心源码,除了上述的源码外,还有一些同属于ForkJoinTask的核心源码部分,比如其他的public方法(参考join fork invoke 即可),一些利用ForkJoinPool的实现,要深入了解ForkJoinPool才能了解的方法,一些不太难的静态方法等,这些没有必要论述了.

除了核心源码外,ForkJoinTask也提供了对Runnable,Callable的适配器实现,这块很好理解,简单看一看.

//对Runnable的实现,如果在ForkJoinPool中提交一个runnable,会用它封装成ForkJoinTask
static final class AdaptedRunnable<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {
final Runnable runnable;
T result;
AdaptedRunnable(Runnable runnable, T result) {
//不能没有runnable
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
//对runnable做适配器时,可以提交将结果传入,并设置为当前ForkJoinTask子类的result.
//前面说过,ForkJoinTask不以result作为完成标记,判断一个任务是否完成或异常,使用status足以,
//返回的结果才使用result.
this.result = result;
}
public final T getRawResult() { return result; }
public final void setRawResult(T v) { result = v; }
//前面说过提交入池的ForkJoinTask最终会运行doExec,而它会调用exec,此处会调用run.
public final boolean exec() { runnable.run(); return true; }
public final void run() { invoke(); }
private static final long serialVersionUID = 5232453952276885070L;//序列化用
} //无结果的runnable适配器
static final class AdaptedRunnableAction extends ForkJoinTask<Void>
implements RunnableFuture<Void> {
final Runnable runnable;
AdaptedRunnableAction(Runnable runnable) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
}
//区别就是result固定为null,也不能set
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) { }
public final boolean exec() { runnable.run(); return true; }
public final void run() { invoke(); }
private static final long serialVersionUID = 5232453952276885070L;
} //对runnable的适配器,但强制池中的工作线程在执行任务发现异常时抛出
static final class RunnableExecuteAction extends ForkJoinTask<Void> {
final Runnable runnable;
RunnableExecuteAction(Runnable runnable) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
}
//默认null结果,set也是空实现
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) { }
public final boolean exec() { runnable.run(); return true; }
void internalPropagateException(Throwable ex) {
//前面说过doExec会被执行,它会调exec并catch,在catch块中设置当前任务为异常完成态,
//然后调用internalPropagateException方法,而在ForkJoinTask中默认为空实现.
//此处将异常重新抛出,将造成worker线程抛出异常.
rethrow(ex);
}
private static final long serialVersionUID = 5232453952276885070L;
} //对callable的适配器,当将callable提交至ForkJoinPool时使用.
static final class AdaptedCallable<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {
final Callable<? extends T> callable;
T result;
AdaptedCallable(Callable<? extends T> callable) {
if (callable == null) throw new NullPointerException();
this.callable = callable;
}
//字段中有一个result,直接使用它返回.
public final T getRawResult() { return result; }
//result可外部直接设置.
public final void setRawResult(T v) { result = v; }
public final boolean exec() {
try {
//默认的result用call函数设置.
result = callable.call();
return true; } catch (Error err) {
//catch住Error,抛出
throw err;
} catch (RuntimeException rex) {
//catch住运行时异常,抛出
throw rex;
} catch (Exception ex) {
//catch住受检异常,包装成运行时异常抛出.
throw new RuntimeException(ex);
}
}
//run方法一样只是调用invoke,进而调用doExec.
public final void run() { invoke(); }
private static final long serialVersionUID = 2838392045355241008L;
} //runnable生成适配器的工具方法
public static ForkJoinTask<?> adapt(Runnable runnable) {
return new AdaptedRunnableAction(runnable);
} //指定结果设置runnable的适配器工具方法
public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
return new AdaptedRunnable<T>(runnable, result);
} //对callable生成适配器的方法.
public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
return new AdaptedCallable<T>(callable);
}

以上的代码都不复杂,只要熟悉了ForkJoinTask的本身代码结构,对于这一块了解非常容易,这也间接说明了ForkJoinPool中是如何处理Runnable和Callable的(因为ForkJoinPool本身也是一种线程池,可以接受提交Callable和Runnable).

将runnable提交到pool时,可以指定result,也可以不指定,也可以用submit或execute方法区分异常处理行为,ForkJoinPool会自行选择相应的适配器.

将callable 提交到pool时,pool会选择对callable的适配器,它的结果将为task的结果,它的异常将为task的异常.

到此为止,ForkJoinTask的源码分析完成.

后语

本文详细分析了ForkJoinTask的源码,并解释了前文CompletableFuture中Completion与它的关联,以及分析了Completion继承自ForkJoinTask目前已带来的功能利用(tag)和将来可能增加的功用(一个Completion产生若干多个Completion并在ForkJoinPool中运行,还支持工作窃取).

同时本文也对ForkJoinPool和ForkJoinWorkerThread,以及CountedCompleter和Stream api中的并行流进行了略微的描述.

在文章的最后,或许有一些新手读者会好奇,我们究竟什么时候会使用ForkJoinTask?

首先,如果你在项目中大肆使用了流式计算,并使用了并行流,那么你已经在使用了.

前面提过,官方解释ForkJoinTask可以视作比线程轻量许多的实体,也是轻量的Future.结合在源码中时不时出来秀存在感的ForkJoinWorkerThread,显然它就是据说比普通线程轻量一些的线程,在前面的源码中可以看出,它维护了一组任务的队列,每个线程负责完成队列中的任务,也可以偷其他线程的任务,甚至池外的线程都可以时不时地来个join,顺便帮助出队执行任务.

显然,对于重计算,轻io,轻阻塞的任务,适合使用ForkJoinPool,也就使用了ForkJoinTask,你不会认为它可以提交runnable和callable,就可以不用ForkJoinTask了吧?前面的适配器ForkJoinPool在这种情况下必用的,可以去翻相应的源码.

本章没有去详述CountedCompleter,但前面论述时说过,你可以在exec()中将一个计算复杂的任务拆解为小的子任务,然后将子任务入池执行,父任务合并子任务的结果.这种分治的算法此前基本是在单线程模式下运行,使用ForkJoinTask,则可以将这种计算交给一个ForkJoinPool中的所有线程并行执行.

转自: https://segmentfault.com/a/1190000019549838?utm_source=tag-newest