线程安全的生产者消费者四种实现方法

 

问题描述

在IT技术面试过程中,我们经常会遇到生产者消费者问题(Producer-consumer problem), 这是多线程并发协作问题的经典案例。场景中包含三个对象,生产者(Producer),消费者(Consumer)以及一个固定大小的缓冲区(Buffer)。生产者的主要作用是不断生成数据放到缓冲区,消费者则从缓冲区不断消耗数据。该问题的关键是如何线程安全的操作共享数据块,保证生产者线程和消费者线程可以正确的更新数据块,主要考虑 1. 生产者不会在缓冲区满时加入数据. 2. 消费者应当停止在缓冲区时消耗数据. 3. 在同一时间应当只允许一个生产者或者消费者访问共享缓冲区(这一点是对于互斥操作访问共享区块的要求)。

解决方案

解决问题以上问题通常有信号量,wait & notify, 管道或者阻塞队列等几种思路。本文以Java语言为例一一进行举例讲解。

信号量

信号量(Semaphore)也称信号灯,是用来控制资源被同时访问的个数,比如控制访问数据库最大连接数的数量,线程通过acquire()获得连接许可,完成数据操作后,通过release()释放许可。对于生产者消费者问题来说,为了满足线程安全操作的要求,同一时间我们只允许一个线程访问共享数据区,因此需要一个大小为1的信号量mutex来控制互斥操作。注意到我们还定义了notFull 和 notEmpty 信号量,notFull用于标识当前可用区块的空间大小,当notFull size 大于0时表明"not full", producer 可以继续生产,等于0时表示空间已满,无法继续生产;同样,对于notEmpty信号量来说,大于0时表明 "not empty", consumer可以继续消耗,等于0 时表明没有产品,无法继续消耗。notFull初始size 为5 (5个available空间可供生产),notEmpty初始为0(没有产品可供消耗)。

   /***       数据仓储class,所有的producer和consumer共享这个class对象    **/     static class DataWareHouse {        //共享数据区         private final Queue<String> data = new LinkedList();         //非满锁         private final Semaphore notFull;         //非空锁         private final Semaphore notEmpty;         //互斥锁         private final Semaphore mutex;          public DataWareHouse(int capacity) {             this.notFull = new Semaphore(capacity);             this.notEmpty = new Semaphore(0);             mutex = new Semaphore(1);         }         public void offer(String x) throws InterruptedException {             notFull.acquire(); //producer获取信号,notFull信号量减一             mutex.acquire(); //当前进程获得信号,mutex信号量减1,其他线程被阻塞操作共享区块data             data.add(x);             mutex.release(); //mutex信号量+1, 其他线程可以继续信号操作共享区块data             notEmpty.release(); //成功生产数据,notEmpty信号量加1         }         public String poll() throws InterruptedException {             notEmpty.acquire(); //notEmpty信号减一             mutex.acquire();             String result = data.poll();             mutex.release();             notFull.release(); //成功消耗数据, notFull信号量加1             return result;         }     }    /**Producer线程**/     static class Producer implements Runnable {         private final DataWareHouse dataWareHouse;          public Producer(final DataWareHouse dataWareHouse) {             this.dataWareHouse = dataWareHouse;         }          @Override         public void run() {             while (true) {                 try {                     Thread.sleep(100); //生产的速度慢于消耗的速率                     String s = UUID.randomUUID().toString();                     System.out.println("put  data " + s);                     dataWareHouse.offer(s);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }         }     }    /**Consumer线程**/     static class Consumer implements Runnable {         private final DataWareHouse dataWareHouse;          public Consumer(final DataWareHouse dataWareHouse) {             this.dataWareHouse = dataWareHouse;         }          @Override