AQS(AbstractQueuedSynchronizer)底层源码实现与设计思想
深入解析AbstractQueuedSynchronizer(AQS)底层源码实现与设计思想
在Java并发编程领域,AbstractQueuedSynchronizer
(简称AQS)是构建多种同步组件的基础框架,比如ReentrantLock
、Semaphore
、CountDownLatch
等。它通过统一的队列和状态管理机制,实现了多线程间的高效同步控制。本文将带你深入理解AQS的底层实现原理、核心设计思想以及关键源码剖析。
一、什么是AbstractQueuedSynchronizer?
AQS是Java中一个抽象同步器框架,利用一个int
类型的状态变量表示同步状态,并通过一个基于FIFO的双向队列来管理因获取同步状态失败而等待的线程。AQS为我们封装了复杂的线程排队和唤醒逻辑,子类只需关注如何修改同步状态,实现自定义同步器。
二、AQS的核心设计思想
-
状态变量
state
管理同步状态
用一个整型变量表示同步状态(如锁的占用次数、剩余许可数等),通过CAS(Compare-And-Swap)操作确保线程安全地修改状态。 -
等待队列维护线程排队
获取同步状态失败的线程被封装为Node
节点,加入等待队列,保证线程的公平等待和唤醒顺序。 -
独占模式与共享模式
支持独占锁(如ReentrantLock
)和共享锁(如Semaphore
)两种获取模式,灵活实现不同的同步策略。 -
模板方法设计模式
抽象出tryAcquire
、tryRelease
、tryAcquireShared
、tryReleaseShared
四个方法,由子类实现具体同步逻辑,AQS负责统一排队和阻塞唤醒机制。 -
常用具体方法:acquire
三、关键数据结构详解
private volatile int state; // 同步状态
private transient volatile Node head; // 等待队列头节点
private transient volatile Node tail; // 等待队列尾节点static final class Node {static final int CANCELLED = 1;static final int SIGNAL = -1;static final int CONDITION = -2;static final int PROPAGATE = -3;volatile int waitStatus;volatile Node prev;volatile Node next;volatile Thread thread;Node nextWaiter; // 用于Condition队列static final Node SHARED = new Node();static final Node EXCLUSIVE = null;// 构造函数省略
}
state
:表示同步器当前状态,具体含义由子类定义。head
和tail
:管理等待线程的FIFO双向链表,保证有序唤醒。Node
结构:表示队列节点,保存线程、前后节点、等待状态(如取消、等待唤醒、条件等待等)信息。
四、获取锁的核心流程(以独占模式为例)
-
尝试获取同步状态
调用tryAcquire(arg)
:通过子类实现。尝试无阻塞获取锁,成功则直接返回。 -
入队排队等待
如果失败,调用acquire(方法)
addWaiter(Node.EXCLUSIVE)将当前线程封装为独占模式节点加入队尾。 -
阻塞线程
通过acquireQueued
方法让线程在队列中自旋等待,并调用LockSupport.park(this)
阻塞线程。 -
被唤醒后重试获取锁
唤醒的线程再次调用tryAcquire
,成功则退出阻塞。 -
释放锁时唤醒后继线程
释放锁调用tryRelease
,成功释放后调用unparkSuccessor
唤醒队列头节点的后继线程。
核心方法如下:
public final void acquire(int arg) {if (!tryAcquire(arg))acquire(null, arg, false, false, false, 0L);}
final int acquire(Node node, int arg, boolean shared,boolean interruptible, boolean timed, long time) {Thread current = Thread.currentThread();byte spins = 0, postSpins = 0; // retries upon unpark of first threadboolean interrupted = false, first = false;Node pred = null; // predecessor of node when enqueued/** Repeatedly:* Check if node now first* if so, ensure head stable, else ensure valid predecessor* if node is first or not yet enqueued, try acquiring* else if node not yet created, create it* else if not yet enqueued, try once to enqueue* else if woken from park, retry (up to postSpins times)* else if WAITING status not set, set and retry* else park and clear WAITING status, and check cancellation*/for (;;) {if (!first && (pred = (node == null) ? null : node.prev) != null &&!(first = (head == pred))) {if (pred.status < 0) {cleanQueue(); // predecessor cancelledcontinue;} else if (pred.prev == null) {Thread.onSpinWait(); // ensure serializationcontinue;}}if (first || pred == null) {boolean acquired;try {if (shared)acquired = (tryAcquireShared(arg) >= 0);elseacquired = tryAcquire(arg);} catch (Throwable ex) {cancelAcquire(node, interrupted, false);throw ex;}if (acquired) {if (first) {node.prev = null;head = node;pred.next = null;node.waiter = null;if (shared)signalNextIfShared(node);if (interrupted)current.interrupt();}return 1;}}if (node == null) { // allocate; retry before enqueueif (shared)node = new SharedNode();elsenode = new ExclusiveNode();} else if (pred == null) { // try to enqueuenode.waiter = current;Node t = tail;node.setPrevRelaxed(t); // avoid unnecessary fenceif (t == null)tryInitializeHead();else if (!casTail(t, node))node.setPrevRelaxed(null); // back outelset.next = node;} else if (first && spins != 0) {--spins; // reduce unfairness on rewaitsThread.onSpinWait();} else if (node.status == 0) {node.status = WAITING; // enable signal and recheck} else {long nanos;spins = postSpins = (byte)((postSpins << 1) | 1);if (!timed)LockSupport.park(this);else if ((nanos = time - System.nanoTime()) > 0L)LockSupport.parkNanos(this, nanos);elsebreak;node.clearStatus();if ((interrupted |= Thread.interrupted()) && interruptible)break;}}return cancelAcquire(node, interrupted, interruptible);}
五、等待队列的入队与唤醒机制
-
入队(
addWaiter
)
优先尝试直接加入尾部,如果失败调用enq
方法自旋入队,保证线程安全。 -
阻塞与唤醒
线程阻塞通过LockSupport.park
实现,释放同步状态时通过LockSupport.unpark
唤醒队列头后继节点。
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}
六、共享模式与独占模式的区别
-
独占模式
同一时刻只有一个线程能获取同步状态,如ReentrantLock
。 -
共享模式
多个线程可以同时获取同步状态,如Semaphore
允许多个许可。
AQS通过Node
的nextWaiter
标识和唤醒策略区分两种模式,保证共享模式下多个线程可以唤醒。
七、AQS源码亮点总结
核心点 | 说明 |
---|---|
state 变量 | 用于表示同步资源状态,CAS操作保证原子性 |
FIFO等待队列 | 基于双向链表,维护线程的公平等待和唤醒顺序 |
独占与共享模式 | 支持多种同步策略,扩展性强 |
模板方法设计 | 子类实现具体同步逻辑,AQS负责线程排队和阻塞管理 |
LockSupport | 高效的线程挂起与唤醒机制 |
八、总结
AbstractQueuedSynchronizer是Java并发框架中最重要的基础组件之一。它以同步状态为核心,结合FIFO等待队列和线程阻塞/唤醒机制,实现了灵活且高效的同步策略。深入理解AQS不仅有助于掌握JUC中的各种同步工具,还能启发我们设计高性能自定义同步器。