并发编程学习笔记之构建自定义的同步工具(十一)

概述: 在并发编程学习笔记之并发工具类(四)中,为大家介绍了几种同步工具(同步工具就是依靠自己的状态,调节线程是阻塞还是运行用的.),闭锁、FutureTask、信号量、关卡. 使用以上的同步工具大部分时候可以满足我们的需求,但是如果没能满足我们需要的功能,可以使用语言和类库提供的底层机制,包括内部队列、限制的Condition对象和abstractQueueSynchronizer框架,来构建属于自己的Synchronizer. 1.管理状态依赖性 状态依赖: 若一个操作存在基于状态的先验条件,则把它称为是状态依赖的(state-dependent). 对单线程的程序而言,操纵一个集合,如果集合中的元素不为空,就取出一个元素,如果这个先验条件(集合不为空)不满足,就不需要等了,失败就行了. 但是在多线程的条件下,一个线程走到了检验这个集合是否为空的时候,可以阻塞一段时间,等待结果为真,因为此时可能有另外一个线程往里面添加元素. 对于并发对象,依赖于状态的方法有时可以在不能满足先验条件的情况下选择失败,不过更好的选择是等待先验条件为真. 等待先验条件为真的实现方式 一个可阻塞的状态依赖活动: void blockingAction(){ /*获得锁*/ while(/*先验条件,例如判断队列不为空*/){ /*如果不满足先验条件,释放锁*/ /*等待先验条件为真*/ /*如果线程被中断或者等待超时,选择失败*/ /*重新获得锁*/ } /*执行任务*/ /*再次获得锁*/ } 锁是在操作过程中被释放与重获的,这也让这种加锁的模式略显与众不同. 组成先验条件的状态变量必须由对象的锁保护起来,这样它们能在测试先验条件的过程中保持不变. 如果先验条件尚未满足,就必须释放锁,让其他线程可以修改对象的状态.否则,先验条件就永远无发成真了. 再一次测试先验条件之前,必须要重新获得锁. 接下来以一个BaseBoundedBuffer为基类,用不同的子类去继承它,看一下几种处理先验条件的方式. 基类BaseBoundedBuffer: public class BaseBoundedBuffer { //缓存的数组 private final V[] buf; private int tail; private int head; private int count; /* * 通过构造函数,设置缓存数据的长度 * */ protected BaseBoundedBuffer(int capacity) { this.buf = (V[]) new Object[capacity]; } /* * 线程安全的put方法,同时因为是final的不能被子类重写 * */ protected synchronized final void doPut(V v){ //将v 放到缓存的 tail位置,从0开始赋值 buf[tail] = v; // 自增tail,同时与缓存的长度相比较,如果等于缓存的长度 // 把tail置为0,也就是说下次会从0开始覆盖. if(++tail == buf.length){ tail = 0; } // 统计的数组容量+1 ++count; } /* * 同上 * */ protected synchronized final V doTake(){ /*从head拿取数据*/ V v = buf[head]; /*拿出去的数据置为0*/ buf[head] = null; if (++head == buf.length){ head = 0; } -- count; return v; } //判断是否满了,注意前验条件必须被锁保护,保证不会看到过期数据 public synchronized final boolean isFull(){ //如果当前的容量count == 缓存的长度,那就是满了,返回true return count == buf.length; } //判断是否为空 public synchronized final boolean isEmpty(){ return count == 0 ; } } 1.1 处理方式1: 将先验条件失败传给调用者 子类的实现方式,如果不满足先验条件就抛异常: public class GrumpyBoundedBuffer extends BaseBoundedBuffer { protected GrumpyBoundedBuffer(int capacity) { super(capacity); } /* * 添加元素,如果缓存满了抛异常 * */ public synchronized void put(V v) throws BufferFullException { if(isFull()){ throw new BufferFullException(); } doPut(v); } /* * 取出元素,如果集合是空的抛异常 * */ public synchronized V take() throws BufferEmptyException { if(isEmpty()){ throw new BufferEmptyException(); } return (V) doTake(); } } 这种方式虽然简单,但是使用起来很麻烦.需要时刻捕获异常.而且还需要调用者重新调用这个方法,重新尝试put/take. 在客户端调用take方法: public static void main(String [] args) throws InterruptedException { GrumpyBoundedBuffer grumpyBoundedBuffer = new GrumpyBoundedBuffer(); //循环调用 while(true){ try{ //获得对象v,如果成功break,跳出循环 //如果失败,捕获异常休息1秒 Object v = grumpyBoundedBuffer.take(); break; } catch (BufferEmptyException e){ //抛异常以后,休眠一段时间.再尝试 Thread.sleep(1000); } } } 上面的代码,在调用失败的去情况下会选择休眠一段时间,然后重新尝试.也可以选择不休眠的方式---被称为忙等待或自旋等待. 两种方式各有利弊: 休眠: 可以避免消耗过多的CPU时间,但是容易睡过头,引发响应慢的问题. 自旋等待: 短时间比较适合用这种方式,但是长时间会浪费系统的资源. 所以,客户端代码身处于自旋产生的低CPU使用率和休眠产生的弱响应性之间的两难境地. 有一种折中的方式是使用Thread.yield方法.让当前线程让出一定的时间给其他线程运行. 1.2 处理方式2:利用"轮询加休眠"实现拙劣的阻塞 更好点的方式,"轮询加休眠": public class SleepyBoundedBuffer extends BaseBoundedBuffer { protected SleepyBoundedBuffer(int capacity) { super(capacity); } public void put(V v) throws InterruptedException { //无限尝试将v添加入集合 while(true){ //获得锁 synchronized (this){ //如果不空,就添加进集合,退出循环 if(!isFull()){ doPut(v); return; } } //否则释放锁,休眠一段时间,给其他线程一些修改的机会. Thread.sleep(1000); } } public V take() throws InterruptedException { while(true){ synchronized (this){ if(!isEmpty()){ return doTake(); } } Thread.sleep(1000); } } } 使用这种方式,调用者不必像之前那样处理失败和重试. 选择休眠的时间间隔,是在响应性与CPU使用率之间作出的权衡; 休眠的间隔越小,响应性越好,但是CPU的消耗也越高. 休眠间隔是如何影响响应性的: image 缓存空间变为可用的时刻与线程被唤醒并在此检查的时刻之间可能有延迟. 这就是使用这种方式的弊端,现在有一种更好的方式,条件队列(condition queue). 条件队列可以让线程挂起,并且能够保证当某个条件成为真时,线程可以及时地苏醒过来. 1.3 让条件队列来解决一切 条件队列可以让一组线程--称作等待集--以某种方式等待相关条件变成真.它也由此得名. 不同于传统的队列,条件队列里面放的是等待相关条件的线程. 存放的是线程-画重点 就像每个Java对象都能当做锁一样,每个对象也能当做条件队列.Object的wait、notify、notifyAll方法构成了内部条件队列的API. 一个对象的内部锁与它的内部条件队列是相关的: 为了能够调用对象X中的任一个条件队列方法,必须持有对象X的锁(也就是说在synchronized块中调用wait啊,notify啊,notifyAll啊这些方法,不在锁里调用会报错). 这是因为"等待基于状态的条件"机制必须和"维护状态一致性"机制紧密地绑定在一起:除非你能检查状态,否则你不能等待条件(这里后面,看代码就明白了,说的就是有if/while的条件判断后面才能跟上wait方法),同时,除非你能改变状态,否则你不能从等待(队列)中释放其他的线程(这里说的就是,改变了先验条件的状态,才能调用notify或notifyAll方法); 先简单的介绍一下wait和notify、notifyAll方法的使用: wait、notify和notifyAll都是Object类的方法.也就是说每个类都有这三个方法. 使用这三个方法的时候,一定要在被锁保护的代码块中,否则会报java.lang.IllegalMonitorStateException. wait和notify、notifyAll是配合使用的. 调用wait方法会挂起线程,释放锁,给其它线程一些机会,让前验条件变为真;notify/notifyAll会唤醒挂起的线程,获取锁,继续执行. 使用条件队列实现的方式: public class BoundedBuffer extends BaseBoundedBuffer { protected BoundedBuffer(int capacity) { super(capacity); } /* * 注意这里的synchronized, * 是必须的否则运行会报java.lang.IllegalMonitorStateException * 作用是检查前验条件时保护状态的一致性.不会读到过期数据 * */ public synchronized void put(V v) throws InterruptedException { /*注意这里这里是while循环 * 不是单单一个简单的if,这么做有两个理由 *1. 因为从notify/notifyAll通知的这段时间 * 很有可能前验条件条件又由真变为假.所以循环判断一次是有必要的 * 2. notify/notifyAll的区别,notify是选取一个条件队列中的线程通知, * 而notifyAll则是通知所有的条件队列,当有多个前验条件时,可能有一些没有通过前验条件的也会被通知 * 所以需要再次判断 * */ while(isFull()){ /*挂起当前线程,释放锁,给其他线程一些机会 * 使前验条件为真 * */ wait(); } /*存入数据*/ doPut(v); /*通知,告诉下面的take方法里面已经有数据了*/ notifyAll(); } public synchronized V take() throws InterruptedException { while (isEmpty()){ wait(); } V v = doTake(); notifyAll(); return v; } } 注解说的很详细了,注意两个方法都是被锁保护的,还有使用while循环而不是用if的理由. 这与之前的"轮询加休眠"方式相比更高效,响应性更佳(不会"睡过头"). wait方法也有限时的版本,为了避免死锁的问题,可以使用限时版本的wait. 2.使用条件队列 使用Java提供的类,比你自己去创建一个类要好,因为它经历了重重考验,证明了自己,而且考虑到方便性、简单性你也应该这么做. 但是有时候类库没有提供我们需要的同步工具,所以我们必须使用条件队列自己构建一个同步工具,这时一定要小心谨慎,因为它很容易被用错. 接下来看看使用条件队列的一些注意事项. 2.1 条件谓词 正确使用条件队列的关键在于识别出对象可以等待的条件谓词. 条件谓词是先验条件的第一站,它在一个操作与状态之间建立起依赖关系. 举个例子,什么是条件谓词: 在有限缓存中,只有缓存不为空时take才能执行,否则它必须等待.就take而言,它的条件谓词是"缓存不空", 类似的,put的条件谓词是"缓存不满". 条件谓词是由类的状态变量构成的表达式. 看看之前的代码: //判断是否满了 public synchronized final boolean isFull(){ //如果当前的容量count == 缓存的长度,那就是满了,返回true return count == buf.length; } //判断是否为空 public synchronized final boolean isEmpty(){ return count == 0 ; } count == buf.length; 和 count == 0 ; 就是两个条件谓词 将条件谓词和与之关联的条件队列,以及在条件队列中等待的操作,都写入文档. 在涉及了加锁、wait方法和条件谓词的条件等待中,存在着一种非常重要的三元关系.条件谓词涉及状态变量,而状态变量是由锁保护的,所以在测试条件谓词之前,我们必须先持有锁.锁对象与条件队列对象(wait和notify方法调用的对象)必须也是同一个对象. 每次调用wait都会隐式地与特定的条件谓词相关联. 当调用特定条件谓词的wait时,调用者必须已经持有了与条件队列相关的锁,这个锁必须同时还保护着组成条件谓词的状态变量. 2.2 过早地唤醒 注意,wait的返回并不一定意味着线程正在等待的条件谓词已经变成真了. 一个单独的内部条件队列可以与多个条件谓词共同使用.当有人调用notifyAll,从而唤醒了你的线程时,并不意味着你正在等待的条件谓词现在变成真了.wait甚至可以"假装"返回--不作为对任何线程调用notify的响应.(这就好比烤面包机的线路连接有问题,导致面包尚未烤好,铃声就自己响起来了). 当控制流重新进入调用wait的代码时,它会重新请求与条件队列相关联的锁.但是这时条件谓词不一定为真,有两种可能: 在notify/notifyAll通知的这段时间很有可能条件谓词又由真变为假. notify是选取一个条件队列中的线程通知,而notifyAll则是通知所有的条件队列,所以被notifyAll通知的wait有可能前验条件不为真. 所以调用wait的地方,要是用while(前验条件)进行循环判断. 当使用条件等待时(Object.wait或者Condition.await): 永远设置一个条件谓词---一些对状态的测试,线程执行前必须满足它; 永远在调用wait前测试条件谓词,并且从wait中返回后再次测试; 永远在循环中调用wait; 确保构成条件谓词的状态变量被锁保护,而这个锁正是与条件队列相关联的; 当调用wait、notify或者notifyAll时,要持有与条件队列相关联的锁;并且, 在检查条件谓词之后、开始执行被保护的逻辑之前,不要释放锁. 2.3 丢失的信号 死锁和活锁是活跃度失败的一种形式,另一种活跃度失败的形式是丢失的信号(missed signal). 当一个线程等待的特定条件为真,但是进入等待前检查条件谓词却返回了假,我们称这样就出现了一个丢失的信号. 线程在等待一个已经通知过的消息,它有可能永远等不到这个消息. 例如: 未能在调用wait之前先检测条件谓词,就会导致信号的丢失. 但是使用while()循环的方式,可以避免这种情况的发生. 2.4 通知(notify) 无论何时,当你在等待一个条件,一定要确保有人会在条件谓词变为真时通知你. 在条件队列API中有两个方法--notify和notifyAll.无论调用哪一个,你都必须持有与条件队列对象相关联的锁. 调用notify的结果是:JVM会从在这个条件队列中等待的众多线程中挑选一个,并把它唤醒; 调用notifyAll会唤醒所有正在这个条件队列中等待的线程. notify/notifyAll应该尽快释放锁,以确保在wait处阻塞的线程尽可能快的解除阻塞. 由于会有多个线程因为不同的原因在同一个条件队列中等待,因此不用notifyAll而使用notify是危险的.这主要是因为单一的通知容易导致同类的线程丢失全部信号. notifyAll在大多数情况下都是由于notify的选择. 举个例子: 假设线程A因为谓词PA而在条件队列中等待,同时线程B因为谓词PB也在同一个条件队列中等待. 现在假设PB变成真,线程C执行一个单一的notify:JVM将从它所拥有的众多线程中选择一个并唤醒,如果A被选中,它随后被唤醒,看到PA尚未变成真,转而继续等待.期间本应该可以执行的B却没有被唤醒.这不是严格意义上的"丢失信号"--它更像一个"被劫持的(hijacked)"信号---不过问题是一样的:线程正在等待一个已经(或者本应该)发生过的信号. 使用notify取代notifyAll的情况: 相同的等待者,只有一个条件谓词与条件队列相关,每个线程从wait返回后执行相同的逻辑,并且,一进一出,一个队条件变量的通知,至多只激活一个线程执行. 大多数类都不满足这些条件,因此普遍认可的做法是优先使用notifyAll,而不是单一的notify.尽管使用notifyAll而非notify可能有些低效,但是这样做更容易确保你的类的行为是正确的. 我们可以将之前的put和take操作进行优化,之前是每次put/take时通知,现在可以先检查是否已经为空/满然后在进行通知: public synchronized V take() throws InterruptedException { while (isEmpty()){ wait(); } V v = doTake(); boolean wasFull = isFull(); //如果满了,才通知 if(wasFull){ notifyAll(); } return v; } 尽管"依据条件通知"可以提升性能,但它毕竟只是一种小技巧(而且还让子类的实现变得复杂),应谨慎使用. 单一的通知(notify)和"依据条件通知"都是优化行为.通常进行优化时应该遵循"先让它跑起来,再让它快起来--如果它还没有足够快"的原则:错误地进行优化很容易给程序带来无法预料的活跃度失败. 2.5 入口协议和出口协议 对于每个依赖于状态的操作,以及每个修改了其他状态的操作(对于每一个修改状态的操作,并且其他操作对该状态有状态依赖),都应该为其定义并文档化一个入口协议和出口协议. 入口协议就是操作的条件谓词; 出口协议涉及到要检查任何被操作改变的状态变量,确认它们是否引起其他一些条件谓词变为真,如果是,通知相关的条件队列. AbstractQueuedSynchronizer采用了出口协议的概念,位于java.util.concurrent包下的大部分状态依赖类都构建于它之上. 它没有让Synchronizer类自己去执行通知,而是要求同步方法返回一个值,让这个值说明它的动作是否可能已经阻塞了一个或多个线程.这种显示API的要求,可以避免发生在某些状态转换的过程中"忘记"执行通知. 3. 显示的Condition对象 Condition是具体的内部条件队列,和显示锁在某种角度上看差不多. 内部条件队列有一些缺陷.每个内部锁只能有一个与之相关联的条件队列,这意味着多个线程可能为了不同的条件谓词在同一个条件队列中等待,而且大多数常见的锁模式都会暴露条件队列对象. 如果你想编写一个含有多个条件谓词的并发对象,或者你想获得比条件队列的可见性之外更多的控制权,那么显示的Lock和Condition的实现类提供了一个比内部锁和条件队列更加灵活的选择. 一个Condition和一个单独的Lock相关联,就像条件队列和单独的内部锁相关联一样; 调用与Condition相关联的Lock的Lock.newCondition方法,可以创建一个Condition. 如同Lock提供了比内部加锁要丰富得多的特征集一样,Condition也提供了比内部条件队列要丰富得多的特征集:每个锁可以有多个等待集(因await挂起的线程的集合)、可中断/不可中断的条件等待、基于时限的等待以及公平/非公平队列之间的选择. 不同于内部条件队列,你可以让每个Lock都有任意数量的Condition对象.Condition对象继承了与之相关的锁的公平性特性;如果是公平的锁,线程会依照FIFO的顺序从Condition.await中被释放. 注意事项!!!: wait、notify和notifyAll在Condition对象中的对等体是await、signal和signalAll. 但是,Condition继承与Object,这意味着它也有wait和notify方法. 一定要确保使用了正确的版本--await和signal! 使用condition的实例: public class ConditionBoundedBuffer { protected final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); private final T[] items = (T[]) new Object[100]; private int tail,head,count; public void put(T x) throws InterruptedException { lock.lock(); try { while(count == items.length){ notFull.await(); } items[tail] = x; if(++tail == items.length){ tail = 0; } ++count; notEmpty.signal(); }finally { lock.unlock(); } } public T take() throws InterruptedException { lock.lock(); try { while(count == 0){ notEmpty.await(); } T x = items[head]; items[head] = null; if( ++head == items.length){ head = 0; } -- count; notFull.signal(); return x; }finally { lock.unlock(); } } } 使用两个Condition,notFull和notEmpty,明确地表示"非满"与"非空"两个条件谓词. 使用Condition的方式具有更好的可读性.Condition简化了使用单一通知的条件.使用更有效的signal,而不是signalAll,这就会减少相当数量的上下文切换,而且每次缓存操作都会出发对锁的请求. 就像内置的锁和条件队列一样,当使用显示的Lock和Condition时,也必须要满足锁、条件谓词和条件变量之间的三元关系: 涉及条件谓词的变量必须由Lock保护,检查条件谓词时以及调用await和signal时,必须持有Lock对象. 显示的Condition和内部条件队列之间的选择: 与在ReentrantLcok和Synchronized之间进行选择是一样的:如果你需要使用一些高级特性,比如公平队列或者让每个锁对应多个等待集,这时使用Condition要好于使用内部条件队列.(如果你需要使用ReentrantLock的高级特性,并已在使用它,那么你已经做出来选择.) 4. 剖析Synchronizer ReentrantLock和Semaphore 有很多共同点,扮演了"阀门"的角色,每次只允许有限条目的线程通过它; 线程到达阀门后,可以允许通过(lock或acquire成功返回),可以等待(lock或acquire阻塞),也可以被取消(tryLock或tryAcquire返回false,指明在允许的时间内,锁或者"许可"不可用). 更进一步,它们都允许可中断的、不可中断的、可限时的请求尝试,它们也都允许选择公平、非公平的等待线程队列. 之所以有这么多的共同点,是因为它们的实现都用到了一个共同的基类,AbstractQueuedSynchronizer(AQS) AQS是一个用来构建锁和Synchronizer的框架.使用AQS能够简单且高效的构造出应用广泛的大量的Synchronizer. 不仅ReentrantLock和Semaphore是构建于AQS的,其他的还有CountDownLatch、ReentrantReadWriteLock、SynchronousQueue和FutureTask. 一个使用内部锁,实现semaphore功能的例子: public class SemaphoreOnLock { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private int permit; public SemaphoreOnLock(int permit) { lock.lock(); try { //条件谓词加锁保护 this.permit = permit; }finally { lock.unlock(); } } public void acquire() throws InterruptedException { lock.lock(); try { //没有许可集就阻塞 if(permit<=0){ condition.await();
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信