当前位置: 首页 > news >正文

AQS(AbstractQueuedSynchronizer)底层源码实现与设计思想

深入解析AbstractQueuedSynchronizer(AQS)底层源码实现与设计思想

在Java并发编程领域,AbstractQueuedSynchronizer(简称AQS)是构建多种同步组件的基础框架,比如ReentrantLockSemaphoreCountDownLatch等。它通过统一的队列和状态管理机制,实现了多线程间的高效同步控制。本文将带你深入理解AQS的底层实现原理、核心设计思想以及关键源码剖析。


一、什么是AbstractQueuedSynchronizer?

AQS是Java中一个抽象同步器框架,利用一个int类型的状态变量表示同步状态,并通过一个基于FIFO的双向队列来管理因获取同步状态失败而等待的线程。AQS为我们封装了复杂的线程排队和唤醒逻辑,子类只需关注如何修改同步状态,实现自定义同步器。


二、AQS的核心设计思想

  • 状态变量state管理同步状态
    用一个整型变量表示同步状态(如锁的占用次数、剩余许可数等),通过CAS(Compare-And-Swap)操作确保线程安全地修改状态。

  • 等待队列维护线程排队
    获取同步状态失败的线程被封装为Node节点,加入等待队列,保证线程的公平等待和唤醒顺序。

  • 独占模式与共享模式
    支持独占锁(如ReentrantLock)和共享锁(如Semaphore)两种获取模式,灵活实现不同的同步策略。

  • 模板方法设计模式
    抽象出tryAcquiretryReleasetryAcquireSharedtryReleaseShared四个方法,由子类实现具体同步逻辑,AQS负责统一排队和阻塞唤醒机制。

  • 常用具体方法:acquire


三、关键数据结构详解

private volatile int state;            // 同步状态
private transient volatile Node head;  // 等待队列头节点
private transient volatile Node tail;  // 等待队列尾节点static final class Node {static final int CANCELLED =  1;static final int SIGNAL    = -1;static final int CONDITION = -2;static final int PROPAGATE = -3;volatile int waitStatus;volatile Node prev;volatile Node next;volatile Thread thread;Node nextWaiter;  // 用于Condition队列static final Node SHARED = new Node();static final Node EXCLUSIVE = null;// 构造函数省略
}
  • state:表示同步器当前状态,具体含义由子类定义。
  • headtail:管理等待线程的FIFO双向链表,保证有序唤醒。
  • Node结构:表示队列节点,保存线程、前后节点、等待状态(如取消、等待唤醒、条件等待等)信息。

四、获取锁的核心流程(以独占模式为例)

  1. 尝试获取同步状态
    调用tryAcquire(arg):通过子类实现。尝试无阻塞获取锁,成功则直接返回。

  2. 入队排队等待
    如果失败,调用acquire(方法)addWaiter(Node.EXCLUSIVE)将当前线程封装为独占模式节点加入队尾。

  3. 阻塞线程
    通过acquireQueued方法让线程在队列中自旋等待,并调用LockSupport.park(this)阻塞线程。

  4. 被唤醒后重试获取锁
    唤醒的线程再次调用tryAcquire,成功则退出阻塞。

  5. 释放锁时唤醒后继线程
    释放锁调用tryRelease,成功释放后调用unparkSuccessor唤醒队列头节点的后继线程。

核心方法如下:

public final void acquire(int arg) {if (!tryAcquire(arg))acquire(null, arg, false, false, false, 0L);}
   final int acquire(Node node, int arg, boolean shared,boolean interruptible, boolean timed, long time) {Thread current = Thread.currentThread();byte spins = 0, postSpins = 0;   // retries upon unpark of first threadboolean interrupted = false, first = false;Node pred = null;                // predecessor of node when enqueued/** Repeatedly:*  Check if node now first*    if so, ensure head stable, else ensure valid predecessor*  if node is first or not yet enqueued, try acquiring*  else if node not yet created, create it*  else if not yet enqueued, try once to enqueue*  else if woken from park, retry (up to postSpins times)*  else if WAITING status not set, set and retry*  else park and clear WAITING status, and check cancellation*/for (;;) {if (!first && (pred = (node == null) ? null : node.prev) != null &&!(first = (head == pred))) {if (pred.status < 0) {cleanQueue();           // predecessor cancelledcontinue;} else if (pred.prev == null) {Thread.onSpinWait();    // ensure serializationcontinue;}}if (first || pred == null) {boolean acquired;try {if (shared)acquired = (tryAcquireShared(arg) >= 0);elseacquired = tryAcquire(arg);} catch (Throwable ex) {cancelAcquire(node, interrupted, false);throw ex;}if (acquired) {if (first) {node.prev = null;head = node;pred.next = null;node.waiter = null;if (shared)signalNextIfShared(node);if (interrupted)current.interrupt();}return 1;}}if (node == null) {                 // allocate; retry before enqueueif (shared)node = new SharedNode();elsenode = new ExclusiveNode();} else if (pred == null) {          // try to enqueuenode.waiter = current;Node t = tail;node.setPrevRelaxed(t);         // avoid unnecessary fenceif (t == null)tryInitializeHead();else if (!casTail(t, node))node.setPrevRelaxed(null);  // back outelset.next = node;} else if (first && spins != 0) {--spins;                        // reduce unfairness on rewaitsThread.onSpinWait();} else if (node.status == 0) {node.status = WAITING;          // enable signal and recheck} else {long nanos;spins = postSpins = (byte)((postSpins << 1) | 1);if (!timed)LockSupport.park(this);else if ((nanos = time - System.nanoTime()) > 0L)LockSupport.parkNanos(this, nanos);elsebreak;node.clearStatus();if ((interrupted |= Thread.interrupted()) && interruptible)break;}}return cancelAcquire(node, interrupted, interruptible);}

五、等待队列的入队与唤醒机制

  • 入队(addWaiter
    优先尝试直接加入尾部,如果失败调用enq方法自旋入队,保证线程安全。

  • 阻塞与唤醒
    线程阻塞通过LockSupport.park实现,释放同步状态时通过LockSupport.unpark唤醒队列头后继节点。

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}

六、共享模式与独占模式的区别

  • 独占模式
    同一时刻只有一个线程能获取同步状态,如ReentrantLock

  • 共享模式
    多个线程可以同时获取同步状态,如Semaphore允许多个许可。

AQS通过NodenextWaiter标识和唤醒策略区分两种模式,保证共享模式下多个线程可以唤醒。


七、AQS源码亮点总结

核心点说明
state变量用于表示同步资源状态,CAS操作保证原子性
FIFO等待队列基于双向链表,维护线程的公平等待和唤醒顺序
独占与共享模式支持多种同步策略,扩展性强
模板方法设计子类实现具体同步逻辑,AQS负责线程排队和阻塞管理
LockSupport高效的线程挂起与唤醒机制

八、总结

AbstractQueuedSynchronizer是Java并发框架中最重要的基础组件之一。它以同步状态为核心,结合FIFO等待队列和线程阻塞/唤醒机制,实现了灵活且高效的同步策略。深入理解AQS不仅有助于掌握JUC中的各种同步工具,还能启发我们设计高性能自定义同步器。

http://www.lryc.cn/news/616246.html

相关文章:

  • 前端路由:Hash 模式与 History 模式深度解析
  • Java Stream流详解:从基础语法到实战应用
  • Rust 实战四 | Traui2+Vue3+Rspack 开发桌面应用:通配符掩码计算器
  • 【算法题】:和为N的连续正数序列
  • 数学建模:控制预测类问题
  • Python 获取对象信息的所有方法
  • matlab实现随机森林算法
  • Doubletrouble靶机练习
  • 点击速度测试:一款放大操作差距的互动挑战游戏
  • #Datawhale AI夏令营#第三期全球AI攻防挑战赛(AIGC技术-图像方向)
  • 如何用分析方法解决工作中的问题?
  • 检索召回率优化探究五(BGE-M3 混合检索):基于LangChain0.3 集成Milvu2.5 向量数据库构建的智能问答系统
  • Linux系统编程Day11 -- 进程属性和常见进程
  • 学习模板元编程(3)enable_if
  • 【树状数组】Dynamic Range Sum Queries
  • [激光原理与应用-221]:设计 - 皮秒紫外激光器 - 常见技术难题、原因与解决方案
  • Linux-静态配置ip地址
  • 【无标题】命名管道(Named Pipe)是一种在操作系统中用于**进程间通信(IPC)** 的机制
  • 深度解析Linux设备树(DTS):设计原理、实现框架与实例分析
  • 基于Qt/QML 5.14和YOLOv8的工业异常检测Demo:冲压点智能识别
  • 线程池的核心线程数与最大线程数怎么设置
  • 基于FFmpeg的B站视频下载处理
  • 简要介绍交叉编译工具arm-none-eabi、arm-linux-gnueabi与arm-linux-gnueabihf
  • 【iOS】JSONModel源码学习
  • 2025.8.10总结
  • mpv core_thread pipeline
  • 第16届蓝桥杯Scratch选拔赛初级及中级(STEMA)2025年4月13日真题
  • ARM保留的标准中断处理程序入口和外设中断处理程序入口介绍
  • Python设计模式 - 装饰模式
  • 双亲委派机制是什么?