03_concurrency_06_atomic_object

第四十章 原子类与并发容器

1 概述

在Java标准库java.util.concurrent.atomic中,提供了几种原子对象类,例如:AtomicBoolean,AtomicInteger和AtomicLong。这些原子类的主要目的是为了提供原子读,原子写,和原子compareAndSet()方法/操作。

Java标准库还在java.util.concurrent包中提供了并发容器,例如:BlockingQueue,ConcurrentMap等。这些类提供了线程安全(Thread-Safe)的方法,供开发人员在并发环境下使用。

本章着重介绍原子类和并发容器的使用方法和原理。

2 原子类

因为AtomicBoolean, AtomicInteger和AtomicLong使用方法类似,我们将以AtomicLong为例,介绍这些原子类的使用方法。

2.1 原子Getter和Setter方法

AtomicLong类提供原子get()和set()方法。AtomicLong类内部其实是用一个原始数据类型long来保存整型数值的。因为对long类型数据的读和写均不是原子操作,所以,标准库使用了volatile关键字。因此,Java虚拟机必须使用原子操作来读写volative long类型的数据。

package java.util.concurrent.atomic;
public class AtomicLong {
    private volatile long value;
    
    public long get();
    public void set(long newValue);
    ...
}

2.2 "compareAndSet"类操作

一般来讲,compare(比较)和set(设置)是两个操作,所以,compareAndSet不是原子操作。但是,在一些场景下,compareAndSet需要作为原子操作执行。因此,AtomicLong和其他原子类为此提供了一组原子的"compareAndSet"方法。小水滴称这些方法为"compareAndSet"类方法。这些方法都可以看作是原子操作。在AtomicLong类中,这些方法包括:

package java.util.concurrent.atomic;
public class AtomicLong {
    public long accumulateAndGet(long x, LongBinaryOperator accumulatorFunction);
    public long addAndGet(long delta);
    public boolean compareAndSet(long expect, long update);
    public long decrementAndGet();
    public long getAndAccumulate(long x, LongBinaryOperator accumulatorFunction);
    public long getAndAdd(long delta);
    public long getAndDecrement();
    public long getAndIncrement();
    public long getAndSet(long newValue);
    public long getAndUpdate(LongUnaryOperator updateFunction);
    public long incrementAndGet();
    public long updateAndGet(LongUnaryOperator updateFunction);
}

上述原子方法都是由Compare And Swap (CAS)或者类似技术实现的。在我们前一章讲到的synchronized关键字,或者使用锁(Lock)来保护资源的方法,被称为是一种悲观策略(Pessimistic Policy),有时也被称为使用“悲观锁(Pessimistic Lock)”。悲观策略的意义在于,每当进入竞态条件时,假设最糟的情况会发生,因此,在进入竞态条件之前,我们需要通过锁机制来保护竞态资源。正是因为这种悲观的想法,这种策略被称为悲观策略。而Compare And Swap是一种乐观策略(Optimistic Policy)。这种策略大胆的假设,即使进入竞态条件,所有的操作都会正确的完成。所以,在进入竞态条件时,不做任何的保护。但是,在操作完成之后,需要检测在操作过程中,是否因为竞态条件而得到错误的结果。如果结果错误的话,需要重新执行一次之前的操作。因为乐观策略不需要加锁/解锁操作,所以,它的运行速度很快,而且竞态条件不一定每次都发生,因此,乐观策略或者Compare And Swap被广泛的采用于底层逻辑的开发。

因为Compare And Swap是原子操作,上述的方法都可以转换成不同方式的Compare And Swap操作。有兴趣的读者可以参考Java标准库的源代码。

2.3 为什么标准库没有提供AtomicShort和AtomicDouble?

在Java 13版本中,标准库并不包含AtomicByte,AtomicChar,AtomicShort,AtomicFloat和AtomicDouble类,这是为什么呢?

Java标准文档解释说这些原子类并不能完全替代Integer类或者其他数据类,因为这些原子类并没有equals()或者compareTo()这些基本方法。另一个原因是Java标准库只提供了常用的几种原子数据类。如果需要使用AtomicByte,AtomicChar或者AtomicShort的话,可以用AtomicInteger代替,然后将int强制转换成所需的类型。如果需要使用AtomicFloat和AtomicDouble的话,可以使用AtomicInteger或者AtomicLong代替,然后再使用Float.floatToRawIntBits(float)和Float.intBitsToFloat(int),或者Double.doubleToRawLongBits(double)和 Double.longBitsToDouble(long)进行数据转换。以下是Java标准库文档给出的说明。

Atomic classes are not general purpose replacements for java.lang.Integer and related classes. They do not define methods such as equals, hashCode and compareTo... Additionally, classes are provided only for those types that are commonly useful in intended applications. For example, there is no atomic class for representing byte. In those infrequent cases where you would like to do so, you can use an AtomicInteger to hold byte values, and cast appropriately. You can also hold floats using Float.floatToRawIntBits(float) and Float.intBitsToFloat(int) conversions, and doubles using Double.doubleToRawLongBits(double) and Double.longBitsToDouble(long) conversions. -- Java SE 8 标准库

3 并发容器

Java并发库还提供了并发容器,为开发人员提供线程安全(Thread-Safe)的容器操作。

3.1 BlockingQueue类

BlockingQueue定义在Java标准库的java.util.concurrent包中。BlockingQueue是一个线程安全(Thread-Safe)队列。多个线程可以同时向队列添加元素,或者从队列中提取元素。“Blocking”的意思是,当一个线程使用take()方法从队列提取元素时,而此时该队列为空的话,这个线程会被提取操作阻塞(Blocked)。另一方面,如果BlockingQueue有最多元素个数限制的话,当一个线程使用put()方法向队列添加元素时,而此时该队列已满的话,这个线程会被添加操作阻塞(Blocked)。

BlockingQueue还提供了以非阻塞的方式添加/提取元素。我们将这些方法总结如下:

添加元素说明
boolean add(E e);添加新元素至队尾。若队已满,抛出IllegalStateException异常。
boolean offer(E e);添加新元素至队尾。若队已满,返回false。
void put(E e);添加新元素至队尾。若队已满,等待下一个空位。
提取元素说明
E remove()返回并移除队首元素。若队列为空,抛出NoSuchElementException异常。
E poll()返回并移除队首元素。若队列为空,等待下一个元素或者超时。
E take()返回并移除队首元素。若队列为空,等待下一个元素。

BlockingQueue非常适合于开发多线程的生产者/消费者模型。如下面的代码所示,Producer类对象和Consumer类对象共享一个BlockingQueue对象。Producer线程向队列添加新对象;而Consumer线程从队列提取元素。BlockingQueue会调整Producer和Consumer运行的速度。当队列为空时,阻塞Consumer线程。当队列已满时,阻塞Producer线程。因为BlockingQueue是线程安全的,所以,开发人员无需另外实现线程同步,以保护共享的队列。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Producer implements Runnable {
    private BlockingQueue<Object> queue = null;

    public Producer(BlockingQueue<Object> theQueue) {
        this.queue = theQueue;
    }

    public void run() {
        try {
            while (true) {
                Object obj = new Object();
                queue.put(obj); // 向队列添加新的对象
                System.out.println("Producing an object.");
            }
        } catch (Exception ex) {}
    }
}

public class Consumer implements Runnable {
    private BlockingQueue<Object> queue = null;

    public Consumer(BlockingQueue<Object> theQueue) {
        this.queue = theQueue;
    }

    public void run() {
        try {
            while (true) {
                Object obj = queue.take(); // 从队列提取对象
                System.out.println("Consuming an object.");
            }
        } catch (Exception ex) {}
    }
}

public class BlockingQueueExample {
    public static void main(String[] args) {
        // Producer线程和Consumer线程共享一个BlockingQueue对象
        BlockingQueue<Object> myQueue = new LinkedBlockingQueue<>(10);

        Thread producerThread = new Thread(new Producer(myQueue));
        Thread consumerThread = new Thread(new Consumer(myQueue));
        producerThread.start();
        consumerThread.start();
    }
}

3.2 ConcurrentMap和ConcurrentHashMap

Map是最为常见的一种容器,然而,常用的java.util.HashTable或者java.util.HashMap都不提供线程安全(Thread-Safe)的操作。为了解决这个问题,Java标准库新增了java.util.concurrent.ConcurrentMap接口。所有实现这个接口的类(例如:java.util.concurrent.ConcurrentHashMap)都支持线程安全的操作。

ConcurrentHashMap类实现了ConcurrentMap接口。ConcurrentHashMap将整个Map切分为多个分区。ConcurrentHashMap允许多个线程非阻塞的同时读取多个元素。这些操作的并行程度取决于将整个Map切分出的分区数。ConcurrentHashMap还大量使用了Compare And Swap技术,所以,更新操作(例如:put(),remove(),clear())不需要获得锁。因此,ConcurrentHashMap读写的效率很高,而且相互不影响。

下面的例子展示了两个线程同时读和写一个Map的例子。Writer线程和Reader线程共享一个ConcurrentMap对象。Writer线程向map对象中写入<i, "A"_i>元组,而Reader线程则从map对象中删除元组,并打印出其key和value的值。两个线程在map对象上的操作都是安全的,因为ConcurrentMap类提供的是线程安全的操作。

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;

public class Writer implements Runnable {
    private ConcurrentMap<Integer, String> sharedMap = null;

    public Writer(ConcurrentMap<Integer, String> theMap) {
        this.sharedMap = theMap;
    }

	@Override
	public void run() {
        try {
            // Writer向Map中写入对象
		    for(int i= 1; i<=10; i++) {
                this.sharedMap.putIfAbsent(i, "A"+ i);
                System.out.println("Write Key: " + i + " Value: " +  "A"+ i);
                Thread.sleep(100);
		    }
        } catch (InterruptedException ex) {}
	}
}

public class Reader implements Runnable {
    private ConcurrentMap<Integer, String> sharedMap = null;

    public Reader(ConcurrentMap<Integer, String> theMap) {
        this.sharedMap = theMap;
    }
    
	@Override
	public void run() {
        try {
            // Reader从Map中取出对象
	        for(int i= 1; i<=10;) {
                String value = this.sharedMap.remove(i);
                if (value != null) {
                    System.out.println("Read Key: " + i + " Value: " + value);
                    i++;
                }
                Thread.sleep(100);
		    }	
        } catch (InterruptedException ex) {}
	}
}

public class ConcurrentMapExample {
    public static void main(String[] args) {
        // 创建Reader和Writer线程,它们共享一个ConcurrentMap对象
        ConcurrentMap<Integer, String> map = new ConcurrentHashMap<Integer,String>();
        Thread readerThread = new Thread(new Reader(map));
        Thread writerThread = new Thread(new Writer(map));

        writerThread.start();
        readerThread.start();

        try {
            writerThread.join();
            readerThread.join();
        } catch (InterruptedException ex) {}
    }
}

程序运行结果如下:

> java ConcurrentMapExample
Write Key: 1 Value: A1
Read Key: 1 Value: A1
Write Key: 2 Value: A2
Read Key: 2 Value: A2
Write Key: 3 Value: A3
Read Key: 3 Value: A3
Write Key: 4 Value: A4
Read Key: 4 Value: A4
Write Key: 5 Value: A5
Read Key: 5 Value: A5
Write Key: 6 Value: A6
Read Key: 6 Value: A6
Write Key: 7 Value: A7
Read Key: 7 Value: A7
Write Key: 8 Value: A8
Read Key: 8 Value: A8
Write Key: 9 Value: A9
Read Key: 9 Value: A9
Write Key: 10 Value: A10
Read Key: 10 Value: A10

4 总结

本章节介绍了Java标准库中的原子对象类和并发容器。它们是Java标准库为开发人员准备的、最基本的工具。因为它们提供了线程安全的操作,在并发环境下,开发人员能够方便的使用它们,无需担心竞态问题。在接下来的章节中,我们还会进一步介绍Java标准库提供的线程同步工具。

 

上一章
下一章

注册用户登陆后可留言

Copyright  2019 Little Waterdrop, LLC. All Rights Reserved.