并发编程(六)——AbstractQueuedSynchronizer 之 Condition 源码分析

目录 公平锁和非公平锁 Condition 1、大体实现流程 2.await方法 3. 将节点加入到条件队列 4. 完全释放独占锁 5. 等待进入阻塞队列 6. signal 唤醒线程,转移到阻塞队列 7. 获取独占锁 7. 总结 正文 我们接着上一篇文章继续,本文讲讲解ReentrantLock 公平锁和非公平锁的区别,深入分析 AbstractQueuedSynchronizer 中的 ConditionObject 回到顶部 公平锁和非公平锁 ReentrantLock 默认采用非公平锁,除非你在构造方法中传入参数 true 。 复制代码 public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } 复制代码 公平锁的 lock 方法: 复制代码 static final class FairSync extends Sync { final void lock() { acquire(1); } // AbstractQueuedSynchronizer.acquire(int arg) public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } 复制代码 非公平锁的 lock 方法: 复制代码 static final class NonfairSync extends Sync { final void lock() { // 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } // AbstractQueuedSynchronizer.acquire(int arg) public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 复制代码 总结:公平锁和非公平锁只有两处不同: 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。 公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。 非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。 回到顶部 Condition JUC提供了Lock可以方便的进行锁操作,但是有时候我们也需要对线程进行条件性的阻塞和唤醒,这时我们就需要condition条件变量,它就像是在线程上加了多个开关,可以方便的对持有锁的线程进行阻塞和唤醒。 Condition主要是为了在J.U.C框架中提供和Java传统的监视器风格的wait,notify和notifyAll方法类似的功能。 condition 是依赖于 ReentrantLock 的,不管是调用 await 进入等待还是 signal 唤醒,都必须获取到锁才能进行操作。 每个 ReentrantLock 实例可以通过调用多次 newCondition 产生多个 ConditionObject 的实例: 复制代码 final ConditionObject newCondition() { return new ConditionObject(); } 复制代码 我们首先来看下我们关注的 Condition 的实现类 AbstractQueuedSynchronizer 类中的 ConditionObject。 复制代码 public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; // 条件队列的第一个节点 // 不要管这里的关键字 transient,是不参与序列化的意思 private transient Node firstWaiter; // 条件队列的最后一个节点 private transient Node lastWaiter; ...... 复制代码 在上一篇介绍 AQS 的时候,我们有一个阻塞队列,用于保存等待获取锁的线程的队列。这里我们引入另一个概念,叫条件队列(condition queue) 1、大体实现流程 AQS等待队列与Condition队列是两个相互独立的队列 await()就是在当前线程持有锁的基础上释放锁资源,并新建Condition节点加入到Condition的队列尾部,阻塞当前线程 signal()就是将Condition的头节点移动到AQS等待节点尾部,让其等待再次获取锁 以下是AQS队列和Condition队列的出入结点的示意图,可以通过这几张图看出线程结点在两个队列中的出入关系和条件。 I.初始化状态:AQS等待队列有3个Node,Condition队列有1个Node(也有可能1个都没有) II.节点1执行Condition.await() 1.将head后移 2.释放节点1的锁并从AQS等待队列中移除 3.将节点1加入到Condition的等待队列中 4.更新lastWaiter为节点1 III.节点2执行signal()操作 5.将firstWaiter后移 6.将节点4移出Condition队列 7.将节点4加入到AQS的等待队列中去 8.更新AQS的等待队列的tail 基本上,把这几张图看懂,你也就知道 condition 的处理流程了。   1.我们知道一个 ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition 实例,这里对应 condition1 和 condition2。注意,ConditionObject 只有两个属性 firstWaiter 和 lastWaiter;   2.每个 condition 有一个关联的条件队列,如线程 1 调用 condition1.await() 方法即可将当前线程 1 包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行,条件队列是一个单向链表;   3.调用 condition1.signal() 会将condition1 对应的条件队列的 firstWaiter 移到阻塞队列的队尾,等待获取锁,获取锁后 await 方法返回,继续往下执行。 这里,我们简单回顾下 Node 的属性: 复制代码 volatile int waitStatus; // 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3) volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; 复制代码 prev 和 next 用于实现阻塞队列的双向链表,nextWaiter 用于实现条件队列的单向链表 2.await方法 ReentrantLock是独占锁,一个线程拿到锁后如果不释放,那么另外一个线程肯定是拿不到锁,所以在lock.lock()和lock.unlock()之间可能有一次释放锁的操作(同样也必然还有一次获取锁的操作)。在进入lock.lock()后唯一可能释放锁的操作就是await()了。也就是说await()操作实际上就是释放锁,然后挂起线程,一旦条件满足就被唤醒,再次获取锁! 复制代码 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //构造一个新的等待队列Node加入到队尾 int savedState = fullyRelease(node); //释放当前线程的独占锁,不管重入几次,都把state释放为0 int interruptMode = 0; //如果当前节点没有在同步队列上,即还没有被signal,则将当前线程阻塞 while (!isOnSyncQueue(node)) { // 线程挂起 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //被中断则直接退出自旋 break; } //退出了上面自旋说明当前节点已经在同步队列上,但是当前节点不一定在同步队列队首。acquireQueued将阻塞直到当前节点成为队首,即当前线程获得了锁。然后await()方法就可以退出了,让线程继续执行await()后的代码。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } 复制代码 3. 将节点加入到条件队列 复制代码 // 将当前线程对应的节点入队,插入队尾 private Node addConditionWaiter() { Node t = lastWaiter; // 如果条件队列的最后一个节点取消了,将其清除出去 if (t != null && t.waitStatus != Node.CONDITION) { // 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列 unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); // 如果队列为空 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } 复制代码 在addWaiter 方法中,有一个 unlinkCancelledWaiters() 方法,该方法用于清除队列中已经取消等待的节点。 复制代码 // 等待队列是一个单向链表,遍历链表将已经取消等待的节点清除出去 // 纯属链表操作,很好理解,看不懂多看几遍就可以了 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; // 如果节点的状态不是 Node.CONDITION 的话,这个节点就是被取消的 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } 复制代码 4. 完全释放独占锁 复制代码 // 首先,我们要先观察到返回值 savedState 代表 release 之前的 state 值 // 对于最简单的操作:先 lock.lock(),然后 condition1.await()。 // 那么 state 经过这个方法由 1 变为 0,锁释放,此方法返回 1 // 相应的,如果 lock 重入了 n 次,savedState == n // 如果这个方法失败,会将节点设置为"取消"状态,并抛出异常 IllegalMonitorStateException final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } 复制代码 5. 等待进入阻塞队列 复制代码 int interruptMode = 0; while (!isOnSyncQueue(node)) { // 线程挂起 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } 复制代码 isOnSyncQueue(Node node) 用于判断节点是否已经转移到阻塞队列了: 复制代码 final boolean isOnSyncQueue(Node node) { //如果当前节点状态是CONDITION或node.prev是null,则证明当前节点在等待队列上而不是同步队列上。之所以可以用node.prev来判断,是因为一个节点如果要加入同步队列,在加入前就会设置好prev字段。 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //如果node.next不为null,则一定在同步队列上,因为node.next是在节点加入同步队列后设置的 if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); //前面的两个判断没有返回的话,就从同步队列队尾遍历一个一个看是不是当前节点。 } // 从同步队列的队尾往前遍历,如果找到,返回 true private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } 复制代码 回到前面的循环,isOnSyncQueue(node) 返回 false 的话,那么进到 LockSupport.park(this); 这里线程挂起。 6. signal 唤醒线程,转移到阻塞队列 为了大家理解,这里我们先看唤醒操作,因为刚刚到 LockSupport.park(this); 把线程挂起了,等待唤醒。 唤醒操作通常由另一个线程来操作,就像生产者-消费者模式中,如果线程因为等待消费而挂起,那么当生产者生产了一个东西后,会调用 signal 唤醒正在等待的线程来消费。 复制代码 // 唤醒等待了最久的线程 // 其实就是,将这个线程对应的 node 从条件队列转移到阻塞队列 public final void signal() { // 调用 signal 方法的线程必须持有当前的独占锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } // 从条件队列队头往后遍历,找出第一个需要转移的 node // 因为前面我们说过,有些线程会取消排队,但是还在队列中 private void doSignal(Node first) { do { // 将 firstWaiter 指向 first 节点后面的第一个 // 如果将队头移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); // 这里 while 循环,如果 first 转移不成功,那么选择 first 后面的第一个节点进行转移,依此类推 } // 将节点从条件队列转移到阻塞队列 // true 代表成功转移 // false 代表在 signal 之前,节点已经取消了 final boolean transferForSignal(Node node) { // CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消, // 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点 // 否则,将 waitStatus 置为 0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // enq(node): 自旋进入阻塞队列的队尾 // 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点 Node p = enq(node); int ws = p.waitStatus; // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。唤醒之后会怎么样,后面再解释 // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用,上篇介绍的时候说过,节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1) if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程,之后的操作看下一节 LockSupport.unpark(node.thread); return true; } 复制代码 正常情况下,ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这句中,ws <= 0,而且 compareAndSetWaitStatus(p, ws, Node.SIGNAL) 会返回 true,所以一般也不会进去 if 语句块中唤醒 node 对应的线程。然后这个方法返回 true,也就意味着 signal 方法结束了,节点进入了阻塞队列,此时await()还是挂起状态,并没有被唤醒。 我们可以看到,signal方法只是将Node修改了状态,并没有唤醒线程。要将修改状态后的Node唤醒,唤起线程是在unlock()中。这个方法会对阻塞队列里面的线程从头到尾对状态为-1的节点做唤醒操作,具体可以看我上一篇文章,并发编程(五)——AbstractQueuedSynchronizer 之 ReentrantLock源码分析 unlock()将此线程唤醒后,await()中可以继续执行,此线程被唤醒的时候它的前驱节点肯定是首节点了,因为unlock()方法是从头到尾进行唤醒 假设发生了阻塞队列中的前驱节点取消等待,或者 CAS 失败,只要唤醒线程,让其进到下一步即可。 7. 获取独占锁 线程被唤醒后,此时节点已经在缓存队列的第一个等待节点了,while (!isOnSyncQueue(node)) 将会退出循环。 复制代码 1 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 2 interruptMode = REINTERRUPT; 3 4 final boolean acquireQueued(final Node node, int arg) { 5 boolean failed = true; 6 try { 7 boolean interrupted = false; 8 for (;;) { 9 final Node p = node.predecessor(); 10 if (p == head && tryAcquire(arg)) { 11 setHead(node); 12 p.next = null; // help GC 13 failed = false; 14 return interrupted; 15 } 16 if (shouldParkAfterFailedAcquire(p, node) && 17 parkAndCheckInterrupt()) 18 interrupted = true; 19 } 20 } finally { 21 if (failed) 22 cancelAcquire(node); 23 } 24 } 复制代码 重新获取锁就在acquireQueued方法中,上一篇文章中已经详细分析了此方法,上面已经说过,unlock()解锁时,此线程已经在阻塞队里的第一个节点,所以第10行代码处就能尝试获取锁,并将state设置为之前的状态。此时就可以接着await()后面的业务代码继续执行了。 我们想想,上面 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) 处,ws <= 0,而且 compareAndSetWaitStatus(p, ws, Node.SIGNAL) 会返回 true时,不执行LockSupport.unpark(node.thread); 呢? 其实笔者认为这里不加这个判断条件应该也是可以的。只是对于CAS修改前驱节点状态为SIGNAL成功这种情况来说,如果不加这个判断条件,提前唤醒了线程,等进入acquireQueued方法了节点发现自己的前驱不是首节点,还要再阻塞,等到其前驱节点成为首节点并释放锁时再唤醒一次;而如果加了这个条件,线程被唤醒的时候它的前驱节点肯定是首节点了,线程就有机会直接获取同步状态从而避免二次阻塞,节省了硬件资源。 7. 总结 总的来说,Condition的本质就是等待队列和同步队列的交互: 当一个持有锁的线程调用Condition.await()时,它会执行以下步骤: 构造一个新的等待队列节点加入到等待队列队尾 释放锁,也就是将它的同步队列节点从同步队列队首移除 自旋,直到它在等待队列上的节点移动到了同步队列(通过其他线程调用signal())或被中断 阻塞当前节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。 当一个持有锁的线程调用Condition.signal()时,它会执行以下操作: 从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。 对每个节点执行唤醒操作时,首先将节点加入同步队列,此时await()操作的步骤3的解锁条件就已经开启了。然后分两种情况讨论: 如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,此时await()方法就会完成步骤3,进入步骤4. 如果成功把先驱节点的状态设置为了SIGNAL,那么就不立即唤醒了。等到先驱节点成为同步队列首节点并释放了同步状态后,会自动唤醒当前节点对应线程的,这时候await()的步骤3才执行完成,而且有很大概率快速完成步骤4. https://www.cnblogs.com/java-chen-hao/p/10175770.html
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信