图解AQS的设计与实现,手摸手带你实现一把互斥锁!
AQS是并发编程中非常重要的概念,它是juc包下的许多并发工具类,如CountdownLatch,CyclicBarrier,Semaphore 和锁, 如ReentrantLock, ReaderWriterLock的实现基础,提供了一个基于int状态码和队列来实现的并发框架。本文将对AQS框架的几个重要组成进行简要介绍,读完本文你将get到以下几个点:
- AQS进行并发控制的机制是什么
- 共享模式和独占模式获取和释放同步状态的详细过程
- 基于AQS框架实现一个简易的互斥锁
一,AQS基本概念
1.2 同步队列
二,独占模式获取与释放状态
/** * 尝试获取同步状态【子类中实现】,因为aqs基于模板模式,仅提供基于状态和同步队列的实 * 现框架,具体的实现逻辑由子类决定 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // a. 尝试修改状态值操作执行成功 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // b. 修改状态值成功,记录当前持有同步状态的线程信息 setExclusiveOwnerThread(current); return true; } // 如果当前线程已经持有同步状态,继续修改同步状态【重入锁实现原理,不理解可以先忽略】 } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

二,独占模式获取与释放状态
/** * 尝试获取同步状态【子类中实现】,因为aqs基于模板模式,仅提供基于状态和同步队列的实 * 现框架,具体的实现逻辑由子类决定 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // a. 尝试修改状态值操作执行成功 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // b. 修改状态值成功,记录当前持有同步状态的线程信息 setExclusiveOwnerThread(current); return true; } // 如果当前线程已经持有同步状态,继续修改同步状态【重入锁实现原理,不理解可以先忽略】 } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
/** * 尝试获取同步状态【子类中实现】,因为aqs基于模板模式,仅提供基于状态和同步队列的实 * 现框架,具体的实现逻辑由子类决定 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // a. 尝试修改状态值操作执行成功 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // b. 修改状态值成功,记录当前持有同步状态的线程信息 setExclusiveOwnerThread(current); return true; } // 如果当前线程已经持有同步状态,继续修改同步状态【重入锁实现原理,不理解可以先忽略】 } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
获取失败的线程,加入到同步队列的队尾;加入到队列中后,如果当前节点的前驱节点为头节点再次尝试获取同步状态(下文代码:p == head && tryAcquire(arg))。
/** * 没有获取到同步状态的线程加入到队尾部 */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 尝试用最快的方式入队,如果入队失败,再走完整的入队方法 Node pred = tail; if (pred != null) { node.prev = pred; // 将当前线程设置到队尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 正常的入队方法 enq(node); return node; } /** * 同步队列中节点,尝试获取同步状态 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 自旋(死循环) for (;;) { // 只有当前节点的前驱节点是头节点时才会尝试执行获取同步状态操作 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node);// 注意: 此处重点, 当前节点设置为头节点,相当于头节点出队 p.next = null; // help GC failed = false; return interrupted; } // 获取失败后是否进入wait if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally {