高并发编程学习(2)——线程通信详解
为获得良好的阅读体验,请访问原文: 传送门
前序文章
高并发编程学习(1)——并发基础 - https://www.wmyskxz.com/2019/11/26/gao-bing-fa-bian-cheng-xue-xi-1-bing-fa-ji-chu/
一、经典的生产者消费者案例
上一篇文章我们提到一个应用可以创建多个线程去执行不同的任务,如果这些任务之间有着某种关系,那么线程之间必须能够通信来协调完成工作。
生产者消费者问题(英语:Producer-consumer problem)就是典型的多线程同步案例,它也被称为有限缓冲问题(英语:Bounded-buffer problem)。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。(摘自维基百科:生产者消费者问题)
注意: 生产者-消费者模式中的内存缓存区的主要功能是数据在多线程间的共享,此外,通过该缓冲区,可以缓解生产者和消费者的性能差;
准备基础代码:无通信的生产者消费者
我们来自己编写一个例子:一个生产者,一个消费者,并且让他们让他们使用同一个共享资源,并且我们期望的是生产者生产一条放到共享资源中,消费者就会对应地消费一条。
我们先来模拟一个简单的共享资源对象:
public class ShareResource {
private String name;
private String gender;
/**
* 模拟生产者向共享资源对象中存储数据
*
* @param name
* @param gender
*/
public void push(String name, String gender) {
this.name = name;
this.gender = gender;
}
/**
* 模拟消费者从共享资源中取出数据
*/
public void popup() {
System.out.println(this.name + "-" + this.gender);
}
}
然后来编写我们的生产者,使用循环来交替地向共享资源中添加不同的数据:
public class Producer implements Runnable {
private ShareResource shareResource;
public Producer(ShareResource shareResource) {
this.shareResource = shareResource;
}
@Override
public void run() {
for (int i = 0; i < 50; i++) {
if (i % 2 == 0) {
shareResource.push("凤姐", "女");
} else {
shareResource.push("张三", "男");
}
}
}
}
接着让我们的消费者不停地消费生产者产生的数据:
public class Consumer implements Runnable {
private ShareResource shareResource;
public Consumer(ShareResource shareResource) {
this.shareResource = shareResource;
}
@Override
public void run() {
for (int i = 0; i < 50; i++) {
shareResource.popup();
}
}
}
然后我们写一段测试代码,来看看效果:
public static void main(String[] args) {
// 创建生产者和消费者的共享资源对象
ShareResource shareResource = new ShareResource();
// 启动生产者线程
new Thread(new Producer(shareResource)).start();
// 启动消费者线程
new Thread(new Consumer(shareResource)).start();
}
我们运行发现出现了诡异的现象,所有的生产者都似乎消费到了同一条数据:
张三-男
张三-男
....以下全是张三-男....
为什么会出现这样的情况呢?照理说,我的生产者在交替地向共享资源中生产数据,消费者也应该交替消费才对呀..我们大胆猜测一下,会不会是因为消费者是直接循环了 30 次打印共享资源中的数据,而此时生产者还没有来得及更新共享资源中的数据,消费者就已经连续打印了 30 次了,所以我们让消费者消费的时候以及生产者生产的时候都小睡个 10 ms 来缓解消费太快 or 生产太快带来的影响,也让现象更明显一些:
/**
* 模拟生产者向共享资源对象中存储数据
*
* @param name
* @param gender
*/
public void push(String name, String gender) {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
this.name = name;
this.gender = gender;
}
/**
* 模拟消费者从共享资源中取出数据
*/
public void popup() {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
System.out.println(this.name + "-" + this.gender);
}
再次运行代码,发现了出现了以下的几种情况:
重复消费:消费者连续地出现两次相同的消费情况(张三-男/ 张三-男);
性别紊乱:消费者消费到了脏数据(张三-女/ 凤姐-男);
分析出现问题的原因
重复消费:我们先来看看重复消费的问题,当生产者生产出一条数据的时候,消费者正确地消费了一条,但是当消费者再来共享资源中消费的时候,生产者还没有准备好新的一条数据,所以消费者就又消费到老数据了,这其中的根本原因是生产者和消费者的速率不一致。
性别紊乱:再来分析第二种情况。不同于上面的情况,消费者在消费第二条数据时,生产者也正在生产新的数据,但是尴尬的是,生产者只生产了一半儿(也就是该执行完 this.name = name),也就是还没有来得及给 gender 赋值就被消费者给取走消费了.. 造成这样情况的根本原因是没有保证生产者生产数据的原子性。
解决出现的问题
加锁解决性别紊乱
我们先来解决性别紊乱,也就是原子性的问题吧,上一篇文章里我们也提到了,对于这样的原子性操作,解决方法也很简单:加锁。稍微改造一下就好了:
/**
* 模拟生产者向共享资源对象中存储数据
*
* @param name
* @param gender
*/
synchronized public void push(String name, String gender) {
this.name = name;
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
this.gender = gender;
}
/**
* 模拟消费者从共享资源中取出数据
*/
synchronized public void popup() {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
System.out.println(this.name + "-" + this.gender);
}
我们在方法前面都加上了 synchronized 关键字,来保证每一次读取和修改都只能是一个线程,这是因为当 synchronized 修饰在普通同步方法上时,它会自动锁住当前实例对象,也就是说这样改造之后读/ 写操作同时只能进行其一;
我把 push 方法小睡的代码改在了赋值 name 和 gender 的中间,以强化验证原子性操作是否成功,因为如果不是原子性的话,就很可能出现赋值 name 还没赋值给 gender 就被取走的情况,小睡一会儿是为了加强这种情况的出现概率(可以试着把 synchronized 去掉看看效果);
运行代码后发现,并没有出现性别紊乱的现象了,但是重复消费仍然存在。
等待唤醒机制解决重复消费
我们期望的是 张三-男 和 凤姐-女 交替出现,而不是有重复消费的情况,所以我们的生产者和消费者之间需要一点沟通,最容易想到的解决方法是,我们新增加一个标志位,然后在消费者中使用 while 循环判断,不满足条件则不消费,条件满足则退出 while 循环,从而完成消费者的工作。
while (value != desire) {
Thread.sleep(10);
}
doSomething();
这样做的目的就是为了防止「过快的无效尝试」,这种方法看似能够实现所需的功能,但是却存在如下的问题:
1)难以确保及时性。在睡眠时,基本不消耗处理器的资源,但是如果睡得过久,就不能及时发现条件已经变化,也就是及时性难以保证;
2)难以降低开销。如果降低睡眠的时间,比如休眠 1 毫秒,这样消费者能够更加迅速地发现条件变化,但是却可能消耗更多的处理资源,造成了无端的浪费。
以上两个问题吗,看似矛盾难以调和,但是 Java 通过内置的等待/ 通知机制能够很好地解决这个矛盾并实现所需的功能。
等待/ 通知机制,是指一个线程 A 调用了对象 O 的 wait() 方法进入等待状态,而另一个线程 B 调用了对象 O 的 notifyAll() 方法,线程 A 收到通知后从对象 O 的 wait() 方法返回,进而执行后续操作。上述两个线程都是通过对象 O 来完成交互的,而对象上的 wait 和 notify/ notifyAll 的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。
这里有一个比较奇怪的点是,为什么看起来像是线程之间操作的 wait 和 notify/ notifyAll 方法会是 Object 类中的方法,而不是 Thread 类中的方法呢?
简单来说:因为 synchronized 中的这把锁可以是任意对象,因为要满足任意对象都能够调用,所以属于 Object 类;
专业点说:因为这些方法在操作同步线程时,都必须要标识它们操作线程的锁,只有同一个锁上的被等待线程,可以被同一个锁上的 notify 唤醒,不可以对不同锁中的线程进行唤醒。也就是说,等待和唤醒必须是同一个锁。而锁可以是任意对象,所以可以被任意对象调用的方法是定义在 Object 类中。
好,简单介绍完等待/ 通知机制,我们开始改造吧:
public class ShareResource {
private String name;
private String gender;
// 新增加一个标志位,表示共享资源是否为空,默认为 true
private boolean isEmpty = true;
/**
* 模拟生产者向共享资源对象中存储数据
*
* @param name
* @param gender
*/
synchronized public void push(String name, String gender) {
try {
while (!isEmpty) {
// 当前共享资源不为空的时,则等待消费者来消费
// 使用同步锁对象来调用,表示当前线程释放同步锁,进入等待池,只能被其他线程所唤醒
this.wait();
}
// 开始生产
this.name = name;
Thread.sleep(10);
this.gender = gender;
// 生产结束
isEmpty = false;
// 生产结束唤醒一个消费者来消费
this.notify();
} catch (Exception ignored) {
}
}
/**
* 模拟消费者从共享资源中取出数据
*/
synchronized public void popup() {
try {
while (isEmpty) {
// 为空则等着生产者进行生产
// 使用同步锁对象来调用,表示当前线程释放同步锁,进入等待池,只能被其他线程所唤醒
this.wait();
}
// 消费开始
Thread.sleep(10);
System.out.println(this.name + "-" + this.gender);
// 消费结束
isEmpty = true;
// 消费结束唤醒一个生产者去生产
this.notify();
} catch (InterruptedException ignored) {
}
}
}
我们期望生产者生产一条,然后就去通知消费者消费一条,那么在生产和消费之前,都需要考虑当前是否需要生产 or 消费,所以我们新增了一个标志位来判断,如果不满足则等待;
被通知后仍然要检查条件,条件满足,则执行我们相应的生产 or 消费的逻辑,然后改变条件(这里是 isEmpty),并且通知所有等待在对象上的线程;
注意:上面的代码中通知使用的 notify() 方法,这是因为例子中写死了只有一个消费者和生产者,在实际情况中建议还是使用 notifyAll() 方法,这样多个消费和生产者逻辑也能够保证(可以自己试一下);
小结
通过初始版本一步步地分析问题和解决问题,我们就差不多写出了我们经典生产者消费者的经典代码,但通常消费和生产的逻辑是写在各自的消费者和生产者代码里的,这里我为了方便阅读,把他们都抽离到了共享资源上,我们可以简单地再来回顾一下这个消费生产和等待通知的整个过程:
以上就是关于生产者生产一条数据,消费者消费一次的过程了,涉及的一些具体细节我们下面来说。
二、线程间的通信方式
等待唤醒机制的替代:Lock 和 Condition
我们从上面的中看到了 wait() 和 notify() 方法,只能被同步监听锁对象来调用,否则就会报出 IllegalMonitorZStateException 的异常,那么现在问题来了,我们在上一篇提到的 Lock 机制根本就没有同步锁了,也就是没有自动获取锁和自动释放锁的概念,因为没有同步锁,也就意味着 Lock 机制不能调用 wait 和 notify 方法,我们怎么办呢?
好在 Java 5 中提供了 Lock 机制的同时也提供了用于 Lock 机制控制通信的 Condition 接口,如果大家理解了上面说到的 Object.wait() 和 Object.notify() 方法的话,那么就能很容易地理解 Condition 对象了。
它和 wait() 和 notify() 方法的作用是大致相同的,只不过后者是配合 synchronized 关键字使用的,而 Condition 是与重入锁相关联的。通过 Lock 接口(重入锁就实现了这一接口)的 newCondition() 方法可以生成一个与当前重入锁绑定的 Condition 实例。利用 Condition 对象,我们就可以让线程在合适的时间等待,或者在某一个特定的时刻得到通知,继续执行。
我们拿上面的生产者消费者来举例,修改成 Lock 和 Condition 代码如下:
public class ShareResource {
private String name;
private String gender;
// 新增加一个标志位,表示共享资源是否为空,默认为 true
private boolean isEmpty = true;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
/**
* 模拟生产者向共享资源对象中存储数据
*
* @param name
* @param gender
*/
public void push(String name, String gender) {
lock.lock();
try {
while (!isEmpty) {
// 当前共享资源不为空的时,则等待消费者来消费
condition.await();
}
// 开始生产
this.name = name;
Thread.sleep(10);
this.gender = gender;
// 生产结束
isEmpty = false;
// 生产结束唤醒消费者来消费
condition.signalAll();
} catch (Exception ignored) {
} finally {
lock.unlock();
}
}
/**
* 模拟消费者从共享资源中取出数据
*/
public void popup() {
lock.lock();
try {
while (isEmpty) {
// 为空则等着生产者进行生产
condition.await();
}
// 消费开始
Thread.sleep(10);
System.out.println(this.name + "-" + this.gender);
// 消费结束
isEmpty = true;
// 消费结束唤醒生产者去生产
condition.signalAll();
} catch (InterruptedException ignored) {
} finally {
lock.unlock();
}
}
}
在 JDK 内部,重入锁和 Condition 对象被广泛地使用,以 ArrayBlockingQueue 为例,它的 put() 方法实现如下:
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
// 构造函数,初始化锁以及对应的 Condition 对象
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
// 等待队列有足够的空间
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 通知需要 take() 的线程,队列已有数据
notEmpty.signal();
}
同理,对应的 take() 方法实现如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// 如果队列为空,则消费者队列要等待一个非空的信号
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
允许多个线程同时访问:信号量(Semaphore)
以下内容摘录 or 改编自 《实战 Java 高并发程序设计》 3.1.3 节的内容
信号量为多线程协作提供了更为强大的控制方法。广义上说,信号量是对锁的扩展,无论是内部锁 synchronized 还是重入锁 ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个资源。信号量主要提供了以下构造函数:
public Semaphore(int permits)
public Semaphore(int permits, boolean fair) // 第二个参数可以指定是否公平
在构造信号量对象时,必须要指定信号量的准入数,即同时能申请多少个许可。当每个线程每次只申请一个许可时,这就相当于指定了同时有多少个线程可以访问某一个资源。信号量的主要逻辑如下:
public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()
acquire() 方法尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。
acquireUninterruptibly() 方法和 acquire() 方法类似,但是不响应中断。
tryAcquire() 尝试获得一个许可,如果成功则返回 true,失败则返回 false,它不会进行等待,立即返回。
release() 用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。
在 JDK 的官方 Javadoc 中,就有一个有关信号量使用的简单实例,有兴趣的读者可以自行去翻阅一下,这里给出一个更傻瓜化的例子:
public class SemapDemo implements Runnable {
final Semaphore semaphore = new Semaphore(5);
@Override
public void run() {
try {
semaphore.acquire();
// 模拟耗时操作
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId() + ":done!");
semaphore.release();
} catch (InterruptedException ignore) {
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
final SemapDemo demo = new SemapDemo();
for (int i = 0; i < 20; i++) {
executorService.submit(demo);
}
}
}
执行程序,就会发现系统以 5 个线程为单位,依次输出带有线程 ID 的提示文本。
在实现上,Semaphore 借助了线程同步框架 AQS(AbstractQueuedSynchornizer),同样借助了 AQS 来实现的是 Java 中可重入锁的实现。AQS 的强大之处在于,你仅仅需要继承它,然后使用它提供的 api 就可以实现任意复杂的线程同步方案,AQS 为我们做了大部分的同步工作,所以这里不细说,之后再来详细探究一下...
我等着你:Thread.join()
如果一个线程 A 执行了 thread.join() 方法,其含义是:当前线程 A 等待 thread 线程终止之后才从 thread.join() 返回。线程 Thread 除了提供 join() 方法之外,还提供了 join(long millis) 和 join(long millis, int nanos) 两个具备超时特性的方法。这两个超时方法表示,如果线程 Thread 在给定的超时时间里没有终止,那么将会从该超时方法中返回。
在下面的代码中,我们创建了 10 个线程,编号 0 ~ 9,每个线程调用前一个线程的 join() 方法,也就是线程 0 结束了,线程 1 才能从 join() 方法中返回,而线程 0 需要等待 main 线程结束。
public class Join {
public static void main(String[] args) throws InterruptedException {
Thread previous = Thread.currentThread();
for (int i = 0; i < 10; i++) {
// 每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回
Thread thread = new Thread(new Domino(previous), String.valueOf(i));
thread.start();
previous = thread;
}
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + " terminate. ");
}
static class Domino implements Runnable {
private Thread thread;
public Domino(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
try {
thread.join();
} catch (InterruptedException ignore) {
}
System.out.println(Thread.currentThread().getName() + " terminate. ");
}