JDK源码分析之concurrent包(四) -- CyclicBarrier与CountDownLatch

时间:2023-03-08 22:24:22

  上一篇我们主要通过ExecutorCompletionService与FutureTask类的源码,对Future模型体系的原理做了了解,本篇开始解读concurrent包中的工具类的源码。首先来看两个非常实用的工具类CyclicBarrier与CountDownLatch是如何实现的。


CyclicBarrier

CyclicBarrier直译过来是“循环屏障”,作用是可以使固定数量的线程都达到某个屏障点(调用await方发处)后,才继续向下执行。关于用法和实例本文就不做过多说明,现在直接进入CyclicBarrier的源码。

首先,来看下CyclicBarrier的几个标志性的成员变量:

 private static class Generation {
boolean broken = false;
}
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation(); /**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;

这几个成员变量有以下说明:

  • 说明1:parties是final的,在构造时,传入的固定线程数,不可变;
  • 说明2:count是计数器,如果有线程到达了屏障点,count就减1;
  • 说明3:直到count=0时,其它线程才可以向下执行;
  • 说明4:barrierCommand是Runnable任务,在所有线程到达屏障点是,就执行barrierCommand,barrierCommand是构造时传入的,可以为空;
  • 说明5:generation比较复杂,是静态内部类Generation的实例,一个generation对象代表一代的屏障,就是说,如果generation对象不同,就代表进入了下一次的屏障,所以说,这个线程屏障是可循环的(Cyclic)。
  • 说明6:另外,generation的唯一的一个名为broken的成员变量代表屏障是否被破坏掉,破坏的原因可能是线程中断、失败或者超时等。如果被破坏,则所有线程都将抛出异常。

了解上述成员变量的说明后,基本上就可以知道了CyclicBarrier的实现原理,下面我们来看看代码是如何写的。其实实现很简单,我们只需通过await()方法就可以说明:

 public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}

await()方法调用了真是的执行方法dowait(),这个方法里涵盖了所有乾坤:

 /**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation; if (g.broken)
throw new BrokenBarrierException(); if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
} int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
} // loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
} if (g.broken)
throw new BrokenBarrierException(); if (g != generation)
return index; if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

代码第20行对应“说明2”。

代码第21行对应“说明3”。

代码第26行对应“说明4”。

代码第28行对应“说明5”,nextGeneration()方法中使用generation = new Generation();表示屏障已经换代,并唤醒所有线程。nextGeneration()请自行查看源码。

代码第16行第45行等所有调用breakBarrier()方法处,对应“说明6”,表示屏障被破坏,breakBarrier()方法中将generation.broken = true,唤醒所有线程,抛出异常。

最后,代码第40行处trip.await(),表示持有trip的线程进入等待被唤醒状态。

另外,实现中还有一个很重要的点,就是第8行的lock和第67行的unlock,保证同步状态下执行这段逻辑,也就保证了count与generation.broken的线程安全。

以上就是CyclicBarrier(循环使用的屏障)的源码实现,是不是比较简单。

CountDownLatch

CountDownLatch直译过来是“倒计数锁”。在线程的countDown()动作将计数减至0时,所有的await()处的线程将可以继续向下执行。CountDownLatch的功能与CyclicBarrier有一点点像,但实现方式却很不同,下面直接来观察CountDownLatch的两个最重要的方法:

 public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
} public void countDown() {
sync.releaseShared(1);
}

可以看到,这两个方法实际是由静态内部类Sync来实现的。这个Sync我们在上一篇FutureTask的实现中也见过,那我们就先简单介绍下Sync究竟是用来做什么的:

Sync extends AbstractQueuedSynchronizer

这个抽象类AbstractQueuedSynchronizer是一个框架,这个框架使用了“共享”与“独占”两张方式通过一个int值来表示状态的同步器。类中含有一个先进先出的队列用来存储等待的线程。

这个类定义了对int值的原子操作的方法,并强制子类定义int的那种状态是获取,哪种状态是释放。子类可以选择“共享”和“独占”的一种或两种来实现。

共享方式的实现方式是死循环尝试获取对象状态,类似自旋锁。

独占方式的实现方式是通过实现Condition功能的内部的类,保证独占锁。

而我们正在解读的CountDownLatch中的内部类Sync是使用的共享方式,对于AbstractQueuedSynchronizer的解读本篇不打算详细说明,因为笔者对“独占”方式还没彻底弄通,如果以后有机会再来补充。

接下来就直接观察CountDownLatch.Sync的源码:

 /**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L; Sync(int count) {
setState(count);
} int getCount() {
return getState();
} public int tryAcquireShared(int acquires) {
return getState() == 0? 1 : -1;
} public boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

结合最初列出的await()和countDown()方法,

通过上述代码第9行可以看到,CountDownLatch将构造时传入的用来倒计数的count作为状态值。

通过上述代码第17行可以看到,CountDownLatch定义了当count=0时表示可以共享获取状态(在await()方法中调用的sync.acquireSharedInterruptibly(1)会死循环尝试获取状态)。

通过上述代码第26行可以看到,CountDownLatch定义了当count-1表示一次共享释放状态(在countDown()方法中调用的sync.releaseShared(1)会涉及)。

以上就是CountDownLatch的源码实现。

总结

CyclicBarrier与CountDownLatch有一点相似之处,但是有很大区别。它们的异同我个人总结如下:

类似功能

  • CyclicBarrier与CountDownLatch都是通过计数到达一定标准后,使得在await()处的线程继续向下执行。

不同之处

  • CyclicBarrier的实现是通过线程的等待唤醒;CountDownLatch的实现是通过死循环访问状态的自旋机制
  • CyclicBarrier在线程改变计数后不能向下执行(await()改变计数);CountDownLatch在线程改变计数后继续向下执行(countDown()改变计数)
  • CyclicBarrier的计数可以被重置,循环使用;CountDownLatch的计数只能使用一次