03_concurrency_10_phaser

第四十四章 线程同步辅助类CountDownLatch,CyclicBarrier,Phaser和Exchanger

1 概述

本章介绍Java多线程标准库中的CountDownLatch,CyclicBarrier和Phaser类,它们能帮助开发人员简化线程同步的复杂度。与关键字Synchronized和锁Lock不同的是,关键字synchronized和锁Lock偏向于通过控制线程的进度以保护共享资源,而这三个类偏向于通过控制线程的进度以达到线程进度上的同步。使用这三个类可以帮助多线程程序分阶段运行(Phase-Oriented Execution)。Exchanger类帮助多线程程序交互数据。

2 CountDownLatch类

java.util.concurrent.CountDownLatch类是一个帮助线程同步的辅助类。从名字上看,Latch是一种门栓,CountDown的意思是倒着数数(例如:倒计时的过程,5,4,3,2,1)。将其放在一起,CountDownLatch的意思是一把“倒计数”的门栓。每个CountDownLatch对象内部包含了一个计数器。当计数器变为0时,门栓才被打开,等待开门的线程才能通过这道门。

所以,CountDownLatch有几个非常重要的方法。在构造函数中,使用入参count初始化CountDownLatch对象的计数器。countDown()方法每次递减计数器。countDown()方法可由参与任务执行的线程调用,也可以由第三方线程调用。await()方法使得当前线程变为等待状态,直到计数器变为0,或者超时(如果使用第二种await()方法)。

package java.util.concurrent;

public class CountDownLatch {
    public CountDownLatch(int count);
    public void countDown();
    public void await();
    public void await(long timeout, TimeUnit unit);
}

我们在下面例举两个非常有用的例子。第一个例子是:假设有三个线程同时运行。当某一条件未被满足时,线程A和线程B需要等待。当这个条件被满足后,线程A和线程B会同时开始继续运行。线程C则处理这个条件。这个例子非常常见是因为开发人员可以使用CountDownLatch来对齐两个或者多个线程,让其同时开始运行一个任务。

在下面的代码中,main()方法会创建出两个ThreadAB线程对象。run()方法中的处理被分为两个阶段进行。第一个阶段打印"The thread has started, but it is waiting."字符串;在第二个阶段打印"Start the tasks in phase 2."。在第一个阶段结束后,调用了latch.await()方法,使得这两个线程处于等待状态。与此同时,主线程main()方法在启动了两个线程之后,会先等待1秒,然后再调用latch.countDown()启动第二阶段的运行。

import java.util.concurrent.CountDownLatch;

public class ThreadAB implements Runnable {
    private String name = null;
    private CountDownLatch latch  = null;

    public ThreadAB(String name, CountDownLatch latch) {
        this.name = name;
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            System.out.println(name + " has started, but it is waiting.");

            latch.await();
            // 完成第二阶段的任务

            System.out.println(name + " starts the tasks in phase 2.");
        } catch (InterruptedException ex) {}
        
    }

    public static void main (String[] args) {
        try {
            CountDownLatch latch = new CountDownLatch(1);
            Thread threadA = new Thread(new ThreadAB("Thread A", latch));
            Thread threadB = new Thread(new ThreadAB("Thread B", latch));
    
            threadA.start();
            threadB.start();
    
            // 完成第一阶段的任务
            Thread.sleep(1000); // 等待1秒
            System.out.println("The tasks in phase 1 completes.");
    
            // 启动第二阶段的任务
            latch.countDown(); 
            threadA.join();
            threadB.join();
        } catch (InterruptedException ex) {}
    }
}

程序运行结果:

> java ThreadAB
Thread B has started, but it is waiting.
Thread A has started, but it is waiting.
The tasks in phase 1 completes.
Thread B starts the tasks in phase 2.
Thread A starts the tasks in phase 2.

第二个例子是多个线程同时完成一批任务。这些任务被分为两个阶段完成。因为,第二阶段的任务依赖于第一阶段任务的结果,所以,第一阶段的任务全部完成后才能启动第二阶段的任务。

在下面的例子中,main()方法同时启动两个线程。latch对象的计数器初始化为2。在线程的run()方法中,当第一阶段结束时,首先调用latch.countDown()表明本线程第一阶段已结束。然后调用latch.await()方法表示等待第二阶段的开始。

import java.util.concurrent.CountDownLatch;

public class TwoPhaseWorker implements Runnable {
    private CountDownLatch latch  = null;
    public TwoPhaseWorker(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            System.out.println("The thread is working on tasks in phase 1.");
            // 完成第一阶段的任务
            latch.countDown();  // 表明第一阶段任务结束
            latch.await();  // 表示等待第二阶段开始
            System.out.println("Start working on tasks in phase 2.");
            // 完成第二阶段的任务
        } catch (InterruptedException ex) {}
    }

    public static void main (String[] args) {
        try {
            // 计数器初始化为2
            CountDownLatch latch = new CountDownLatch(2);
            Thread threadA = new Thread(new TwoPhaseWorker(latch));
            Thread threadB = new Thread(new TwoPhaseWorker(latch));
    
            threadA.start();
            threadB.start();
            threadA.join();
            threadB.join();
        } catch (InterruptedException ex) {}
    }
}

程序运行结果:

> java TwoPhaseWorker
The thread is working on tasks in phase 1.
The thread is working on tasks in phase 1.
Start working on tasks in phase 2.
Start working on tasks in phase 2.

CountDownLatch对象是一次性的。即当CountDownLatch对象触发了释放线程之后,该对象不能重复使用。需要反复触发的场景可参考CyclicBarrier类。

3 CyclicBarrier类

java.util.concurrent.CyclicBarrier的用法与CountDownLatch十分相似,但是,一个显著的区别是CyclicBarrier对象是循环的(Cyclic),可以反复使用的。另一个区别是,CyclicBarrier不能以外部的某个状态为触发条件。线程是否可以继续运行取决于线程自身的运行状态。

CyclicBarrier可用于多线程的分阶段执行模式。CyclicBarrier对象会为一组线程设置一个障碍点。当只有部分线程到达这个障碍点时,这些线程会处于等待状态。只有当所有的线程到达这个障碍点时,所有的线程才能通过这个障碍点继续运行。CyclicBarrier有以下重要的方法。在构造函数中,入参parties指明了该障碍点可以阻挡的线程数。每一个线程对象被称为一个参与方(party)。调用await()的线程表示该线程在该障碍点被阻挡,需要等待。只有当等待的线程数达到parties时,这些线程才能被同时释放,继续前进。

第二个构造函数还接收一个Runnable对象。当最后一个线程到达障碍点时,在所有线程被释放之前,这个Runnable对象会在最后一个到达的线程中运行。开发人员可以利用这个Runnable对象完成一些阶段之间需要完成的工作(例如:清理前一阶段使用的资源或者初始化下一个阶段需要使用的资源)。

第二个await()方法同时还接收一个最长等待时间。如果当任意一个线程因为到达障碍点而进入等待状态,但是,在等待时被打断(interrupted)的话,该进程会继续执行。CyclicBarrier会处于broken状态。其他等待的线程会收到BrokenBarrierException异常。线程被打断的原因可能有多种,例如:因超时而被打断,或者被其他线程调用interrupt()方法打断。

package java.util.concurrent;

public class CyclicBarrier {
    public CyclicBarrier (int parties);
    public CyclicBarrier (int parties, Runnable barrierAction);

    void await();
    void await(long timeout, TimeUnit unit);
}

下面是一个具体的例子。在main()方法中启动了5个线程,所以,barrier的parties个数初始化为5。在线程的run()方法中,当第一个阶段任务完成后,调用barrier.await()方法等待与其他线程同时进入第二阶段。因为barrier是可以重复使用的。所以,在第二阶段完成后,再次调用barrier.await()方法等待与其他线程同时进入第三阶段。以此类推。图一展示了这五个线程运行的过程。

图一 五个线程运行的过程 图一 五个线程运行的过程)

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;

public class CyclicBarrierWorker implements Runnable {
    private String name = null;
    private CyclicBarrier barrier  = null;

    public CyclicBarrierWorker(String name, CyclicBarrier barrier) {
        this.name = name;
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try{ 
            System.out.println(name + " is working on tasks in phase 1.");
            // 完成第一阶段的任务
            barrier.await();
            
            System.out.println(name + " is working on tasks in phase 2.");
            // 完成第二阶段的任务
            
            barrier.await();
            System.out.println(name + " is working on tasks in phase 3.");
            // 完成第三阶段的任务
        } catch (InterruptedException | BrokenBarrierException ex) {}
    }

    public static void main (String[] args) {
        try {
            int parties = 5;
            CyclicBarrier barrier = new CyclicBarrier(parties);
            Thread[] threads = new Thread[parties];
            for (int i = 0; i < parties; i++) {
                threads[i] = new Thread(new CyclicBarrierWorker("Thread " + i, barrier));
            }
    
            for (Thread thread: threads) {
                thread.start();
            }
    
            for (Thread thread: threads) {
                thread.join();
            }
        } catch (InterruptedException ex) {}
    }
}

程序运行结果:

> java CyclicBarrierWorker
Thread 0 is working on tasks in phase 1.
Thread 4 is working on tasks in phase 1.
Thread 2 is working on tasks in phase 1.
Thread 1 is working on tasks in phase 1.
Thread 3 is working on tasks in phase 1.
Thread 3 is working on tasks in phase 2.
Thread 2 is working on tasks in phase 2.
Thread 1 is working on tasks in phase 2.
Thread 4 is working on tasks in phase 2.
Thread 0 is working on tasks in phase 2.
Thread 3 is working on tasks in phase 3.
Thread 1 is working on tasks in phase 3.
Thread 4 is working on tasks in phase 3.
Thread 2 is working on tasks in phase 3.
Thread 0 is working on tasks in phase 3.

4 Phaser类

java.util.concurrent.Phaser类提供了更加灵活的,分阶段运行(Phase-Oriented)的线程同步功能。Phaser对象接受线程的注册(register)和离开(de-register)。换句话说,与Phaser对象相关联的,受Phaser对象阻挡的线程的个数是可以变化的。Phaser对象也是可循环使用的,这一点和CyclicBarrier相同。

Phaser对象内部还维护着一个阶段序号(phase number)。阶段序号初始化为0。每当所有注册的线程到达障碍点时,所有的线程被释放,继续运行。阶段序号自动增加1,表示进入下一个阶段。当阶段序号到达最大值Integer.MAX_VALUE后,自动回到0,从新开始。

Phaser的工作流程由两个部分组成。

  1. 注册(Registration)或者离开(Deregistration)。线程可以在任何时候向一个Phaser对象注册(register),或者在任何时候离开(de-register)。在Phaser内部,实际上只保留了一个计数器,记录有多少注册的线程。因此,在注册后,线程无法向其查询自身是否已注册。注册操作可由register()方法和bulkRegister(int)方法完成。
  2. 同步(Synchronization)。同步这个步骤包含了几个操作。Phaser类提供了一些方法将这些操作组合起来。例如:arrive()方法表示该线程到达了障碍点,但是该线程并不进入等待状态。arriveAndAwaitAdvance()则表示该线程到达了障碍点,等待与其他线程一起继续运行。arriveAndDeregister()则表示到达障碍点,并离开这个Phaser对象,继续运行。
  3. 结束(termination)。当Phaser对象进入结束状态后,所有的注册线程被释放,继续运行。有三种方法可以使Phaser对象进入结束状态。首先,在默认情况下,当Phaser对象注册的线程数变为0后,该Phaser对象进入结束状态。第二种方法是显示调用forceTermination()方法,强制要求Phaser对象进入结束状态。第三种方法是覆盖onAdvance()方法。当Phaser对象进入一个新的阶段时,即阶段序号自增时,onAdvance()方法会被自动调用。如果此时onAdvance()方法返回true的话,该Phaser对象则进入结束状态。
  4. 查询。Phaser类还提供了一些方法查询Phaser对象信息。例如:getPhase()方法返回当前的阶段序号。getArrivedParties()方法返回当前有多少个注册的线程已达到当前阶段的障碍点。getRegisteredParties()返回当前已注册的线程数。

接下来,我们用一个例子来展示Phaser的用法。在main()函数中创建了一个线程池ExecutorService对象和一个Phaser对象。这个Phaser对象使用数值1来初始化是因为主线程也需要向其注册,但是,又不想通过调用register()注册。

然后在main()方法向线程池提交三个任务,它们分别是"thread-1","thread-2"和"thread-3"。在PhaserExample的构造函数中,该线程调用this.ph.register()向Phaser对象完成注册。

所以,在main()方法和run()方法中,同时运行的四个线程调用arriveAndAwaitAdvance()方法,以等待其他线程到达该障碍点。当四个线程都到达后,四个线程被同时释放,继续向前执行。

import java.util.concurrent.Phaser;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PhaserExample implements Runnable{
    private String name = null;
    private Phaser ph = null;

    public PhaserExample(String name, Phaser ph) {
        this.name = name;
        this.ph = ph;
        this.ph.register();
    }

    @Override
    public void run() {
        //等待其他线程到达障碍点
        ph.arriveAndAwaitAdvance();

        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {}

        System.out.println(name + " leaves the phaser object.");
        // 表明到达障碍点并离开ph对象
        ph.arriveAndDeregister();
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Phaser ph = new Phaser(1); //注册运行main()函数的主线程

        executorService.submit(new PhaserExample("thread-1", ph));
        executorService.submit(new PhaserExample("thread-2", ph));
        executorService.submit(new PhaserExample("thread-3", ph));

        //等待其他线程到达障碍点
        ph.arriveAndAwaitAdvance();
        System.out.println("Phase number: " + ph.getPhase());

        // 等待其他线程到达障碍点
        ph.arriveAndAwaitAdvance();
        System.out.println("Main thread leaves the phaser object.");
        System.out.println("Phase number: " + ph.getPhase());

        // 停止线程池
        executorService.shutdown();
    }
}

程序运行结果:

> java PhaserExample
Phase number: 1
thread-2 leaves the phaser object.
thread-1 leaves the phaser object.
thread-3 leaves the phaser object.
Main thread leaves the phaser object.
Phase number: 2

5 Exchanger类

除了线程同步以外,线程之间交换数据也是多线程程序常见的操作之一。java.util.concurrent.Exchanger类用于设置一个同步点,并帮助线程之间交互数据。这个同步点是与数据交换相关联的,因为,当线程达到这个同步点时,需要准备好交换的数据。Exchanger类只定义了两个版本的exchange()方法。exchange()方法的输入参数是本线程需要交换出去的对象。exchange()返回的对象是从另一个线程交换过来的对象。这两个对象的类型是相同。

exchange()方法的第二个版本可以设置一个最长的等待时间。当本线程调用exchange()方法后,需要等待对端线程也进入交换同步点,并调用exchange()方法完成交换。如果在等待过程中超时的话,该线程会抛出TimeoutException异常。

package java.util.concurrent;

public class Exchanger<V> {
    public V exchange(V v) throws InterruptedException;
    public V exchange(V v, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException;
}

我们通过下面的一个例子讲解Exchanger类的用法。main()方法首先创建一个Exchanger对象,它负责交换String类型的数据。message1和message2是两个线程即将交换的对象。然后,main()方法启动两个线程。在run()方法中首先打印本线程持有的消息,然后调用exchange()方法与对端线程交换。最后,打印交换过来的消息。

import java.util.concurrent.Exchanger;

public class ExchangerExample implements Runnable{
    private String name = null;
    private String message = null;
    Exchanger<String> exchanger = null;

    public ExchangerExample(String name, String message, Exchanger<String> exchanger) {
        this.name = name;
        this.message = message;
        this.exchanger = exchanger;
    }
    
    @Override
    public void run() {
        System.out.println(String.format("Thread %s is holding message %s.", name, message));

        try {
            //发起与对端线程交换message对象
            this.message = exchanger.exchange(this.message);
        } catch (InterruptedException ex) {}
        
        System.out.println(String.format("Thread %s is holding message %s.", name, message));
    }

    public static void main(String[] args) {
        try {
            Exchanger<String> exchanger = new Exchanger<String>();
            String message1 = "This is message 1";
            String message2 = "This is message 2";
    
            Thread thread1 = new Thread(new ExchangerExample("Thread-1", message1, exchanger));
            Thread thread2 = new Thread(new ExchangerExample("Thread-2", message2, exchanger));
    
            thread1.start();
            thread2.start();
    
            thread1.join();
            thread2.join();
        } catch (InterruptedException ex) {}
    }
}

程序运行结果:

> java ExchangerExample
Thread Thread-2 is holding message This is message 2.
Thread Thread-1 is holding message This is message 1.
Thread Thread-1 is holding message This is message 2.
Thread Thread-2 is holding message This is message 1.

6 结语

本章介绍了Java标准库中提供的四个线程同步的辅助类。与关键字synchronized锁Lock相比,它们更加侧重于线程进度的同步,或者进程运行阶段的同步。从CountDownLatch类,到CyclicBarrier类,最后到Phaser类,类的设计越来越灵活,可应用的场景越来越复杂。第四个类Exchanger为两个进程之间交换数据提供了方便。

类名特点
CountDownLatchCountDownLatch内部包含一个计数器,当计数器变为0时,所有的线程才能继续运行。CountDownLatch对象不能被反复使用。
CyclicBarrierCyclicBarrier设置了一个障碍点,只有当所有的线程到达该障碍点后,才能同时继续运行。CyclicBarrier对象可循环使用。
PhaserPhaser也通过设置障碍点实现线程同步运行。Phaser提供更加灵活的接口,同步线程的个数可动态变化。Phaser对象可循环使用。
Exchanger线程间交换数据
上一章
下一章

注册用户登陆后可留言

Copyright  2019 Little Waterdrop, LLC. All Rights Reserved.