CountDownLatch指南

概述

通过使用CountDownLatch,可以阻塞线程,直到其他线程完成给定的任务。

CountDownLatch有一个计数器字段,该值与我们想要处理的线程数相同。然后,可以在每个线程完成后调用countdown(),保证调用await()的依赖线程将阻塞,直到工作线程完成。

等待线程池完成

让我们通过创建一个Worker并使用CountDownLatch字段在完成时发出信号来尝试这种模式:

public class Worker implements Runnable {
    private List outputScraper;
    private CountDownLatch countDownLatch;

    public Worker(List outputScraper, CountDownLatch countDownLatch) {
        this.outputScraper = outputScraper;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        doSomeWork();
        outputScraper.add("Counted down");
        countDownLatch.countDown();
    }
}

创建一个测试,验证可以获得CountDownLatch来等待Worker实例完成:

@Test
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion()
  throws InterruptedException {

    List outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch countDownLatch = new CountDownLatch(5);
    List workers = Stream
      .generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
      .limit(5)
      .collect(toList());

      workers.forEach(Thread::start);
      countDownLatch.await(); 
      outputScraper.add("Latch released");

      assertThat(outputScraper)
        .containsExactly(
          "Counted down",
          "Counted down",
          "Counted down",
          "Counted down",
          "Counted down",
          "Latch released"
        );
    }

等待同时开始的线程池

如果启动了数千个线程,那么很可能许多前面的线程在我们对后面的线程调用start()之前就已经完成了处理。这可能会使尝试和再现并发问题变得困难,因为我们无法使所有线程并行运行。

为了解决这个问题,可以使用CountdownLatch阻塞每个子线程,直到所有其他子线程都开始。

public class WaitingWorker implements Runnable {

    private List outputScraper;
    private CountDownLatch readyThreadCounter;
    private CountDownLatch callingThreadBlocker;
    private CountDownLatch completedThreadCounter;

    public WaitingWorker(
      List outputScraper,
      CountDownLatch readyThreadCounter,
      CountDownLatch callingThreadBlocker,
      CountDownLatch completedThreadCounter) {

        this.outputScraper = outputScraper;
        this.readyThreadCounter = readyThreadCounter;
        this.callingThreadBlocker = callingThreadBlocker;
        this.completedThreadCounter = completedThreadCounter;
    }

    @Override
    public void run() {
        readyThreadCounter.countDown();
        try {
            callingThreadBlocker.await();
            doSomeWork();
            outputScraper.add("Counted down");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            completedThreadCounter.countDown();
        }
    }
}

修改测试用例,使其阻塞直到所有Workers都开始,取消阻塞Workers,然后阻塞直到Workers完成:

@Test
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime()
 throws InterruptedException {
 
    List outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch readyThreadCounter = new CountDownLatch(5);
    CountDownLatch callingThreadBlocker = new CountDownLatch(1);
    CountDownLatch completedThreadCounter = new CountDownLatch(5);
    List workers = Stream
      .generate(() -> new Thread(new WaitingWorker(
        outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
      .limit(5)
      .collect(toList());

    workers.forEach(Thread::start);
    readyThreadCounter.await(); 
    outputScraper.add("Workers ready");
    callingThreadBlocker.countDown(); 
    completedThreadCounter.await(); 
    outputScraper.add("Workers complete");

    assertThat(outputScraper)
      .containsExactly(
        "Workers ready",
        "Counted down",
        "Counted down",
        "Counted down",
        "Counted down",
        "Counted down",
        "Workers complete"
      );
}

这种模式对于尝试重现并发错误非常有用,因为它可以用来迫使数千个线程尝试并行执行一些逻辑。

提前终止CountdownLatch

有时,可能会遇到这样一种情况,即Workers在倒计时之前错误地终止。这可能导致永远不会达到零,await()永远不会终止:

@Override
public void run() {
    if (true) {
        throw new RuntimeException("Oh dear, I'm a BrokenWorker");
    }
    countDownLatch.countDown();
    outputScraper.add("Counted down");
}

为了解决这个问题,await()的调用可添加一个超时参数。

boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS);
assertThat(completed).isFalse();

结论

CountDownLatch用来阻塞线程,直到其他线程完成一些处理的业务场景。

展开阅读全文

页面更新:2024-03-06

标签:可能会   字段   线程   个子   计数器   倒计时   实例   错误   模式   测试   指南

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top