探秘多线程-闭锁、栅栏与异步编排

时间:2023-02-05 07:09:35

无论是项目开发还是开源代码阅读,多线程都是不可或缺的一个重要知识点,基于这个考量,于是总结出本篇文章,讨论闭锁(CountDownLatch)、栅栏(CyclicBarrier)与异步编排(CompletableFuture)
@Author:Akai-yuan
@更新时间:2023/2/4

1.适用场景

  1. 协调子线程结束动作:等待所有子线程运行结束

主线程创建了5个子线程,各子任务执行确认动作,期间主线程进入等待状态,直到各子线程的任务均已经完成,主线程恢复继续执行。

  1. 协调子线程开始动作:统一各线程动作开始的时机

从多线程的角度看,这恰似你创建了一些多线程,但是你需要统一管理它们的任务开始时间。

2.设计思想

CountDownLatch基于一个同步器实现,并且只有CountDownLatch(int count)一个构造器,指定数量count不得在中途修改它。
核心函数

  • await():等待latch降为0;
  • boolean await(long timeout, TimeUnit unit):等待latch降为0,但是可以设置超时时间。
  • countDown():latch数量减1;
  • getCount():获取当前的latch数量。

3.场景实例

场景1. 对各子线程的等待

public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(4);

    Thread t1 = new Thread(countDownLatch::countDown);
    Thread t2 = new Thread(countDownLatch::countDown);
    Thread t3 = new Thread(countDownLatch::countDown);
    Thread t4 = new Thread(() -> {
        try {
            // 稍等...
            Thread.sleep(1500);
            countDownLatch.countDown();
        } catch (InterruptedException ignored) {}
    });

    t1.start();
    t2.start();
    t3.start();
    t4.start();
		//直到所有线程都对计数器进行减一后,这里才放行
    countDownLatch.await();
    System.out.println("所有子线程就位,可以继续执行其他任务");
}

场景2. 对多线程的统一管理

我们仍然用4个线程调用了start(),但是它们在运行时都在等待countDownLatch的信号,在信号未收到前,它们不会往下执行。

public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(1);

    Thread t1 = new Thread(() -> waitForCountDown(countDownLatch));
    Thread t2 = new Thread(() -> waitForCountDown(countDownLatch));
    Thread t3 = new Thread(() -> waitForCountDown(countDownLatch));
    Thread t4 = new Thread(() -> waitForCountDown(countDownLatch));

    t1.start();
    t2.start();
    t3.start();
    t4.start();
    Thread.sleep(1000);
    countDownLatch.countDown();
    System.out.println("所有线程准备完成");
}

private static void waitForCountDown(CountDownLatch countDownLatch) {
    try {
			// 等待信号
        countDownLatch.await(); 
        System.out.println("本线程等待完毕");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

输出:

所有线程准备完成
本线程等待完毕
本线程等待完毕
本线程等待完毕
本线程等待完毕

Process finished with exit code 0

场景3. SOFAJRaft的实践

// 定义一个CountDownLatch计数器
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);

public void start() {
    switch (workerStateUpdater.get(this)) {
        case WORKER_STATE_INIT:
            if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                //此处调用工作线程执行CountDownLatch的countDown()方法
                //即startTimeInitialized.countDown();
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    // 等待startTime被工作线程初始化完成
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}

2.CyclicBarrier

1.适用场景

栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。
CyclicBarrier与CountDownLatch的区别

CyclicBarrier CountDownLatch
CyclicBarrier是可重用的,其中的线程会等待所有的线程完成任务。届时,屏障将被拆除,并可以选择性地做一些特定的动作。 CountDownLatch是一次性的,不同的线程在同一个计数器上工作,直到计数器为0
CyclicBarrier面向的是线程数 CountDownLatch面向的是任务数
在使用CyclicBarrier时,你必须在构造中指定参与协作的线程数,这些线程必须调用await()方法 使用CountDownLatch时,则必须要指定任务数,至于这些任务由哪些线程完成无关紧要
CyclicBarrier可以在所有的线程释放后重新使用 CountDownLatch在计数器为0时不能再使用
在CyclicBarrier中,如果某个线程遇到了中断、超时等问题时,则处于await的线程都会出现问题 在CountDownLatch中,如果某个线程出现问题,其他线程不受影响

2.设计思想

1.构造器

// 指定参与方的数量;
public CyclicBarrier(int parties);
// 指定参与方的数量,并指定在本代次结束时运行的代码
public CyclicBarrier(int parties, Runnable barrierAction):

2.核心方法

//如果当前线程不是第一个到达屏障的话,它将会进入等待,直到其他线程都到达
//除非发生被中断、屏障被拆除、屏障被重设等情况
public int await();
//和await()类似,但是加上了时间限制;
public int await(long timeout, TimeUnit unit);
//当前屏障是否被拆除;
public boolean isBroken();
//重设当前屏障。会先拆除屏障再设置新的屏障
public void reset();
//正在等待的线程数量
public int getNumberWaiting();

3.场景实例

下面以一个简单的日常对话来讲解CyclicBarrier的使用实例

private static String appointmentPlace = "书房";

public static void main(String[] args) {
  CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("yuan所在的地点:" + appointmentPlace));
    // 线程Akai
    Thread Akai = newThread("Akai", () -> {
    System.out.println("yuan,饭好了快来吃饭...");
    try {
      // 此时Akai在屏障前等待
      cyclicBarrier.await();
      System.out.println("yuan,你来了...");
      // 开始吃饭...
      Thread.sleep(2600); 
      System.out.println("好的,你去洗你的碗吧!");
      // 第二次调用await
      cyclicBarrier.await(); 
      Thread.sleep(100);
      System.out.println("好吧,你这个懒猪!");
    } catch (Exception e) {
      e.printStackTrace();
    }
  });
  // 线程yuan
  Thread yuan = newThread("yuan", () -> {
    try {
      // yuan在敲代码
      Thread.sleep(500); 
      System.out.println("我在敲代码,我马上就来!");
      // yuan到达饭桌前
      cyclicBarrier.await(); 
      Thread.sleep(500); 
      System.out.println("Akai,不好意思,刚刚沉迷于敲代码了!");
      // 开始吃饭...
      Thread.sleep(1500); 
      // yuan想先吃完赶快洗碗然后溜出去敲代码
      System.out.println("我吃完了,我要去洗碗了"); 
      // yuan把地点改成了厨房
      appointmentPlace = "厨房"; 
      // 洗碗中...
      Thread.sleep(1500); 
      System.out.println("︎yuan终于洗完自己的碗了");
      // 第二次调用await
      cyclicBarrier.await();
      System.out.println("Akai你吃完了,你的碗自己去洗吧,我已经在敲代码了");
    } catch (Exception ignored) {}
  });

  Akai.start();
  yuan.start();
}

输出结果:

yuan,饭好了快来吃饭...
我在敲代码,我马上就来!
yuan所在的地点:书房
yuan,你来了...
Akai,不好意思,刚刚沉迷于敲代码了!
我吃完了,我要去洗碗了
好的,你去洗你的碗吧!
yuan终于洗完自己的碗了
yuan所在的地点:厨房
Akai你吃完了,你的碗自己去洗吧,我已经在敲代码了
好吧,你这个懒猪!    

3.CompletableFuture

1.设计思想

1.Future的局限性

  • 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
  • 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
  • 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
  • 没有异常处理:Future接口中没有关于异常处理的方法;

2.Completable有哪些优势

CompletableFuture是Future接口的扩展和增强
CompletableFuture完整地继承了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。
从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。

2.核心设计

我们首先来讨论CompletableFuture的核心:CompletionStage
顾名思义,根据CompletionStage名字中的"Stage",你可以把它理解为任务编排中的步骤。步骤,即任务编排的基本单元,它可以是一次纯粹的计算或者是一个特定的动作。在一次编排中,会包含多个步骤,这些步骤之间会存在依赖链式组合等不同的关系,也存在并行串行的关系。这种关系,类似于Pipeline或者流式计算。
既然是编排,就需要维护任务的创建、建立计算关系。为此,CompletableFuture提供了多达50多个方法,但没有必要全部完全理解,但我们可以通过分类的方式简化对方法的理解,理解了类型变种,基本上我们也就掌握了CompletableFuture的核心能力。
这些方法可以总结为以下四类,其他大部分方法都是基于这四种类型的变种:
探秘多线程-闭锁、栅栏与异步编排

3.核心用法

1.runAsync

  • runAsync()是CompletableFuture最常用的方法之一,它可以接收一个待运行的任务并返回一个CompletableFuture
  • 当我们想异步运行某个任务时,在以往需要手动实现Thread或者借助Executor实现。而通过runAsync()`就简单多了。比如,我们可以直接传入Runnable类型的任务:
CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        System.out.println("something");
    }
});

2.supply与supplyAsync

  • 所谓supply表示提供结果,换句话说当我们使用supply()时,就表明我们会返回一个结果,并且这个结果可以被后续的任务所使用。
// 创建nameFuture,返回姓名
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
    return "Akai-yuan";
});

 // 使用thenApply()接收nameFuture的结果,并执行回调动作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
    return "love you," + name;
});

//阻塞获得表白的结果
System.out.println(sayLoveFuture.get()); // love you,Akai-yuan

一旦理解了supply()的含义,它也就如此简单。如果你希望用新的线程运行任务,可以使用supplyAsync().

3.thenApply与thenApplyAsync

  • 我们已经知道supply()是用于提供结果的,并且顺带提了thenApply()。很明显,thenApply()supply()的搭档,用于接收supply()的执行结果,并执行特定的代码逻辑,最后返回CompletableFuture结果
 // 使用thenApply()接收nameFuture的结果,并执行回调动作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
    return "爱你," + name;
});

public <U> CompletableFuture <U> thenApplyAsync(
    Function <? super T, ? extends U> fn) {
    return uniApplyStage(null, fn);
}

4.thenAccept与thenAcceptAsync

作为supply()的档案,thenApply()并不是唯一的存在,thenAccept()也是。但与thenApply()不同,thenAccept()只接收数据,但不会返回,它的返回类型是Void.

CompletableFuture<Void> sayLoveFuture = nameFuture.thenAccept(name -> {
     System.out.println("爱你," + name);
});
        
public CompletableFuture < Void > thenAccept(Consumer < ? super T > action) {
    return uniAcceptStage(null, action);
}

5.thenRun

thenRun()就比较简单了,不接收任务的结果,只运行特定的任务,并且也不返回结果。

public CompletableFuture < Void > thenRun(Runnable action) {
   return uniRunStage(null, action);
}

所以,如果你在回调中不想返回任何的结果,只运行特定的逻辑,那么你可以考虑使用thenAcceptthenRun一般来说,这两个方法会在调用链的最后面使用。

6.thenCompose与 thenCombine

以上几种方法都是各玩各的,但thenCompose()与thenCombine()就不同了,它们可以实现对依赖非依赖两种类型的任务的编排。
编排两个存在依赖关系的任务
在前面的例子中,在接收前面任务的结果时,我们使用的是thenApply(). 也就是说,sayLoveFuture在执行时必须依赖nameFuture的完成,否则执行个锤子。

// 创建Future
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
    return "Akai-yuan";
});

 // 使用thenApply()接收nameFuture的结果,并执行回调动作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
    return "爱你," + name;
});

但其实,除了thenApply()之外,我们还可以使用thenCompose()来编排两个存在依赖关系的任务。比如,上面的示例代码可以写成:

// 创建Future
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
    return "Akai-yuan";
});

CompletableFuture<String> sayLoveFuture2 = nameFuture.thenCompose(name -> {
    return CompletableFuture.supplyAsync(() -> "爱你," + name);
});

可以看到,thenCompose()和thenApply()的核心不同之处在于它们的返回值类型

  • thenApply():返回计算结果的原始类型,比如返回String;
  • thenCompose():返回CompletableFuture类型,比如返回CompletableFuture.

组合两个相互独立的任务
考虑一个场景,当我们在执行某个任务时,需要其他任务就绪才可以,应该怎么做?这样的场景并不少见,我们可以使用前面学过的并发工具类实现,也可以使用thenCombine()实现。
举个例子,当我们计算某个胜率时,我们需要获取她参与的总场次(rounds),以及获胜的场次(winRounds),然后再通过winRounds / rounds来计算。对于这个计算,我们可以这么做:

CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> 500);
CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> 365);

CompletableFuture < Object > winRateFuture = roundsFuture
    .thenCombine(winRoundsFuture, (rounds, winRounds) -> {
        if (rounds == 0) {
            return 0.0;
        }
        DecimalFormat df = new DecimalFormat("0.00");
        return df.format((float) winRounds / rounds);
    });
System.out.println(winRateFuture.get());

thenCombine()将另外两个任务的结果同时作为参数,参与到自己的计算逻辑中。在另外两个参数未就绪时,它将会处于等待状态。

7.allOf与anyOf

allOf()与anyOf()也是一对孪生兄弟,当我们需要对多个Future的运行进行组织时,就可以考虑使用它们:

  • allOf():给定一组任务,等待所有任务执行结束;
  • anyOf():给定一组任务,等待其中任一任务执行结束。

allOf()与anyOf()的方法签名如下:

static CompletableFuture<Void>	 allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

需要注意的是,anyOf()将返回完任务的执行结果,但是allOf()不会返回任何结果,它的返回值是Void.
allOf()与anyOf()的示例代码如下所示。我们创建了roundsFuture和winRoundsFuture,并通过sleep模拟它们的执行时间。在执行时,winRoundsFuture将会先返回结果,所以当我们调用 CompletableFuture.anyOf时也会发现输出的是365.

 CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> {
   try {
     Thread.sleep(200);
     return 500;
   } catch (InterruptedException e) {
     return null;
   }
 });
 CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> {
   try {
     Thread.sleep(100);
     return 365;
   } catch (InterruptedException e) {
     return null;
   }
 });

 CompletableFuture < Object > completedFuture = CompletableFuture.anyOf(winRoundsFuture, roundsFuture);
 System.out.println(completedFuture.get()); // 返回365

 CompletableFuture < Void > completedFutures = CompletableFuture.allOf(winRoundsFuture, roundsFuture);

在CompletableFuture之前,如果要实现所有任务结束后执行特定的动作,我们可以考虑CountDownLatch等工具类。现在,则多了一选项,我们也可以考虑使用CompletableFuture.allOf.

8.异常处理

在CompletableFuture链式调用中,如果某个任务发生了异常,那么后续的任务将都不会再执行。对于异常,我们有两种处理方式:exceptionally()handle().

1.使用exceptionally()回调处理异常

在链式调用的尾部使用exceptionally(),捕获异常并返回错误情况下的默认值。需要注意的是,exceptionally()仅在发生异常时才会调用

CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
    .thenCombine(winRoundsFuture, (rounds, winRounds) -> {
        if (rounds == 0) {
            throw new RuntimeException("总场次错误");
        }
        DecimalFormat df = new DecimalFormat("0.00");
        return df.format((float) winRounds / rounds);
    }).exceptionally(ex -> {
        System.out.println("出错:" + ex.getMessage());
        return "";
    });
System.out.println(winRateFuture.get());
2. 使用handle()处理异常

除了exceptionally(),CompletableFuture也提供了handle()来处理异常。不过,与exceptionally()不同的是,当我们在调用链中使用了handle(),那么无论是否发生异常,都会调用它。所以,在handle()方法的内部,我们需要通过 if (ex != null) 来判断是否发生了异常。

CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
    .thenCombine(winRoundsFuture, (rounds, winRounds) -> {
        if (rounds == 0) {
            throw new RuntimeException("总场次错误");
        }
        DecimalFormat df = new DecimalFormat("0.00");
        return df.format((float) winRounds / rounds);
    }).handle((res, ex) -> {
        if (ex != null) {
            System.out.println("出错:" + ex.getMessage());
            return "";
        }
        return res;
    });
System.out.println(winRateFuture.get());

当然,如果我们允许某个任务发生异常而不中断整个调用链路,那么可以在其内部通过try-catch消化掉。