当前位置: 首页 > news >正文

AQS(AbstractQueuedSynchronizer)抽象队列同步器

AQS(AbstractQueuedSynchronizer)抽象队列同步器

是一种用来构建锁和同步器的框架


基本构成

state

  • 维护共享资源的持有状态
  • 通常:
    • 0 表示共享资源可使用
    • 1 表示共享资源正在使用

exclusiveOwnerThread

  • 继承了一个抽象类 AbstractOwnableSynchronizer
  • 记录当前持有共享资源的线程

等待队列

  • 维护所有未获取到锁而陷入等待的线程
存储结构
  • 双向链表
  • 头节点是一个占位的无意义节点
  • 每个节点保存了陷入等待的线程和当前节点的状态

底层支持

UnSafe

  • 支持 CAS 来保证 state 的并发安全

LockSupport

  • 支持线程的等待和唤醒

内置方法

acquire

  • 以 CAS 的方式获取锁
  • 在获取失败时进入等待队列

release

  • 释放掉当前线程持有的锁
  • 并唤醒位于当前节点的后继节点的线程
    • 如果该后继节点状态非正常(被取消),则从后往前找到一个状态正常的

acquireShared

  • 以共享的方式获取锁
  • 在获取失败时进入等待队列

releaseShared

  • 做一次释放(共享资源不一定被完全释放)
  • 当共享资源完全释放时唤醒所有等待线程

核心方法

addWaiter

加入等待队列

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}

acquireQueued

进入排队,在获取锁失败后,使当前线程在队列中等待,并且在此期间响应中断或再次尝试获取锁。

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}

模版方法(由子类实现)

tryAcquire

  • 以独占的方式获取锁
  • 如果获取成功返回 true,失败返回 false
  • 具体获取方式由子类实现

tryRelease

  • 以独占的方式释放锁
  • 如果成功释放掉锁返回 true,失败返回 false
  • 具体释放方式由子类实现

tryAcquireShared

  • 以共享的方式获取锁
  • 如果返回值为负数表示获取锁失败,陷入等待
  • 大于等于 0 表示获取锁成功

tryReleaseShared

  • 以共享的方式释放锁
  • 如果共享资源完成释放返回 true
  • 处于等待的线程将会被唤醒
  • 具体释放方式由子类决定

Condition

Condition 是 AQS 提供的一个接口,位于包:java.util.concurrent.locks.Condition

它允许一个或多个线程在某个条件不满足时挂起等待,并在其他线程改变状态后被唤醒。这与传统的 Object.wait() / Object.notify() 类似,但它是与 Lock 配合使用的条件变量

await

进入 Condition 的等待队列,当前线程陷入等待

public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}

signal

从 Condition 的等待队列移除第一个元素,绑定的线程恢复执行

public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);
}

JDK 中的应用

ReentrantLock

非公平锁实现
lock 方法
final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);
}public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}

大致过程

  • 首先使用 CAS 尝试获取共享资源(如果刚好空闲就能成功)
  • 成功则将当前线程设置为资源持有者
  • 否则再次使用 CAS 尝试获取(支持重入)
  • 失败则将当前线程加入等待队列
  • 使用 park 方式使当前线程陷入等待
公平锁实现
lock 方法
final void lock() {acquire(1);
}

基本和非公平锁相同,主要区别有只有一个:

  • 在尝试获取共享资源的时候,获取检查等待队列中是否存在其他等待的线程
  • 如果有就放弃获取锁(也不会一上来就去做 CAS)

释放锁

public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}

比较简单

  • 判断当前锁的状态,如果持有者是当前线程
  • 将状态变更为可使用
  • 并唤醒后继节点

CountDownLatch

是一种同步辅助类,用于控制一个或多个线程等待其他线程完成操作,然后再继续执行

countdown
public void countDown() {sync.releaseShared(1);
}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}
}

state 减 1,如果 state = 0,表示共享资源被完全释放,而后等待队列中的所有线程被唤醒

await
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}

state = 0 时才能获取成功,否则失败,如果获取失败陷入等待


Semaphore

信号量是一个非负整数,获取信号量的任务都会将该整数减 1,当为 0 时,所有试图获取该信号量的任务都处于阻塞状态,正值表示有一个或者多个释放信号量操作。

acquire
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}

当 Semaphore 中的 permit 大于零时表示共享资源可获取,否则获取时失败,加入等待队列。

PS:实际上 Semaphore 也和 ReentrantLock 一样有公平和非公平的实现,默认也是非公平,使用公平时会检查等待队列中是否由处于等待中的前驱节点

release
public void release() {sync.releaseShared(1);
}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}
}

调用 Semaphore 的 release 时将会使 permit 值加 1,表示对共享资源的持有释放,并尝试唤醒一个等待中的线程

http://www.lryc.cn/news/590317.html

相关文章:

  • 开源Web播放器推荐与选型指南
  • 开源一体化协作平台Colanode
  • uniapp小程序实现地图多个标记点
  • 数据结构与算法学习(一)
  • Java大厂面试实录:从Spring Boot到AI微服务架构的全栈挑战
  • PyCharm高效入门指南大纲
  • 图机器学习(8)——经典监督图嵌入算法
  • 浅析BLE/MQTT协议的区别
  • Web3.0与元宇宙:重构数字文明的技术范式与社会变革
  • 创客匠人解析:系统化工具如何重构知识变现效率
  • AI Agent:重构智能边界的终极形态——从技术内核到未来图景全景解析
  • UDP和TCP的主要区别是什么?
  • 智能呼叫中心系统:重构客户服务的核心引擎
  • 【保姆级喂饭教程】Idea中配置类注释模板
  • C++---emplace_back与push_back
  • Java接口:小白如何初步认识Java接口?
  • C语言 个人总结1
  • 【SF顺丰】顺丰开放平台API对接(Java对接篇)
  • AI Agent开发学习系列 - langchain之LCEL(2):LCEL 链式表达解析
  • Nand2Tetris(计算机系统要素)学习笔记 Project 0
  • 单片机学习笔记.IIC通信协议(根据数据手册写IIC驱动程序,这里以普中开发板上的AT24C02为例)
  • 【深度学习基础】PyTorch中model.eval()与with torch.no_grad()以及detach的区别与联系?
  • 嵌入式学习-PyTorch(5)-day22
  • 人工智能时代下的数据新职业:新兴工作岗位版图研究
  • 智能体架构深度解构:一次用户请求的完整旅程
  • 第二十一 篇 PDF文档自动化:Python一键合并、分割、水印、提取与加密解密!你的PDF全能管家!
  • audiorecord 之 抢占优先级
  • rLLM:用于LLM Agent RL后训练的创新框架
  • ESP32 S3 基于 Arduino 实现局域网视频流传输全解析
  • Python从入门到高手9.2节-Python字典的操作方法