当前位置: 首页 > news >正文

javalock(八)ReentrantReadWriteLock

ReentrantReadWriteLock:
同时实现了共享锁和排它锁。内部有一个sync,同时实现了tryAcquire/tryReleases、tryAcquireShared/tryReleasesShared,一共四个函数,然后ReentrantReadWriteLock内部还实现了一个ReadLock和一个WriteLock,ReadLock和WriteLock都实现了lock和unlock函数,然后ReadLock和WriteLock是对同一个Sync对象的封装,不同之处在于ReadLock的lock函数调用的是Sync.tryAcquireShared,unlock调用的是tryReleasesShared,而WriteLock的lock函数调用的是Sync.tryAcquire/Sync.tryReleases。还有ReadLock重载了newCondition,直接抛异常,因为Condition会调用isHeldExclusive来判断当前线程是否拥有排它锁,而ReadLock是共享锁,这矛盾了,所以ReadLock重载了newCondition函数,然后直接抛异常,而WriteLock则可以正常newCondition,因为WriteLock是排它锁,所以可以支持condition,换句话说:只有排它锁才支持condition。

ReentrantReadWriteLock:
锁资源由两部分组成{state,Holder},state是一个int,用来记录已经获取读锁的线程数和已经获取写锁的线程数,state的32位字节分成了两部分,高16位表示已经获取读锁的线程数,低16位表示写者重入的次数,因为写锁是排它锁,也就是说只会有一个线程获取写锁,所以如果state低16位不为0就表示有人获得了写锁,然后这里就直接用低16位来记录写锁重入的个数,如果读者数不为0,则写者数必定为0,如果写者数为0则读者数必定为0,也就是说读写锁互斥。holder则是记录锁的获取信息,因为是reentrant即可冲入锁,也就是说可能出现这种情况:多个线程同时获取了读锁,然后多个线程又多次readLock.lock,所以就需要一个结构体来记录当前线程是否获得了读锁以及当前线程重入的次数,所以holder是一个threadlocal变量,每个线程都有一份,如果当前线程没有获得该锁,则删除该threadLocal,前面说了写锁重入次数直接用state低16位记录,并且用父类的owner来记录谁获得了写锁,所以写锁不用holder,holder只用于读锁。注意ReentrantLock不需要holder的原因是ReentrantLock是排它锁,最多只有一个线程能获得锁,而父类提供的state和owner变量就足够了,所以不需要holder变量。

一个线程已经获取写锁以后,可以继续获取读锁,但是反过来不行,一个线程获取了读锁后,是不允许再次获取写锁的,原因很简单,如果线程先获取了写锁,那么就能保证其他所有线程都不能获取读锁,所以线程可以安全获取读锁,因为只有他一个人能获取读写权限,而反过来如果线程先获取了读锁,那么就可能还有其他线程此时也获取了读锁,这样就不止一个线程获取了读锁,所以此时是不能获取写锁的。


import java.util.concurrent.TimeUnit;
import java.util.Collection;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;//同时实现了读写锁
public class ReentrantReadWriteLockimplements ReadWriteLock, java.io.Serializable {private static final long serialVersionUID = -6992448646407690164L;//读锁对象private final ReentrantReadWriteLock.ReadLock readerLock;//写锁对象private final ReentrantReadWriteLock.WriteLock writerLock;//读锁对象和写锁对象封装了同一个sync对象final Sync sync;public ReentrantReadWriteLock() {this(false);}//默认是非公平锁public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();//readLock和writeLock都是使用的同一个ReentrantReadWriteLock对象readerLock = new ReadLock(this);writerLock = new WriteLock(this);}public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 6317671515068378041L;//读者数偏移,读者数=state>>SHARED_SHIFTstatic final int SHARED_SHIFT   = 16;//读者数+1就直接加SHARED_UNIT就行static final int SHARED_UNIT    = (1 << SHARED_SHIFT);//最大读者数static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;//写者重入次数数掩码,写者数=state&&EXCLUSIVE_MASKstatic final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }//定义的holder类,记录了当前线程id,以及加读锁的次数//如果线程获得了读锁,就会拥有一个holdCounter对象//如果线程没有获取读锁,就会删除holdCounter对象//因为HoldCounter对象被用作ThreadLocal对象static final class HoldCounter {//记录读锁重入的次数int count = 0;//线程idfinal long tid = getThreadId(Thread.currentThread());}static final class ThreadLocalHoldCounterextends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();}}//这里解释一下readHolds/cachedHoldCounter/firstReader/firstReaderHoldCount//这些变量都只有一个目的:如果当前线程持有读锁,则返回本线程重入次数//readHolds是一个ThreadLocal变量,其他三个都不是ThreadLocal//本来要返回本线程重入次数直接返回ThreadLocal内保存的count就行了//但是ThreadLocal.get可能比较慢,所以为了优化,//就用了两个变量firstReader/cachedHolderCount//firstReader记录的是当没有任何获取读锁时,记录第一个获得读锁的线程的thread对象//cachedHolderCount记录的是上次获取读锁的线程的id//readHolds记录的是本线程的信息,是threadLocal//然后如果有人要获取当前线程读锁的重入次数,那么获取逻辑是这样的://先看能不能从firstReader读,如果firstReader就是当前线程//那么就直接返回firstReadHoldCount,就不用去读threadLocal了//如果firstReader不是当前线程,则尝试去cachedHolderCount读//如果上一次获取读锁的线程就是当前线程,那么ok,可以直接从cachedHolderCount读//这样就不用去读ThreadLocal了,如果都失败了,那么就只能去读readHolds了//而读threadLocal肯定是比较慢的。。。(花里胡哨的,不过文档说有助于提高并发效率。。。)private transient ThreadLocalHoldCounter readHolds;private transient HoldCounter cachedHoldCounter;private transient Thread firstReader = null;private transient int firstReaderHoldCount;Sync() {readHolds = new ThreadLocalHoldCounter();setState(getState()); }abstract boolean readerShouldBlock();abstract boolean writerShouldBlock();//释放写锁protected final boolean tryRelease(int releases) {//首先判断当前线程是否拥有锁:直接判断owner是不是当前线程,如果是则拥有if (!isHeldExclusively())throw new IllegalMonitorStateException();//然后扣减写锁计数,写锁是低16位,所以可以直接减int nextc = getState() - releases;//判断写锁重入计数是否为0boolean free = exclusiveCount(nextc) == 0;if (free)//如果是则设置owner为nullsetExclusiveOwnerThread(null);//然后设置state//笔记:在释放写锁前,读者数必定为0setState(nextc);return free;}//获取读锁protected final boolean tryAcquire(int acquires) {//获取当前线程Thread current = Thread.currentThread();//获取锁资源状态int c = getState();//计算写锁重入次数个数int w = exclusiveCount(c);//如果c!=0,表示有人获得了读锁或者有人获得了写锁if (c != 0) {//c!=0,但是w=0,表明有人获得了读锁//c!=0,并且w!=0,表示有人获得了写锁,所以还需要判断是不是自己获得了写锁//所以这里就是如果有人获得了读锁或者获取写锁的线程不是自己,那么本次写锁获取失败if (w == 0 || current != getExclusiveOwnerThread())return false;//如果是自己获得了写锁,则判断可重入次数是否超过了(2^16-1)次if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");//如果没有,则表示成功获取锁资源,返回true//注意:此时是线程已经获得了锁,并且是自己,// 也就是说只有本线程可以修改state,所以此时无需用cas操作setState(c + acquires);return true;}//如果c==0表示此时没有任何人获得锁//因为ReentrantReadWrite支持公平或者非公平锁,所以writeShouldBlock是一个抽象方法//公平方式:看aqs的sync list是否有节点等待,如果有则返回true,表示当前线程不能获取锁//非公平方式:writeShouldBlock直接返回false,表示立即尝试获取锁if (writerShouldBlock() ||//如果不需要阻塞,则立即通过cas尝试获取锁!compareAndSetState(c, c + acquires))//如果cas获取失败就返回false表示本次获取锁失败return false;//获取成功就设置锁拥有者为自己setExclusiveOwnerThread(current);return true;}//释放读锁protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();//此处到下面for循环前面的都是为了更新当前线程的读锁重入次数//如果当前线程就是自己,那么直接更新firstReaderHoldCount//就不用去写threadLocal了if (firstReader == current) {//如果重入次数为1,那么-1之后就表示释放掉了//所以直接把firstReader设置为null//这样,firstReader是无效的,那么firstReaderHoldCount就肯定失效了//所以此处只设置firstReader=null,而没有扣减firstReaderHoldCountif (firstReaderHoldCount == 1)firstReader = null;else//否则读锁重入次数大于1,释放一次后,线程还是拥有读锁//所以firstReader不用变firstReaderHoldCount--;} else {//如果自己不是firstReader,那么再尝试cachedHoldCounter//即尝试自己是不是上一次访问的线程HoldCounter rh = cachedHoldCounter;//如果缓存的holder为null或者缓存的holder不是当前线程的if (rh == null || rh.tid != getThreadId(current))//那么就表示从cachedHoldCounter读取失败//此时就只能去读threadLocal了rh = readHolds.get();//此时rh为cachedHoldCounter或者readHoldsint count = rh.count;//如果count<=1,那么释放一次后就为0//所以此时就要移除threadLocal//免得内存泄漏,比如一个线程获取了读锁,释放后就永不再是用这个rw锁了if (count <= 1) {//移除threadLocalreadHolds.remove();if (count <= 0)//如果count<=0就表示自己没有持有读锁,却常是释放读锁throw unmatchedUnlockException();}//当前线程重入次数-1--rh.count;}//更新完读锁重入次数,那么下面就是通过cas更新state也就是更新读锁总数for (;;) {//获取读锁总次数int c = getState();//-1int nextc = c - SHARED_UNIT;//cas设置stateif (compareAndSetState(c, nextc))return nextc == 0;}}private IllegalMonitorStateException unmatchedUnlockException() {return new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread");}//尝试获取读锁//!!!在持有写锁的状态下允许获取读锁,但是反过来持有读锁时是不允许再次获取写锁//也就是说支持锁降级(当然,这里没有降级),但是不支持锁升级protected final int tryAcquireShared(int unused) {//获取当前线程对象Thread current = Thread.currentThread();//获取锁资源状态int c = getState();//判断是否有线程持有写锁,如果有,则判断是不是当前线程if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)//如果不是,则返回-1表剩余读锁资源不足,读锁获取失败return -1;//走到此处,要么无人获取写锁,要么自己获取了写锁//先获取读者计数int r = sharedCount(c);//判断本次获取读锁是否应该阻塞,分公平和非公平方式//非公平方式:如果aqs队列中第一个等待的线程是要获取写锁,那么本次读者应该等待//也就是避免写进程长时间获取不到读锁,也就是说如果第一个等待的线程是要获取读锁,//那么本线程会立即获取读锁,不管后面的,也就是说对读不公平,但是对写带点公平//公平方式:如果aqs队列中有等待的进程,则本次获取read需要阻塞// 可能是因为达到总读锁上限了,因为state用于读者锁计数的只有16位if (!readerShouldBlock() &&//如果没有达到总读者锁重入计数r < MAX_COUNT &&//那么就尝试获取读锁compareAndSetState(c, c + SHARED_UNIT)) {//下面就是更新线程对应的读锁重入次数了//r==0表示本线程是第一个获得读锁的线程if (r == 0) {//那么就设置firstReader为本线程firstReader = current;//并且读锁重入次数为1firstReaderHoldCount = 1;//如果当前线程正好是第一个读者} else if (firstReader == current) {//那么直接更新firstReaderHoldCount就行firstReaderHoldCount++;} else {//反之判断cachedHoldCounter是否为当前线程HoldCounter rh = cachedHoldCounter;//如果cached为null或者cached不是当前线程if (rh == null || rh.tid != getThreadId(current))//则更新cached为当前线程的readHoldcachedHoldCounter = rh = readHolds.get();//如果cached是自己,如果count=0表示我们之前已经删除了//那么直接设置readHolds等于我们缓存的,就不用重新创建了else if (rh.count == 0)readHolds.set(rh);//更新本线程读锁重入次数rh.count++;}//返回获取锁成功return 1;}//如果获取锁失败或者本线程获取读锁需要阻塞,则走重逻辑获取锁//重逻辑和上面的逻辑几乎一样return fullTryAcquireShared(current);}//while cas获取读锁final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (;;) {//获取锁状态int c = getState();//判断是不是有人获取了写锁if (exclusiveCount(c) != 0) {//如果有,则再判断是不是自己if (getExclusiveOwnerThread() != current)//如果不是,则返回失败return -1;//如果本次读者需要阻塞://非公平方式:如果aqs队列中第一个等待的线程是要获取写锁,那么本次读者应该等待//也就是避免写进程长时间获取不到读锁,也就是说如果第一个等待的线程是要获取读锁,//那么本线程会立即获取读锁,不管后面的,也就是说对读不公平,但是对写带点公平//公平方式:如果aqs队列中有等待的进程,则本次获取read需要阻塞// 可能是因为达到总读锁上限了,因为state用于读者锁计数的只有16位} else if (readerShouldBlock()) {//这里又是一样的逻辑,用来更新count的,懒得写了if (firstReader == current) {} else {if (rh == null) {rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();if (rh.count == 0)readHolds.remove();}}if (rh.count == 0)return -1;}}if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");//cas操作尝试获取锁if (compareAndSetState(c, c + SHARED_UNIT)) {//下面又是更新count的逻辑了,和上面一样,略if (sharedCount(c) == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; }//返回1表示读锁资源剩余1//这里是固定返回1,就是说不管获取多少次读锁,剩余读锁资源总是1return 1;}}}//和tryAcquire逻辑一模一样,唯一区别就是这里即使线程拥有写锁,也是通过cas更新state//略final boolean tryWriteLock() {Thread current = Thread.currentThread();int c = getState();if (c != 0) {int w = exclusiveCount(c);if (w == 0 || current != getExclusiveOwnerThread())return false;if (w == MAX_COUNT)throw new Error("Maximum lock count exceeded");}if (!compareAndSetState(c, c + 1))return false;setExclusiveOwnerThread(current);return true;}//和fullTryAcquireShared逻辑几乎一模一样,略final boolean tryReadLock() {Thread current = Thread.currentThread();for (;;) {int c = getState();if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return false;int r = sharedCount(c);if (r == MAX_COUNT)throw new Error("Maximum lock count exceeded");if (compareAndSetState(c, c + SHARED_UNIT)) {if (r == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return true;}}}//判断当前线程是否拥有写锁protected final boolean isHeldExclusively() {return getExclusiveOwnerThread() == Thread.currentThread();}//写锁支持条件变量,writeLock的newCondition中调用这个函数//注意:读锁是不支持的,readLock的newCondition函数则是直接跑UnSupport异常final ConditionObject newCondition() {return new ConditionObject();}//如果写锁已分配,获取持有写锁的线程对象final Thread getOwner() {return ((exclusiveCount(getState()) == 0) ?null :getExclusiveOwnerThread());}//获取读锁总次数final int getReadLockCount() {return sharedCount(getState());}//判断是否有现成是否已经获得了写锁final boolean isWriteLocked() {return exclusiveCount(getState()) != 0;}//获取写锁重入次数,就是state低15位final int getWriteHoldCount() {return isHeldExclusively() ? exclusiveCount(getState()) : 0;}//获取当前线程读锁重入次数final int getReadHoldCount() {if (getReadLockCount() == 0)return 0;Thread current = Thread.currentThread();//先尝试从firstReader读取,if (firstReader == current)return firstReaderHoldCount;//如果firstReader不是自己,则再尝试从cachedHolderCount读取HoldCounter rh = cachedHoldCounter;if (rh != null && rh.tid == getThreadId(current))return rh.count;//如果cachedHolderCount也不是自己,那么最后才去读threadLocal,即读readHolderint count = readHolds.get().count;if (count == 0) readHolds.remove();return count;}private void readObject(java.io.ObjectInputStream s)throws java.io.IOException, ClassNotFoundException {s.defaultReadObject();readHolds = new ThreadLocalHoldCounter();setState(0); }final int getCount() { return getState(); }}//非公平锁static final class NonfairSync extends Sync {private static final long serialVersionUID = -8159625535654395037L;//本次获取写锁是否该阻塞?答案是:用不阻塞final boolean writerShouldBlock() {return false; }//本次获取读锁是否应该阻塞?//答案是如果是如果第一个等待的线程是写锁,则本次获取读锁需要阻塞//反之如果是读锁,则不阻塞,对读锁不公平,对写锁稍显公平final boolean readerShouldBlock() {return  apparentlyFirstQueuedIsExclusives();}}//公平方式static final class FairSync extends Sync {private static final long serialVersionUID = -2274990926593161451L;//本次获取写锁是否需要阻塞?如果等待队列不为空则需要阻塞final boolean writerShouldBlock() {return hasQueuedPredecessors();}//本次获取读锁是否需要阻塞?如果等待队列不为空则需要阻塞final boolean readerShouldBlock() {return hasQueuedPredecessors();}}//下面就是ReadLock和WriteLock了,这两都是对上面的Sync的一个简单封装//ReadLock和WriteLock的各种函数都是简单的转调用Sync的相关函数,//一眼就能看明白,略public static class ReadLock implements Lock, java.io.Serializable {private static final long serialVersionUID = -5992448646407690164L;private final Sync sync;protected ReadLock(ReentrantReadWriteLock lock) {sync = lock.sync;}public void lock() {sync.acquireShared(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public boolean tryLock() {return sync.tryReadLock();}public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}public void unlock() {sync.releaseShared(1);}//!!!读锁不支持条件变量public Condition newCondition() {throw new UnsupportedOperationException();}public String toString() {int r = sync.getReadLockCount();return super.toString() +"[Read locks = " + r + "]";}}public static class WriteLock implements Lock, java.io.Serializable {private static final long serialVersionUID = -4992448646407690164L;private final Sync sync;protected WriteLock(ReentrantReadWriteLock lock) {sync = lock.sync;}public void lock() {sync.acquire(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock( ) {return sync.tryWriteLock();}public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}public void unlock() {sync.release(1);}//写锁支持条件变量,就是AQS的条件变量public Condition newCondition() {return sync.newCondition();}public String toString() {Thread o = sync.getOwner();return super.toString() + ((o == null) ?"[Unlocked]" :"[Locked by thread " + o.getName() + "]");}public boolean isHeldByCurrentThread() {return sync.isHeldExclusively();}public int getHoldCount() {return sync.getWriteHoldCount();}}
http://www.lryc.cn/news/506792.html

相关文章:

  • 反射和设计模式
  • 双指针---和为s的两个数字
  • LLaMA-Factory 单卡3080*2 deepspeed zero3 微调Qwen2.5-7B-Instruct
  • 智慧农业云平台与水肥一体化:道品科技引领农业现代化新潮流
  • 241207_MindNLP中的大模型微调
  • MongoDB、Mongoose使用教程
  • 单片机:实现控制步进电机正反转(附带源码)
  • 安装指南|OpenCSG Starship上架GitHub Marketplace
  • Excel设置生日自动智能提醒,公式可直接套用!
  • 同步异步日志系统:前置知识
  • 微服务设计原则——功能设计
  • 低代码软件搭建自学的第一天——熟悉PyQt
  • 基于Python3编写的Golang程序多平台交叉编译自动化脚本
  • 远程桌面连接
  • 网络地址转换NAT
  • 什么是CRM管理软件?CRM的基本概念、功能、选择标准、应用场景
  • Python编程常用的19个经典案例
  • 【Unity基础】AudioSource 常用方法总结
  • CSS系列(25)-- 滚动优化详解
  • CST天线设计的六大核心特点:为天线分析提供完整解决方案!
  • Ubuntu下C语言操作kafka示例
  • 怎么将pdf中的某一个提取出来?介绍几种提取PDF中页面的方法
  • HTTP接口报错详解与解决 200,500,403,408,404
  • 监控IP频繁登录服务器脚本
  • 分布式链路追踪-03-Jaeger、Zipkin、skywalking 中的 span 是如何设计的?
  • 【达梦数据库】获取对象DDL
  • InnoDB和MyISAM引擎优缺点和区别
  • 文件上传知识点汇总
  • 计算机网络技术基础:5.数据通信系统
  • 光谱相机在农业的应用