AQS(AbstractQueuedSynchronizer)抽象队列同步器
AQS(AbstractQueuedSynchronizer)抽象队列同步器
是一种用来构建锁和同步器的框架
基本构成
state
- 维护共享资源的持有状态
- 通常:
0
表示共享资源可使用1
表示共享资源正在使用
exclusiveOwnerThread
- 继承了一个抽象类
AbstractOwnableSynchronizer
- 记录当前持有共享资源的线程
等待队列
- 维护所有未获取到锁而陷入等待的线程
存储结构
- 双向链表
- 头节点是一个占位的无意义节点
- 每个节点保存了陷入等待的线程和当前节点的状态
底层支持
UnSafe
- 支持 CAS 来保证
state
的并发安全
LockSupport
- 支持线程的等待和唤醒
内置方法
acquire
- 以 CAS 的方式获取锁
- 在获取失败时进入等待队列
release
- 释放掉当前线程持有的锁
- 并唤醒位于当前节点的后继节点的线程
- 如果该后继节点状态非正常(被取消),则从后往前找到一个状态正常的
acquireShared
- 以共享的方式获取锁
- 在获取失败时进入等待队列
releaseShared
- 做一次释放(共享资源不一定被完全释放)
- 当共享资源完全释放时唤醒所有等待线程
核心方法
addWaiter
加入等待队列
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}
acquireQueued
进入排队,在获取锁失败后,使当前线程在队列中等待,并且在此期间响应中断或再次尝试获取锁。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
模版方法(由子类实现)
tryAcquire
- 以独占的方式获取锁
- 如果获取成功返回
true
,失败返回false
- 具体获取方式由子类实现
tryRelease
- 以独占的方式释放锁
- 如果成功释放掉锁返回
true
,失败返回false
- 具体释放方式由子类实现
tryAcquireShared
- 以共享的方式获取锁
- 如果返回值为负数表示获取锁失败,陷入等待
- 大于等于 0 表示获取锁成功
tryReleaseShared
- 以共享的方式释放锁
- 如果共享资源完成释放返回
true
- 处于等待的线程将会被唤醒
- 具体释放方式由子类决定
Condition
Condition
是 AQS 提供的一个接口,位于包:java.util.concurrent.locks.Condition
它允许一个或多个线程在某个条件不满足时挂起等待,并在其他线程改变状态后被唤醒。这与传统的 Object.wait()
/ Object.notify()
类似,但它是与 Lock 配合使用的条件变量。
await
进入 Condition 的等待队列,当前线程陷入等待
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}
signal
从 Condition 的等待队列移除第一个元素,绑定的线程恢复执行
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);
}
JDK 中的应用
ReentrantLock
非公平锁实现
lock 方法
final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);
}public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
大致过程:
- 首先使用 CAS 尝试获取共享资源(如果刚好空闲就能成功)
- 成功则将当前线程设置为资源持有者
- 否则再次使用 CAS 尝试获取(支持重入)
- 失败则将当前线程加入等待队列
- 使用
park
方式使当前线程陷入等待
公平锁实现
lock 方法
final void lock() {acquire(1);
}
基本和非公平锁相同,主要区别有只有一个:
- 在尝试获取共享资源的时候,获取检查等待队列中是否存在其他等待的线程
- 如果有就放弃获取锁(也不会一上来就去做 CAS)
释放锁
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}
比较简单:
- 判断当前锁的状态,如果持有者是当前线程
- 将状态变更为可使用
- 并唤醒后继节点
CountDownLatch
是一种同步辅助类,用于控制一个或多个线程等待其他线程完成操作,然后再继续执行
countdown
public void countDown() {sync.releaseShared(1);
}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}
}
将
state
减 1,如果state = 0
,表示共享资源被完全释放,而后等待队列中的所有线程被唤醒
await
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}
当
state = 0
时才能获取成功,否则失败,如果获取失败陷入等待
Semaphore
信号量是一个非负整数,获取信号量的任务都会将该整数减 1,当为 0 时,所有试图获取该信号量的任务都处于阻塞状态,正值表示有一个或者多个释放信号量操作。
acquire
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}
当 Semaphore 中的 permit 大于零时表示共享资源可获取,否则获取时失败,加入等待队列。
PS:实际上 Semaphore 也和 ReentrantLock 一样有公平和非公平的实现,默认也是非公平,使用公平时会检查等待队列中是否由处于等待中的前驱节点
release
public void release() {sync.releaseShared(1);
}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}
}
调用 Semaphore 的 release 时将会使 permit 值加 1,表示对共享资源的持有释放,并尝试唤醒一个等待中的线程