【JUC源码解析】Phaser

时间:2023-03-09 00:01:11
【JUC源码解析】Phaser

简介

Phaser,阶段器,可作为一个可复用的同步屏障,与CyclicBarrier和CountDownLatch类似,但更强大。

全览图

【JUC源码解析】Phaser

如上图所示,phaser,支持phaser树(图中,简化为phaser链表模式,独子单传,后文也称phaser链)模式,分摊并发的压力。每个phaser结点的father指针指向前一个phaser结点,最前头的结点成为root结点,其father指针指向null, 每一个结点的root指针指向root结点,root结点的root指针指向它自己。

只有root结点的evenQ和oddQ分别指向两个QNode链表。每个QNode结点包含有phaser和thread等关键熟悉,其中,thread指向当前线程,phaser指向当前线程所注册的phaser。

这两个链表里的线程所对应的phase(阶段)要么都为奇数,要么都为偶数,相邻阶段的两组线程一定在不同的链表里面,这样在新老阶段更迭时,操作的是不同的链表,不会错乱。整个phaser链,共用这两个QNode链。

而且,线程也只会在root结点上被封装进QNode结点入栈(QNode链,入栈,FIFO,后文有时也叫入队,不影响功能),每个phaser在初始时(被第一个线程注册时)以当前线程向其父phaser注册的方式与其父phaser建立联系,当此phaser上的线程都到达了,再以当前线程(最后一个抵达的线程)通知其父phaser,自己这边OK了,每个phaser都以同样的方式通知其父phaser,最后到达root phaser,开始唤醒睡在栈里(QNode链表)的线程,准备进入下一阶段。

phaser的关键属性state,是一个64位的long类型数据,划分为4个域:

  • unarrived   -- 还没有抵达屏障的参与者的个数 (bits 0-15)
  • parties       -- 需要等待的参与者的个数            (bits 16-31)
  • phase        -- 屏障所处的阶段                          (bits 32-62)
  • terminated -- 屏障的结束标记                          (bit 63 / sign)

特别地,初始时,state的值为1,称为EMPTY,也即是unarrived = 1,其余都为0,这是一个标记,表示此phaser还没有线程来注册过,EMPTY = 1,而不是0,是因为0有特殊的含意,可能表示所有的线程都到达屏障了,此时unarrived也为0(而不是初始状态),正常来讲,parties表示总的注册的线程的个数,大于等于unarrived,初始时,parties = 0,而unarrived = 1,更易于辨别。

源码分析

属性

     /*
* unarrived -- 还没有抵达屏障的参与者的个数 (bits 0-15)
* parties -- 需要等待的参与者的个数 (bits 16-31)
* phase -- 屏障所处的阶段 (bits 32-62)
* terminated -- 屏障的结束标记 (bit 63 / sign)
*/
private volatile long state; private static final int MAX_PARTIES = 0xffff; // 最大参与者个数
private static final int MAX_PHASE = Integer.MAX_VALUE; // 最大阶段值
private static final int PARTIES_SHIFT = 16; // 参与者移位
private static final int PHASE_SHIFT = 32; // 阶段移位
private static final int UNARRIVED_MASK = 0xffff; // 与int值与得未抵达屏障的参与者个数
private static final long PARTIES_MASK = 0xffff0000L; // 与long值与得参与者个数
private static final long COUNTS_MASK = 0xffffffffL; // 与之相与的unarrived和parties两部分值
private static final long TERMINATION_BIT = 1L << 63; // 终结位 private static final int ONE_ARRIVAL = 1; // 1个线程到达
private static final int ONE_PARTY = 1 << PARTIES_SHIFT; // 一个参与者
private static final int ONE_DEREGISTER = ONE_ARRIVAL | ONE_PARTY; // 一个参与者取消注册
private static final int EMPTY = 1; // 初始值 private final Phaser parent; // 指向父phaser
private final Phaser root; // 指向root phaser private final AtomicReference<QNode> evenQ; // 偶数phase的栈(线程)
private final AtomicReference<QNode> oddQ; // 奇数phase的栈(线程)

对于主状态,为了有效地维护原子性,这些值被打包成一个单独的(原子)数据(long类型),编码简单高效,竞争窗口(空间)小。

构造方法

     public Phaser(Phaser parent, int parties) {
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
int phase = 0;
this.parent = parent;
if (parent != null) { // 父phaser不为空
final Phaser root = parent.root;
this.root = root; // 指向root phaser
this.evenQ = root.evenQ; // 两个栈,整个phaser链只有一份
this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1); // 向父phaser注册当前线程
} else {
this.root = this; // 否则,自己是root phaser
this.evenQ = new AtomicReference<QNode>(); // 负责创建两个栈(QNode链)
this.oddQ = new AtomicReference<QNode>();
}
// 更新状态
this.state = (parties == 0) ? (long) EMPTY
: ((long) phase << PHASE_SHIFT) | ((long) parties << PARTIES_SHIFT) | ((long) parties);
}

关键方法

doRegister(int)

     private int doRegister(int registrations) {
long adjust = ((long) registrations << PARTIES_SHIFT) | registrations; // 调整主状态的因子,parties | unarrived
final Phaser parent = this.parent;
int phase;
for (;;) {
long s = (parent == null) ? state : reconcileState(); // reconcileState()方法是调整当前phaser的状态与root的一致
int counts = (int) s;
int parties = counts >>> PARTIES_SHIFT;
int unarrived = counts & UNARRIVED_MASK;
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
phase = (int) (s >>> PHASE_SHIFT);
if (phase < 0)
break;
if (counts != EMPTY) { // 当前线程不是此phaser的第一次注册
if (parent == null || reconcileState() == s) {
if (unarrived == 0) // 上一阶段已经结束
root.internalAwaitAdvance(phase, null); // 等待进入到下一阶段
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) // 否则,CAS调整主状态
break;
}
} else if (parent == null) { // 当前phaser是root phaser
long next = ((long) phase << PHASE_SHIFT) | adjust; // 调整因子,phase | parties | unarrived
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) // CAS调整主状态
break;
} else {
synchronized (this) { // 第一个子phaser注册,需要加锁,不管多少个线程在子phaser上注册,而只需一个线程在其父phaser上注册
if (state == s) { // 加锁后,再次检查,看是否别的线程已经更新过主状态了
phase = parent.doRegister(1); // 向其父phaser注册(获得锁的当前线程)
if (phase < 0) // phaser链已结束,直接退出
break;
// 更新主状态
while (!UNSAFE.compareAndSwapLong(this, stateOffset, s,
((long) phase << PHASE_SHIFT) | adjust)) {
s = state;
phase = (int) (root.state >>> PHASE_SHIFT);
}
break;
}
}
}
}
return phase;
}

arriveAndAwaitAdvance()

     public int arriveAndAwaitAdvance() {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState(); // 主状态
int phase = (int) (s >>> PHASE_SHIFT); // 当前阶段
if (phase < 0) // 已结束,退出
return phase;
int counts = (int) s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); // 未抵达线程个数
if (unarrived <= 0) // 非法值
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= ONE_ARRIVAL)) { // CAS更新主状态
if (unarrived > 1) // 还有未到达的线程
return root.internalAwaitAdvance(phase, null); // 等待,或自旋,或入栈阻塞
if (root != this)
return parent.arriveAndAwaitAdvance(); // 说明当前phaser上的所有线程都已经抵达,那么通知父phaser(其实作为一个整体【线程】,到达父phaser的屏障,具有传递性)
long n = s & PARTIES_MASK; // 当前phaser的总的参与者个数
int nextUnarrived = (int) n >>> PARTIES_SHIFT; // 作为下一阶段的未抵达参与者个数
if (onAdvance(phase, nextUnarrived)) // 冲破屏障时调用的方法,返回true,则结束phaser
n |= TERMINATION_BIT;
else if (nextUnarrived == 0) // 如果没有参与者了,恢复初始值EMPTY
n |= EMPTY;
else
n |= nextUnarrived; // 最后一种情况,phaser | unarrived
int nextPhase = (phase + 1) & MAX_PHASE; // 下一个阶段
n |= (long) nextPhase << PHASE_SHIFT; // phase | phaser | unarrived
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n)) // 更新主状态
return (int) (state >>> PHASE_SHIFT); // 有竞争,直接返回
releaseWaiters(phase); // 否则,释放掉阻塞在此阶段上的所有线程
return nextPhase;
}
}
}

internalAwaitAdvance(int phase, QNode node)

     private int internalAwaitAdvance(int phase, QNode node) {
releaseWaiters(phase - 1); // 确保老的队列里的线程全部释放了
boolean queued = false; // 标识是否成功入队
int lastUnarrived = 0; // 记录上次未到达参与者(线程)的个数,如果发生变化,则增加自旋次数(说不定马上结束了呢,这样就不用阻塞了)
int spins = SPINS_PER_ARRIVAL; // 自旋次数
long s;
int p;
while ((p = (int) ((s = state) >>> PHASE_SHIFT)) == phase) {
if (node == null) { // 在不可中断模式下自旋
int unarrived = (int) s & UNARRIVED_MASK;
// 如果未到达参与者数量发生了变化,且变化后的未到达数量小于CPU核数,需要增加自旋次数
if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted(); // 获取并清除当前线程的中断标识
if (interrupted || --spins < 0) { // 如果当前线程被中断,或者自旋次数用完,创建一个结点,入队准备进入阻塞
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
} else if (node.isReleasable()) // 完成或放弃
break;
else if (!queued) { // 入队
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; // 根据phase选择奇偶队列
QNode q = node.next = head.get(); // 从头部入队,其实是入栈
if ((q == null || q.phase == phase) && (int) (state >>> PHASE_SHIFT) == phase)
queued = head.compareAndSet(q, node);
} else {
try {
ForkJoinPool.managedBlock(node); // 阻塞,其实调用的是QNode的block()方法,最终还是LockSupport.park()方法
} catch (InterruptedException ie) {
node.wasInterrupted = true; // 记录中断
}
}
} if (node != null) {
if (node.thread != null)
node.thread = null; // 被唤醒后,置空thread引用,避免再次unpark
if (node.wasInterrupted && !node.interruptible) // 不可中断模式下,传递中断
Thread.currentThread().interrupt();
if (p == phase && (p = (int) (state >>> PHASE_SHIFT)) == phase)
return abortWait(phase); // 依旧没有进入到下一个状态,清除那些由于超时或中断不再等待下一阶段的结点
}
releaseWaiters(phase); // 唤醒阻塞的线程
return p;
}

doArrive(boolean deregister)

     private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState(); // 主状态
int phase = (int) (s >>> PHASE_SHIFT); // 阶段
if (phase < 0) // 已结束,退出
return phase;
int counts = (int) s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); // 未抵达线程个数
if (unarrived <= 0) // 非法值
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= adjust)) { // CAS更新主状态
if (unarrived == 1) { // 如果未到达参与者的个数是1
long n = s & PARTIES_MASK; // 当前phaser的总的参与者个数
int nextUnarrived = (int) n >>> PARTIES_SHIFT; // 作为下一阶段的未抵达参与者个数
if (root == this) { // 如果当前phaser是root
if (onAdvance(phase, nextUnarrived)) // 冲破屏障时调用的方法,返回true,则结束phaser
n |= TERMINATION_BIT;
else if (nextUnarrived == 0) // 如果没有参与者了,恢复初始值EMPTY
n |= EMPTY;
else // 最后一种情况,phaser | unarrived
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE; // 下一个阶段
n |= (long) nextPhase << PHASE_SHIFT; // phase | phaser | unarrived
UNSAFE.compareAndSwapLong(this, stateOffset, s, n); // 更新主状态
releaseWaiters(phase); // 释放掉阻塞在此阶段上的所有线程
} else if (nextUnarrived == 0) { // 如果没有参与者了,从父phaser上注销(传递)
phase = parent.doArrive(ONE_DEREGISTER);
UNSAFE.compareAndSwapLong(this, stateOffset, s, s | EMPTY); // 调整主状态
} else
phase = parent.doArrive(ONE_ARRIVAL); // 传递调用父phaser的doArrive方法
}
return phase;
}
}
}

此方法,与arriveAndAwaitAdvance()类似,但不阻塞,可能会有注销操作。

经典应用

     void build(Task[] tasks, int lo, int hi, Phaser ph) {
if (hi - lo > TASKS_PER_PHASER) {
for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
int j = Math.min(i + TASKS_PER_PHASER, hi);
build(tasks, i, j, new Phaser(ph));
}
} else {
for (int i = lo; i < hi; ++i)
tasks[i] = new Task(ph);
}
}

一组任务,一个phaser树,对这组任务进行分段,每一段任务挂到一个phaser上。

行文至此结束。

尊重他人的劳动,转载请注明出处:http://www.cnblogs.com/aniao/p/aniao_phaser.html