Java 线程同步组件 CountDownLatch 与 CyclicBarrier 原理分析

时间:2023-02-26 15:33:32

1.简介

在分析完AbstractQueuedSynchronizer(以下简称 AQS)和ReentrantLock的原理后,本文将分析 java.util.concurrent 包下的两个线程同步组件CountDownLatchCyclicBarrier。这两个同步组件比较常用,也经常被放在一起对比。通过分析这两个同步组件,可使我们对 Java 线程间协同有更深入的了解。同时通过分析其原理,也可使我们做到知其然,并知其所以然。

这里首先来介绍一下 CountDownLatch 的用途,CountDownLatch 允许一个或一组线程等待其他线程完成后再恢复运行。线程可通过调用await方法进入等待状态,在其他线程调用countDown方法将计数器减为0后,处于等待状态的线程即可恢复运行。CyclicBarrier (可循环使用的屏障)则与此不同,CyclicBarrier 允许一组线程到达屏障后阻塞住,直到最后一个线程进入到达屏障,所有线程才恢复运行。它们之间主要的区别在于唤醒等待线程的时机。CountDownLatch 是在计数器减为0后,唤醒等待线程。CyclicBarrier 是在计数器(等待线程数)增长到指定数量后,再唤醒等待线程。除此之外,两种之间还有一些其他的差异,这个将会在后面进行说明。

在下一章中,我将会介绍一下两者的实现原理,继续往下看吧。

2.原理

2.1 CountDownLatch 的实现原理

CountDownLatch 的同步功能是基于 AQS 实现的,CountDownLatch 使用 AQS 中的 state 成员变量作为计数器。在 state 不为0的情况下,凡是调用 await 方法的线程将会被阻塞,并被放入 AQS 所维护的同步队列中进行等待。大致示意图如下:

Java 线程同步组件 CountDownLatch 与 CyclicBarrier 原理分析

每个阻塞的线程都会被封装成节点对象,节点之间通过 prev 和 next 指针形成同步队列。初始情况下,队列的头结点是一个虚拟节点。该节点仅是一个占位符,没什么特别的意义。每当有一个线程调用 countDown 方法,就将计数器 state--。当 state 被减至0时,队列中的节点就会按照 FIFO 顺序被唤醒,被阻塞的线程即可恢复运行。

CountDownLatch 本身的原理并不难理解,不过如果大家想深入理解 CountDownLatch 的实现细节,那么需要先去学习一下 AQS 的相关原理。CountDownLatch 是基于 AQS 实现的,所以理解 AQS 是学习 CountDownLatch 的前置条件。我在之前写过一篇关于 AQS 的文章 Java 重入锁 ReentrantLock 原理分析,有兴趣的朋友可以去读一读。

2.2 CyclicBarrier 的实现原理

与 CountDownLatch 的实现方式不同,CyclicBarrier 并没有直接通过 AQS 实现同步功能,而是在重入锁 ReentrantLock 的基础上实现的。在 CyclicBarrier 中,线程访问 await 方法需先获取锁才能访问。在最后一个线程访问 await 方法前,其他线程进入 await 方法中后,会调用 Condition 的 await 方法进入等待状态。在最后一个线程进入 CyclicBarrier await 方法后,该线程将会调用 Condition 的 signalAll 方法唤醒所有处于等待状态中的线程。同时,最后一个进入 await 的线程还会重置 CyclicBarrier 的状态,使其可以重复使用。

在创建 CyclicBarrier 对象时,需要转入一个值,用于初始化 CyclicBarrier 的成员变量 parties,该成员变量表示屏障拦截的线程数。当到达屏障的线程数小于 parties 时,这些线程都会被阻塞住。当最后一个线程到达屏障后,此前被阻塞的线程才会被唤醒。

3.源码分析

通过前面简单的分析,相信大家对 CountDownLatch 和 CyclicBarrier 的原理有一定的了解了。那么接下来趁热打铁,我们一起探索一下这两个同步组件的具体实现吧。

3.1 CountDownLatch 源码分析

CountDownLatch 的原理不是很复杂,所以在具体的实现上,也不是很复杂。当然,前面说过 CountDownLatch 是基于 AQS 实现的,AQS 的实现则要复杂的多。不过这里仅要求大家掌握 AQS 的基本原理,知道它内部维护了一个同步队列,同步队列中的线程会按照 FIFO 依次获取同步状态就行了。好了,下面我们一起去看一下 CountDownLatch 的源码吧。

3.1.1 源码结构

CountDownLatch 的代码量不大,加上注释也不过300多行,所以它的代码结构也会比较简单。如下:

Java 线程同步组件 CountDownLatch 与 CyclicBarrier 原理分析

如上图,CountDownLatch 源码包含一个构造方法和一个私有成员变量,以及数个普通方法和一个重要的静态内部类 Sync。CountDownLatch 的主要逻辑都是封装在 Sync 和其父类 AQS 里的。所以分析 CountDownLatch 的源码,本质上是分析 Sync 和 AQS 的原理。相关的分析,将会在下一节中展开,本节先说到这。

3.1.2 构造方法及成员变量

本节来分析一下 CountDownLatch 的构造方法和其 Sync 类型的成员变量实现,如下:

public class CountDownLatch {

    private final Sync sync;

    /** CountDownLatch 的构造方法,该方法要求传入大于0的整型数值作为计数器 */
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 初始化 Sync
this.sync = new Sync(count);
} /** CountDownLatch 的同步控制器,继承自 AQS */
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L; Sync(int count) {
// 设置 AQS state
setState(count);
} int getCount() {
return getState();
} /** 尝试在共享状态下获取同步状态,该方法在 AQS 中是抽象方法,这里进行了覆写 */
protected int tryAcquireShared(int acquires) {
/*
* 如果 state = 0,则返回1,表明可获取同步状态,
* 此时线程调用 await 方法时就不会被阻塞。
*/
return (getState() == 0) ? 1 : -1;
} /** 尝试在共享状态下释放同步状态,该方法在 AQS 中也是抽象方法 */
protected boolean tryReleaseShared(int releases) {
/*
* 下面的逻辑是将 state--,state 减至0时,调用 await 等待的线程会被唤醒。
* 这里使用循环 + CAS,表明会存在竞争的情况,也就是多个线程可能会同时调用
* countDown 方法。在 state 不为0的情况下,线程调用 countDown 是必须要完
* 成 state-- 这个操作。所以这里使用了循环 + CAS,确保 countDown 方法可正
* 常运行。
*/
for (;;) {
// 获取 state
int c = getState();
if (c == 0)
return false;
int nextc = c-1; // 使用 CAS 设置新的 state 值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
}

需要说明的是,Sync 中的 tryAcquireShared 和 tryReleaseShared 方法并不是直接给 await 和 countDown 方法调用了的,这两个方法以“try”开头的方法最终会在 AQS 中被调用。

3.1.3 await

CountDownLatch中有两个版本的 await 方法,一个响应中断,另一个在此基础上增加了超时功能。本节将分析无超时功能的 await,如下:

/**
* 该方法会使线程进入等待状态,直到计数器减至0,或者线程被中断。当计数器为0时,调用
* 此方法将会立即返回,不会被阻塞住。
*/
public void await() throws InterruptedException {
// 调用 AQS 中的 acquireSharedInterruptibly 方法
sync.acquireSharedInterruptibly(1);
} /** 带有超时功能的 await */
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
} +--- AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 若线程被中断,则直接抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// 调用 Sync 中覆写的 tryAcquireShared 方法,尝试获取同步状态
if (tryAcquireShared(arg) < 0)
/*
* 若 tryAcquireShared 小于0,则表示获取同步状态失败,
* 此时将线程放入 AQS 的同步队列中进行等待。
*/
doAcquireSharedInterruptibly(arg);
}

从上面的代码中可以看出,CountDownLatch await 方法实际上调用的是 AQS 的 acquireSharedInterruptibly 方法。该方法会在内部调用 Sync 所覆写的 tryAcquireShared 方法。在 state != 0时,tryAcquireShared 返回值 -1。此时线程将进入 doAcquireSharedInterruptibly 方法中,在此方法中,线程会被放入同步队列中进行等待。若 state = 0,此时 tryAcquireShared 返回1,acquireSharedInterruptibly 会直接返回。此时调用 await 的线程也不会被阻塞住。

3.1.4 countDown

与 await 方法一样,countDown 实际上也是对 AQS 方法的一层封装。具体的实现如下:

/** 该方法的作用是将计数器进行自减操作,当计数器为0时,唤醒正在同步队列中等待的线程 */
public void countDown() {
// 调用 AQS 中的 releaseShared 方法
sync.releaseShared(1);
} +--- AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
// 调用 Sync 中的 tryReleaseShared 尝试释放同步状态
if (tryReleaseShared(arg)) {
/*
* tryReleaseShared 返回 true 时,表明 state = 0,即计数器为0。此时调用
* doReleaseShared 方法唤醒正在同步队列中等待的线程
*/
doReleaseShared();
return true;
}
return false;
}

以上就是 countDown 的源码分析,不是很难懂,这里就不啰嗦了。

3.2 CyclicBarrier 源码分析

3.2.1 源码结构

如前面所说,CyclicBarrier 是基于重入锁 ReentrantLock 实现相关逻辑的。所以要弄懂 CyclicBarrier 的源码,仅需有 ReentrantLock 相关的背景知识即可。关于重入锁 ReentrantLock 方面的知识,有兴趣的朋友可以参考我之前写的文章 Java 重入锁 ReentrantLock 原理分析。下面看一下 CyclicBarrier 的代码结构吧,如下:

Java 线程同步组件 CountDownLatch 与 CyclicBarrier 原理分析

从上图可以看出,CyclicBarrier 包含了一个静态内部类Generation、数个方法和一些成员变量。结构上比 CountDownLatch 略为复杂一些,但总体仍比较简单。好了,接下来进入源码分析部分吧。

3.2.2 构造方法及成员变量

CyclicBarrier 包含两个有参构造方法,分别如下:

/** 创建一个允许 parties 个线程通行的屏障 */
public CyclicBarrier(int parties) {
this(parties, null);
} /**
* 创建一个允许 parties 个线程通行的屏障,若 barrierAction 回调对象不为 null,
* 则在最后一个线程到达屏障后,执行相应的回调逻辑
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

上面的第二个构造方法初始化了一些成员变量,下面我们就来说明一下这些成员变量的作用。

成员变量 作用
parties 线程数,即当 parties 个线程到达屏障后,屏障才会放行
count 计数器,当 count > 0 时,到达屏障的线程会进入等待状态。当最后一个线程到达屏障后,count 自减至0。最后一个到达的线程会执行回调方法,并唤醒其他处于等待状态中的线程。
barrierCommand 回调对象,如果不为 null,会在第 parties 个线程到达屏障后被执行

除了上面几个成员变量,还有一个成员变量需要说明一下,如下:

/**
* CyclicBarrier 是可循环使用的屏障,这里使用 Generation 记录当前轮次 CyclicBarrier
* 的运行状态。当所有线程到达屏障后,generation 将会被更新,表示 CyclicBarrier 进入新一
* 轮的运行轮次中。
*/
private Generation generation = new Generation(); private static class Generation {
// 用于记录屏障有没有被破坏
boolean broken = false;
}

3.2.3 await

上一节所提到的几个成员变量,在 await 方法中将会悉数登场。下面就来分析一下 await 方法的试下,如下:

public int await() throws InterruptedException, BrokenBarrierException {
try {
// await 的逻辑封装在 dowait 中
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
} private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
final Generation g = generation; // 如果 g.broken = true,表明屏障被破坏了,这里直接抛出异常
if (g.broken)
throw new BrokenBarrierException(); // 如果线程中断,则调用 breakBarrier 破坏屏障
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
} /*
* index 表示线程到达屏障的顺序,index = parties - 1 表明当前线程是第一个
* 到达屏障的。index = 0,表明当前线程是最有一个到达屏障的。
*/
int index = --count;
// 当 index = 0 时,唤醒所有处于等待状态的线程
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 如果回调对象不为 null,则执行回调
if (command != null)
command.run();
ranAction = true;
// 重置屏障状态,使其进入新一轮的运行过程中
nextGeneration();
return 0;
} finally {
// 若执行回调的过程中发生异常,此时调用 breakBarrier 破坏屏障
if (!ranAction)
breakBarrier();
}
} // 线程运行到此处的线程都会被屏障挡住,并进入等待状态。
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
/*
* 若下面的条件成立,则表明本轮运行还未结束。此时调用 breakBarrier
* 破坏屏障,唤醒其他线程,并抛出异常
*/
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
/*
* 若上面的条件不成立,则有两种可能:
* 1. g != generation
* 此种情况下,表明循环屏障的第 g 轮次的运行已经结束,屏障已经
* 进入了新的一轮运行轮次中。当前线程在稍后返回 到达屏障 的顺序即可
*
* 2. g = generation 但 g.broken = true
* 此种情况下,表明已经有线程执行过 breakBarrier 方法了,当前
* 线程则会在稍后抛出 BrokenBarrierException
*/
Thread.currentThread().interrupt();
}
} // 屏障被破坏,则抛出 BrokenBarrierException 异常
if (g.broken)
throw new BrokenBarrierException(); // 屏障进入新的运行轮次,此时返回线程在上一轮次到达屏障的顺序
if (g != generation)
return index; // 超时判断
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
} /** 开启新的一轮运行过程 */
private void nextGeneration() {
// 唤醒所有处于等待状态中的线程
trip.signalAll();
// 重置 count
count = parties;
// 重新创建 Generation,表明进入循环屏障进入新的一轮运行轮次中
generation = new Generation();
} /** 破坏屏障 */
private void breakBarrier() {
// 设置屏障是否被破坏标志
generation.broken = true;
// 重置 count
count = parties;
// 唤醒所有处于等待状态中的线程
trip.signalAll();
}

3.2.4 reset

reset 方法用于强制重置屏障,使屏障进入新一轮的运行过程中。代码如下:

public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 破坏屏障
breakBarrier(); // break the current generation
// 开启新一轮的运行过程
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

reset 方法并不复杂,没什么好讲的。CyclicBarrier 中还有其他一些方法,均不复杂,这里就不一一分析了。

4.两者区别

看完上面的分析,相信大家对着两个同步组件有了更深入的认识。那么下面趁热打铁,简单对比一下两者之间的区别。这里用一个表格列举一下:

差异点 CountDownLatch CyclicBarrier
是否可循环使用
是否可设置回调

除了上面列举的差异点,还有一些其他方面的差异,这里就不一一列举了。

5.总结

分析完 CountDownLatch 和 CyclicBarrier,不知道大家有什么感觉。我个人的感觉是这两个类的源码并不复杂,比较好理解。当然,前提是建立在对 AQS 以及 ReentrantLock 有较深的理解之上。所以在学习这两个类的源码时,还是建议大家先看看前置知识。

好了,本文到这里就结束了。谢谢阅读,再见。

本文在知识共享许可协议 4.0 下发布,转载需在明显位置处注明出处

作者:coolblog

本文同步发布在我的个人博客:http://www.coolblog.xyz/?r=cb

Java 线程同步组件 CountDownLatch 与 CyclicBarrier 原理分析
本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。

Java 线程同步组件 CountDownLatch 与 CyclicBarrier 原理分析的更多相关文章

  1. JAVA线程同步辅助类CountDownLatch

    一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待. 用给定的计数 初始化 CountDownLatch.由于调用了 countDown() 方法,所以在当前计数到达 ...

  2. Java 并发同步器之CountDownLatch、CyclicBarrier

    一.简介 1.CountDownLatch是一个同步计数器,构造时传入int参数,该参数就是计数器的初始值,每调用一次countDown()方法,计数器减1,计数器大于0 时,await()方法会阻塞 ...

  3. Java并发包源码学习系列:同步组件CountDownLatch源码解析

    目录 CountDownLatch概述 使用案例与基本思路 类图与基本结构 void await() boolean await(long timeout, TimeUnit unit) void c ...

  4. java 线程同步 原理 sleep和wait区别

    java线程同步的原理java会为每个Object对象分配一个monitor, 当某个对象(实例)的同步方法(synchronized methods)被多个线程调用时,该对象的monitor将负责处 ...

  5. Java线程同步之一--AQS

    Java线程同步之一--AQS 线程同步是指两个并发执行的线程在同一时间不同时执行某一部分的程序.同步问题在生活中也很常见,就比如在麦当劳点餐,假设只有一个服务员能够提供点餐服务.每个服务员在同一时刻 ...

  6. Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

    Java并发编程:CountDownLatch.CyclicBarrier和Semaphore 在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch ...

  7. Java并发编程:CountDownLatch、CyclicBarrier和Semaphore (总结)

    下面对上面说的三个辅助类进行一个总结: 1)CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同: CountDownLatch一般用于某个线程A等待 ...

  8. 14、Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

    Java并发编程:CountDownLatch.CyclicBarrier和Semaphore 在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch ...

  9. 【转】Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

    Java并发编程:CountDownLatch.CyclicBarrier和Semaphore   Java并发编程:CountDownLatch.CyclicBarrier和Semaphore 在j ...

随机推荐

  1. AspNetPager控件报错误&colon; Syntax error&comma; unrecognized expression&colon; input&num;ctl00&dollar;ContentPlaceHolder1&dollar;Aspnetpager1&lowbar;input问题解决&lbrack;摘&rsqb;

    高版本IE,如IE10或者IE11在浏览页面时出现错误: Syntax error, unrecognized expression: input#ctl00$ContentPlaceHolder1$ ...

  2. 【IHttpHandler】在ASP&period;Net2&period;0中使用UrlRewritingNet实现链接重写

    很多时候我们需要链接转向(Url Rewriting),例如二级域名转向.文章访问链接等场合. 让我们看两个例子: 1 你现在看到的当前作者的博客园的域名: http://jx270.cnblogs. ...

  3. WWDC————苹果全球开发者大会

    WWDC:Apple Worldwide Developers Conference(苹果全球开发者)的简称,每年定期由苹果公司(Apple Inc.)在美国举办.大会主要的目的是让苹果公司向研发者们 ...

  4. &lbrack;RxJS&rsqb; Handling a Complete Stream with Reduce

    When a stream has completed, you often need to evaluate everything that has happened while the strea ...

  5. Java编程练习(四)——集合框架应用

    Java集合框架小应用之扑克牌小游戏 学习了Java集合框架之后,我写了一个扑克牌小游戏来巩固知识.学习之余的练习之作,有不足之处还得多多指教了~(*/ω\*) 扑克牌小游戏背景: 1. 创建一副扑克 ...

  6. Vue脚手架创建项目

    创建一个基于webpack模板的新项目 D:\Git $ vue -V D:\Git $ vue init webpack my-project ? Project name my-project ? ...

  7. oracle A用户访问B用户的表aa

    在B中:grant select on aa to A; (还可以配置insert,update,delete权限)

  8. 抢票季:吐槽12306 &amp&semi; 分享抢票经验

    又是一年一度的春运抢票季,不管你是北上.南下或者东进,在外漂泊了一年,有钱没钱总是要回家过年的. [图片来源于网络] 吐槽:12306抢票的悲伤 据说12306改版了,新版本里面除了UI这些面儿上的改 ...

  9. Java计算几何图形的面积

    对于每个几何图形而言,都有一些共同的属性,如名字.面积等,而其计算面积的方法却各不相同.为了简化开发,请编写程序,定义一个超类来实现输入名字的方法,并使用抽象方法来计算面积. 思路分析: 所谓超类就是 ...

  10. 用户从地址栏输入url,按下enter键后,直到页面加载完成的这个过程都发生了什么?

    流程大概描述一下: 用户将url输入后,服务器接受到请求,然后将这个请求进行处理,然后将处理后的结果返回给浏览器,浏览器将该结果以页面的形式呈现给用户. 详细描述: 1:用户将url(例如www.ba ...