多线程学习笔记五之读写锁实现分析

目录 简介 读写状态 读锁计数器 共享锁的获取 tryAcquireShared(int unused) doAcquireShared(int arg) 共享锁的释放 tryReleaseShared(int unused) doReleaseShared() 写锁获取 tryAcquire(int acquires) 写锁释放 tryRelease(int releases) 锁降级 总结 简介   在前一篇博客多线程学习笔记三之ReentrantLock与AQS实现分析分析了基于同步器AQS实现的独占锁ReentrantLock,AQS同步器作为JUC组件实现锁的框架,基于AQS除了可以实现独占锁,还可以实现共享锁。   ReentrantReadWriteLock是基于AQS实现的读写锁,内部维护了一个读锁(共享锁)和写锁(独占锁)。如果我们要在程序中提供共享的缓存数据结构,缓存肯定是读操作(数据查询)多而写操作(数据更新)少,只要保证写操作对后续的读操作是可见的就行了,这种情况下使用独占锁就不如读写锁的吞吐量大,读写锁中的读锁允许多个线程获得读锁对资源进行读操作,写锁是传统的独占锁,只允许单个线程获得写锁对资源进行更新。以下是JDK提供基于ReentrantReadWriteLock简单实现缓存结构的Demo: class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // 必须先释放读锁再获取写锁 rwl.readLock().unlock(); rwl.writeLock().lock(); try { //再次检查cacheValid防止其他线程获得写锁改变cacheValid值 if (!cacheValid) { data = ... cacheValid = true; } // 写锁降级为读锁 rwl.readLock().lock(); } finally { //释放写锁 rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }   ReentranReadWriteLock的关系图: ReentrantReadWriteLock没有实现Lock接口,实现了ReadWriteLock接口。内部类ReadLock和WriteLock实现Lock接口,ReadLock和WriteLock包含了继承了AQS的Sync对象,从而提供了共享锁和独占锁特性的实现。读写锁ReentrantReadWriteLock具有以下特性: 可重入,不管是读锁还是写锁,都是可重入锁 公平锁和非公平锁,支持以公平方式或非公平方式(默认方式)获取读锁和写锁。 支持锁降级,线程获得写锁之后可以降级为读锁,具体是先获取写锁,再获得读锁,再释放写锁。但读锁不可升级为写锁。 读写状态   在实现ReentrantLock时,当一个线程去尝试获取锁时,线程会去检查同步器AQS中维护的int型变量state是否为0,同步状态加一表示当前线程成功获取锁。而读写锁ReentrantReadWriteLock维护了读锁和写锁,那么一个线程获得了锁,怎么通过state表明到底是读锁还是写锁呢?答案是把int型变量切位两部分,高16位表示读状态,低16位表示写状态。ReentrantReadWriteLock在内部类Sync定义了以下常量用以区分读写状态: //偏移量 static final int SHARED_SHIFT = 16; //线程获得读锁,state加SHARED_UNIT,state高16位SHARED_UNIT个数代表了有多少个共享锁 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //读写锁重入最多不超过65535 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } 通过把32位int型变量state按位切割成两部分维护读写两种状态,具体划分如图: 从图中可以看到,当前线程获取了写锁,重进入了3次,连续获得了两次读锁,每次获得写锁,就把state加1,而低16位总共最大是65535,就是MAX_COUNT的值。每获得一次读锁,就把state加SHARED_COUNT。那么如何获取读写状态呢?只要通过位运算取出高16位或低16位就行了,对于读状态,state>>>SHARED_SHIFT(无符号补0右移16位)就可以得到加了多少次SHARED_UNIT从而获得读状态;对于写状态,state & EXCLUSIVE_MASK(0X0000FFFF,高16位都变为0,低16位不变)就可以获得写状态。 读锁计数器   由于ReentrantReadWriteLock支持读写锁的重入,而写锁是独占锁,只要取出同步状态state低16位对应的数值就是获得写锁的重入次数;而读锁是共享锁,每个线程获得读锁就会把state加上SHARED_UNIT(包括读锁重入),取出state高16位的对应的数值表示是所有线程获得读锁的次数,但是如何获得单个线程获得共享锁的次数呢?内部类Sync为同步器维护了一个读锁计数器,专门统计每个线程获得读锁的次数。Sync内部有两个内部类分别为HoldCounter和ThreadLocalHoldCounter: abstract static class Sync extends AbstractQueuedSynchronizer { static final class HoldCounter { //计数器,用于统计线程重入读锁次数 int count = 0; // Use id, not reference, to avoid garbage retention //线程TID,区分线程,可以唯一标识一个线程 final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal { //重写初始化方法,在没有进行set的情况下,获取的都是该HoldCounter值 public HoldCounter initialValue() { return new HoldCounter(); } } private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter; private transient Thread firstReader = null; private transient int firstReaderHoldCount; Sync() { //本地线程读锁计数器 readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } } firstReader和firstReaderHoldCount   如果只有一个线程获取了读锁,就不需要使用本地线程变量readHolds,当前线程就是第一个获得读锁的线程firstReader,使用firstReaderHoldCount存储线程重入次数。 readHolds   第一个获得读锁的线程使用firstReaderHoldCount存储读锁重入次数,后面的线程就要使用ThreadLocal类型变量readHolds了,每个线程拥有自己的副本,用来保存自己的重入数。 cachedHoldCounter   缓存计数器,是最后一个获取到读锁的线程计数器,每当有新的线程获取到读锁,这个变量都会更新。如果当前线程不是第一个获得读锁的线程,先到缓存计数器cachedHoldCounter查看缓存计数器是否指向当前线程,不是再去readHolds查找,通过缓存提高效率。 共享锁的获取   获取读锁,由内部类ReadLock提供lock方法,调用了Sync父类AQS的方法: //获取读锁 public void lock() { sync.acquireShared(1); } //获取共享锁 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } tryAcquireShared(int unused)   尝试获取共享锁: protected final int tryAcquireShared(int unused) { //当前线程 Thread current = Thread.currentThread(); //同步状态state int c = getState(); //检查独占锁是否被占据,如果被占据,是否是当前线程获取了独占锁 //如果是当前线程获取了写锁,可以继续获取读锁,如果都不是返回-1表示获取失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //读锁数量 int r = sharedCount(c); //!readerShouldBlock() 根据公平与否策略和队列是否含有等待节点决定当前线程是否继续获取锁 //不能大于65535且CAS修改成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //如果没有线程获取过读锁 if (r == 0) { //将当前线程设置为第一个读锁线程 firstReader = current; // 计数器为一 firstReaderHoldCount = 1; //读锁重入 } else if (firstReader == current) { //计数器加一 firstReaderHoldCount++; } else { // 如果不是第一个线程,获取锁成功 // cachedHoldCounter 代表的是最后一个获取读锁的线程的计数器 HoldCounter rh = cachedHoldCounter; // 如果计数器是 null 或者不指向当前线程,那么就新建一个 HoldCounter 对象 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); //计数器为0,保存到readHolds中 else if (rh.count == 0) readHolds.set(rh); //计数器加一 rh.count++; } return 1; } return fullTryAcquireShared(current); } fullTryAcquireShared(Thread current)   当已有线程占据独占锁、读锁数量超过MAX_COUNT、不满足公平策略或者CAS设置state失败,就会调用这个方法。与tryAcquireShared方法逻辑大体相似。 final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; //死循环 for (;;) { //同步状态 int c = getState(); //检查写锁获取情况 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; //进入到这里,说明没有其他线程获取写锁 //公平锁策略检查 } else if (readerShouldBlock()) { //readerShouldBlock()返回true,应该堵塞,检查是否获取过读锁 // 第一个获取读锁线程是当前线程,重入 if (firstReader == current) { } else { //循环中,若计数器为null if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } //需要阻塞且是非重入(还未获取读锁的),获取失败。 if (rh.count == 0) return -1; } } //检查读锁总数量是否超过最大值 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //CAS设置同步状态state if (compareAndSetState(c, c + SHARED_UNIT)) { //当前线程获得第一个读锁 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; //读锁重入 } else if (firstReader == current) { firstReaderHoldCount++; } else { //从缓存读入计数器,提高效率 if (rh == null) rh = cachedHoldCounter; //计数器为空或不是指向当前线程 if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } doAcquireShared(int arg)   当tryAcquireShared尝试获取共享锁失败,返回-1,进入AQS同步队列等待获取共享锁 private void doAcquireShared(int arg) { //将当前节点以共享型类型加入同步队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //前驱节点获取到锁,可能占据锁,也可能已经释放锁,调用tryAcquireShared尝试获取锁 if (p == head) { int r = tryAcquireShared(arg); //获取成功 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //与独占锁ReentrantLock堵塞逻辑一致 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //因中断/超时,取消获取锁 if (failed) cancelAcquire(node); } } 共享锁的释放   释放读锁,由内部类ReadLock提供unlock方法,调用了Sync父类AQS的方法: public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } tryReleaseShared(int unused)   tryReleaseShared返回true,即同步状态为0,不存在线程占据读锁或写锁。 protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //当前线程是第一个获得读锁的线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; //不是firstReader,更新计数器 } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; //完全释放锁 if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } //重入锁退出 --rh.count; } //CAS更新同步状态, for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } } doReleaseShared()   tryReleaseShared方法成功释放锁,调用doReleaseShared唤醒后继节点。 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //如果节点状态为 Node.SIGNAL,将状态设置为0,设置成功,唤醒线程。 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } //如果本身头结点的waitStatus是出于重置状态(waitStatus==0)的, //将其设置为“传播”状态。意味着需要将状态向后一个节点传播。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } } 写锁获取   获取写锁,由内部类WriteLock提供lock方法,调用了Sync父类AQS的方法,重点解析一下tryAcquire实现: public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } tryAcquire(int acquires)   内部类Sync重写的tryAcquire方法: protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //同步状态不为0 if (c != 0) { //其他线程获得写锁,获取失败;w为0而同步状态不为0,没有线程占据写锁,有线程占据读锁 //注意:不存在读锁与写锁同时被多个线程获取的情况。 if (w == 0 || current != getExclusiveOwnerThread()) return false; //当前线程已经获得写锁,重入次数超过MAX_COUNT,失败 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 锁重入 setState(c + acquires); return true; } //公平策略检查 //CAS设置同步状态成功则获得写锁 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclu
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信