十三、抽象队列同步器AQS
1、AQS简介
AQS是AbstractQueuedSynchronizer
的简称,也即抽象队列同步器
,从字面来理解:
- 抽象:是一个抽象类,仅实现一些主要逻辑,有些方法会交由子类来实现
- 队列:采取队列(FIFO,先进先出)这种数据结构存储数据
- 同步:实现了多线程环境下的同步操作
那AQS有什么用呢?AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的同步器,比如我们提到的ReentrantLock
,Semaphore
,ReentrantReadWriteLock
,SynchronousQueue
,FutureTask
等等皆是基于AQS的。
当然,我们自己也能利用AQS来定制符合我们自己需求的同步器,只要实现它的几个protected方法就可以了,在下文会有详细的介绍。
2、AQS的数据结构
AQS内部使用一个volatile
关键字修饰的变量state来作为资源的标识符。
/*** The synchronization state.*/
private volatile int state;
同时定义了几个获取和设置state的原子方法:
/*** Returns the current value of synchronization state.* This operation has memory semantics of a {@code volatile} read.* @return current state value*/
protected final int getState() {return state;
}/*** Sets the value of synchronization state.* This operation has memory semantics of a {@code volatile} write.* @param newState the new state value*/
protected final void setState(int newState) {state = newState;
}/*** Atomically sets synchronization state to the given updated* value if the current state value equals the expected value.* This operation has memory semantics of a {@code volatile} read* and write.** @param expect the expected value* @param update the new value* @return {@code true} if successful. False return indicates that the actual* value was not equal to the expected value.*/
protected final boolean compareAndSetState(int expect, int update) {return U.compareAndSetInt(this, STATE, expect, update);
}
这三种操作均是原子操作,其中compareAndSetState()
方法是依赖于UnSafe
类的compareAndSetInt()
方法。
AQS内部使用了一个先进先出(FIFO)的双端队列,并使用两个指针head
和tail
分别代表队列的头节点和尾节点。其数据结构如下图所示:
AQS的队列不直接存储线程,而是队列中每一个节点Node来具体存储线程。
AQS源码中关于节点Node类的描述:
3、AQS的Node节点
资源有两种共享模式,或者说两种同步方式:
- 独占模式(Exclusive):资源是独占的,一次只能被一个线程访问
- 共享模式(Share):资源是共享的,可以被多个线程同时访问。具体的资源个数可以通过参数指定,如
CountDownLatch
、Semaphore
一般情况下,子类只需要根据需求实现其中一种模式,但也有两种模式都实现的同步类,比如ReadWriteLock
.
AQS中关于这两种模式的源码全部都在Node
这个内部类中,源码如下:
static final class Node {// 标记节点,不包含实际的线程信息,主要用做标识符来区分共享同步模式static final Node SHARED = new Node();// 标记节点,不包含实际的线程信息,主要用做标识符来区分独占同步模式static final Node EXCLUSIVE = null;// waitStatus的值,表示该节点(对应的线程)已经被取消static final int CANCELLED = 1;// waitStatus的值,表示后继结点(对应的线程)需要被唤醒static final int SIGNAL = -1;// waitStatus的值,表示该节点(对应的线程)在等待某种条件static final int CONDITION = -2;/*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/static final int PROPAGATE = -3;// 等待状态,取值范围:-3、-2、-1、0、1volatile int waitStatus;volatile Node prev; // 前驱节点volatile Node next; // 后继结点volatile Thread thread; // 节点对应的线程Node nextWaiter; // 等待队列里下一个等待条件的节点// 判断共享模式的方法final boolean isShared() {return nextWaiter == SHARED;}// 获取后继节点的方法final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}
}
注意:通过Node我们可以实现两个队列,一是通过prev和next指针实现的CLH队列(线程同步队列,双向队列),二是nextWaiter实现Condition条件上的等待队列(单向队列),这个Condition主要用在
ReentrantLock
类中。
4、AQS源码解析
AQS的设计是基于模板方法设计模式,一些方法不做具体实现,抛出异常,业务逻辑交由子类做具体实现。这些方法主要是:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
父类方法不做具体实现,直接抛出异常:
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}
这里父类使用protected来修饰而不是抽象方法,这样做的目的是避免子类要把所有方法都重写一遍,增加了很多的工作量,子类只需要重写自己需要的方法。
而AQS实现了一系列的主要逻辑,下面AQS的源码剖析获取资源和释放资源的主要逻辑。
4.1、获取资源
acquire(int arg)
方法是获取资源的入口,arg参数表示获取资源的个数,在独占模式下,arg始终为1。下面是这个方法的源码:
public final void acquire(int arg) {// 尝试获取资源,成功返回true,失败falseif (!tryAcquire(arg) &&// 走到这里,说明获取资源失败。调用addWaiter方法将当前线程加入到等待队列acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 中断当前线程selfInterrupt();
}
首先调用tryAcquire(arg)
方法尝试获取资源,前面也提到了,这个方法的逻辑是交由子类来具体实现。如果获取资源失败,就通过addWaiter(Node.EXCLUSIVE), arg)
方法将当前线程加入到等待队列中,采用独占模式。这个方法的具体实现如下:
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// 快速入队Node pred = tail;if (pred != null) {node.prev = pred;// CAS操作将当前节点设置为新的尾节点(可能会失败)if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 如果快速入队失败,再走完整入队enq(node);return node;
}private Node enq(final Node node) {// 这里通过自旋的方式,确保CAS操作一定正确完成for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}
addWaiter
方法先尝试能否快速入队,如果失败了,再通过完整入队的方式,将当前线程加入到等待队列。这样做的目的是,保证在线程安全的情况下提高性能。
ok,上面方法介绍完了,让我们回到最初的acquire(int arg)
方法,当获取资源失败,并且将当前线程添加到等待队列的队尾。然后我们来看看AQS最后要做的事情是什么呢?我们来看看最后一个方法acquireQueued(final Node node, int arg)
,源码如下:
// node节点是当前获取资源失败的节点
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {// 当前线程阻塞是否被中断boolean interrupted = false;// 自旋操作for (;;) {// 前驱节点final Node p = node.predecessor();// 前驱节点是头节点,说明当前节点就是等待队列中第一个等待节点,可以尝试获取资源if (p == head && tryAcquire(arg)) {// 如果成功,设置当前节点为新的头节点setHead(node);// 回收旧的头节点p.next = null; // help GC// 失败标记置为falsefailed = false;// 返回是否中断标记return interrupted;}// 如果当前节点不是首个等待节点,判断是否应该阻塞。if (shouldParkAfterFailedAcquire(p, node) &&// 且判断是否应该阻塞当前线程,如果需要阻塞,调用parkAndCheckInterrupt()方法进行阻塞// 线程唤醒后,继续下一次循环parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)// 如果失败,将当前线程的状态设置为 CANCELLED,等待GC回收cancelAcquire(node);}
}
AQS将获取资源失败的线程成功添加到等待队列后,反复尝试获取锁,如果获取不到就阻塞(挂起),直到获取锁成功或阻塞中断。
上述流程就是独占方式获取资源的全部执行流程了。
这里parkAndCheckInterrupt方法内部使用到了LockSupport.park(this),顺便简单介绍一下park。
LockSupport类是Java 6 引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:
- park(boolean isAbsolute, long time):阻塞当前线程
- unpark(Thread jthread):唤醒指定的线程
现在用一张流程图总结上述过程:
4.2、释放资源
释放资源的逻辑比较简单,源码如下:
// 释放资源的主入口
public final boolean release(int arg) {// 尝试释放资源,具体的逻辑由子类实现if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)// 唤醒后继节点unparkSuccessor(h);return true;}return false;
}private void unparkSuccessor(Node node) {// 头节点的状态如果小于0,尝试设置为0int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 后继节点Node s = node.next;// 如果后继节点不存在或者状态大于0(大于0表示线程已被取消),从尾部向前遍历找到队列中第一个待唤醒的节点if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 如果待唤醒的后继节点存在,唤醒该节点对应的线程。if (s != null)LockSupport.unpark(s.thread);
}
5、小结
AQS是一个用来构建锁和同步器的框架,使用AQS能够很方便的构造出我们需要定制化的同步器,而且我们耳熟能详的并发包组件ReentrantLock
,Semaphore
,ReentrantReadWriteLock
,SynchronousQueue
,FutureTask
等等皆是基于AQS实现的。
下面是一个示例(互斥锁,同一时刻,只允许一个线程获取):
import java.util.concurrent.locks.AbstractQueuedSynchronizer;public class Mutex {// 自定义内部类实现AQSprivate static class Sync extends AbstractQueuedSynchronizer {// 获取资源(独占模式)@Overrideprotected boolean tryAcquire(int arg) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}// 释放资源(独占模式)@Overrideprotected boolean tryRelease(int arg) {if (getState() == 0) {throw new IllegalMonitorStateException();}setExclusiveOwnerThread(null);setState(0);return true;}// 如果当前线程以独占方式获取资源,返回true@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}}private final Sync sync = new Sync();// 加锁public void lock() {sync.acquire(1);}// 释放锁public void unlock() {sync.release(1);}// 资源是否被占有public boolean isLocked() {return sync.isHeldExclusively();}
}