Java JUC工具包之CyclicBarrier与CountDownLatch

假设有5个人约好一起去旅行,那么一般来说有2种组织出行方式,一种是自己组织自由行,另一种是跟团旅行。

在Java的JUC工具包中,有两个工具类可以类比这两种旅行方式,分别是CyclicBarrier和CountDownLatch。

两种旅行模式

1.自由旅行模式(CyclicBarrier)

旅行出发当天,5个人(按照约好的时间和地点)相互等待。先到的人,等待未到的人,一旦人齐了,集体成员就同时出发。

对应到CyclicBarrier类,就是:

有多个工作者线程,其中先准备好的线程,阻塞等待(await())未准备好的线程。一旦所有线程都准备好了,各个线程就同时执行任务。

关键知识点

  • 多个工作者线程;(其实1个也可以,但没意义)
  • 线程之间相互阻塞等待(await())彼此之间准备好
  • 所有线程马上同时执行

应用场景

CyclicBarrier一般应用于对公平性要求比较高,特别是需要同时执行的场景。

游戏类有很多适合的场景。例如剪子石头布比输赢,掷骰子比大小。

以掷骰子为例,先点击开始按钮一方的投掷线程,将会等待其他投掷线程。一旦所有游戏方都点击了开始按钮,所有投掷线程就同时运行并投掷出骰子。

2.跟团旅行模式(CountDownLatch)

旅行出发或结束当天,司机(按照约好的时间和地点)等待5名团员。每名团员到达后都进行签到(意味着未到人数减一),一旦人齐了,司机就立马开车。

对应到CountDownLatch类,就是:

有一到多个等待者线程,等待(await())一到多个被等待者线程做好准备或完成任务(countDown())。一旦所有被等待者线程都准备好或完成任务了,等待者线程就马上执行任务。

关键知识点

  • 一到多个等待者线程;
  • 一到多个被等待者线程;
  • 所有等待者线程,等待(await())所有被等待者线程准备好
  • 被等待者线程准备(countDown(),签到)完毕,所有等待者线程马上同时执行

应用场景

由于CountDownLatch多了一个countDown()方法,可以更灵活的使用。因此,其应用场景也相对广泛一些。

通常来说,CountDownLatch更适合用于等待一批线程准备好或完成指定任务,然后再进行下一步操作的场景。也就是说,等待者线程与被等待者线程之间存在先后依赖的执行顺序。

数据聚合。启动多个线程去抓取网页数据、查询接口数据或执行计算任务,另一个线程等待前者完成后,进行数据聚合相关操作。

投票选举主节点。候选节点启动多个线程,请求其他节点同意选自己为主节点。候选节点等待所有线程返回投票结果,如果收到半数以上同意选票则提升自己为主节点。

示例代码

CyclicBarrier

示例1

public class CyclicBarrierTest1 {

    private static int SIZE = 5;
    private static CyclicBarrier cb;

    public static void main(String[] args) {

        cb = new CyclicBarrier(SIZE);

        // 新建5个任务
        for (int i = 0; i < SIZE; i++)
            new InnerThread().start();
    }

    static class InnerThread extends Thread {
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");

                Thread.sleep(2000);
                // 将cb的参与者数量加1
                cb.await();

                // cb的参与者数量等于5时,才继续往后执行
                System.out.println(Thread.currentThread().getName() + " continued.");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果:
Thread-1 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-3 continued.
Thread-4 continued.
Thread-0 continued.
Thread-2 continued.
Thread-1 continued.

示例2

CyclicBarrier支持在所有线程都准备好之后,执行特定的操作(构造函数的第2个参数指定的操作)。

public class CyclicBarrierTest2 {

    private static int SIZE = 5;
    private static CyclicBarrier cb;

    public static void main(String[] args) {

        cb = new CyclicBarrier(SIZE, new Runnable() {
            public void run() {
                System.out.println("CyclicBarrier's parties is: " + cb.getParties());
            }
        });

        // 新建5个任务
        for (int i = 0; i < SIZE; i++)
            new InnerThread().start();
    }

    static class InnerThread extends Thread {
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");

                Thread.sleep(2000);
                // 将cb的参与者数量加1
                cb.await();

                // cb的参与者数量等于5时,才继续往后执行
                System.out.println(Thread.currentThread().getName() + " continued.");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果:
Thread-2 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-1 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
CyclicBarrier's parties is: 5
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-0 continued.
Thread-1 continued.

CountDownLatch

示例1

public class CountDownLatchTest1 {

    private static int LATCH_SIZE = 5;
    private static CountDownLatch doneSignal;

    public static void main(String[] args) {

        try {
            doneSignal = new CountDownLatch(LATCH_SIZE);

            // 创建5个任务,即被等待线程
            for (int i = 0; i < LATCH_SIZE; i++)
                new InnerThread().start();

            new Thread(new Runnable() {
                public void run() {
                    System.out.println("submain await begin.");
                    try {
                        // 子线程等待
                        doneSignal.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("submain await finished.");
                }
            }).start();

            // "主线程"等待线程池中5个任务的完成
            System.out.println("main await begin.");
            doneSignal.await();
            System.out.println("main await finished.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 被等待线程
     *
     */
    static class InnerThread extends Thread {
        public void run() {
            try {
                Thread.sleep(1000);

                System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
                // 线程签到,计数器减1
                doneSignal.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果:
main await begin.
submain await begin.
Thread-2 sleep 1000ms.
Thread-0 sleep 1000ms.
Thread-3 sleep 1000ms.
Thread-1 sleep 1000ms.
Thread-4 sleep 1000ms.
main await finished.
submain await finished.

示例2

想像一种场景,当线程即是等待者线程,同时又是被等待者线程时,会是什么效果。答案是,可以实现CyclicBarrier的相互等待,线程同时执行的效果。

public class CountDownLatchTest3 {

    private static int SIZE = 5;
    private static CountDownLatch doneSignal;

    public static void main(String[] args) {

        doneSignal = new CountDownLatch(SIZE);

        // 新建5个任务
        for (int i = 0; i < SIZE; i++)
            new InnerThread().start();
    }

    static class InnerThread extends Thread {
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " wait for CountDownLatch.");

                Thread.sleep(2000);

                // 签到,计数器减1
                doneSignal.countDown();
                // 等待所有线程就绪
                doneSignal.await();

                // cb的参与者数量等于5时,才继续往后执行
                System.out.println(Thread.currentThread().getName() + " continued.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果:
Thread-0 wait for CountDownLatch.
Thread-3 wait for CountDownLatch.
Thread-1 wait for CountDownLatch.
Thread-4 wait for CountDownLatch.
Thread-2 wait for CountDownLatch.
Thread-2 continued.
Thread-1 continued.
Thread-4 continued.
Thread-0 continued.
Thread-3 continued.

因此,如果要实现多个线程同时开始执行,可以使用CyclicBarrier或CountDownLatch来实现,前者显然更加直接明了。同时,还可以使用锁(synchronized、Lock)结合计数器进行实现。思路都是类似的:线程先相互等待,如果都就绪了,就开始执行。

最后,还要再说明一下,文章中提到的旅行模式,其实是我为了方便理解记忆而引入的一个概念,重在理解。


---转载本站文章请注明作者和出处 二进制之路(binarylife.icu),请勿用于任何商业用途---

留下评论