当前位置: 首页 > news >正文

十三、抽象队列同步器AQS

1、AQS简介

AQS是AbstractQueuedSynchronizer的简称,也即抽象队列同步器,从字面来理解:

  • 抽象:是一个抽象类,仅实现一些主要逻辑,有些方法会交由子类来实现
  • 队列:采取队列(FIFO,先进先出)这种数据结构存储数据
  • 同步:实现了多线程环境下的同步操作

那AQS有什么用呢?AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的同步器,比如我们提到的ReentrantLockSemaphoreReentrantReadWriteLockSynchronousQueueFutureTask等等皆是基于AQS的。

当然,我们自己也能利用AQS来定制符合我们自己需求的同步器,只要实现它的几个protected方法就可以了,在下文会有详细的介绍。

2、AQS的数据结构

AQS内部使用一个volatile关键字修饰的变量state来作为资源的标识符。

/*** The synchronization state.*/
private volatile int state;

同时定义了几个获取和设置state的原子方法:

/*** Returns the current value of synchronization state.* This operation has memory semantics of a {@code volatile} read.* @return current state value*/
protected final int getState() {return state;
}/*** Sets the value of synchronization state.* This operation has memory semantics of a {@code volatile} write.* @param newState the new state value*/
protected final void setState(int newState) {state = newState;
}/*** Atomically sets synchronization state to the given updated* value if the current state value equals the expected value.* This operation has memory semantics of a {@code volatile} read* and write.** @param expect the expected value* @param update the new value* @return {@code true} if successful. False return indicates that the actual*         value was not equal to the expected value.*/
protected final boolean compareAndSetState(int expect, int update) {return U.compareAndSetInt(this, STATE, expect, update);
}

这三种操作均是原子操作,其中compareAndSetState()方法是依赖于UnSafe类的compareAndSetInt()方法。

AQS内部使用了一个先进先出(FIFO)的双端队列,并使用两个指针headtail分别代表队列的头节点和尾节点。其数据结构如下图所示:

AQS的队列不直接存储线程,而是队列中每一个节点Node来具体存储线程。

AQS源码中关于节点Node类的描述:

3、AQS的Node节点

资源有两种共享模式,或者说两种同步方式:

  • 独占模式(Exclusive):资源是独占的,一次只能被一个线程访问
  • 共享模式(Share):资源是共享的,可以被多个线程同时访问。具体的资源个数可以通过参数指定,如CountDownLatchSemaphore

一般情况下,子类只需要根据需求实现其中一种模式,但也有两种模式都实现的同步类,比如ReadWriteLock.

AQS中关于这两种模式的源码全部都在Node这个内部类中,源码如下:

static final class Node {// 标记节点,不包含实际的线程信息,主要用做标识符来区分共享同步模式static final Node SHARED = new Node();// 标记节点,不包含实际的线程信息,主要用做标识符来区分独占同步模式static final Node EXCLUSIVE = null;// waitStatus的值,表示该节点(对应的线程)已经被取消static final int CANCELLED =  1;// waitStatus的值,表示后继结点(对应的线程)需要被唤醒static final int SIGNAL    = -1;// waitStatus的值,表示该节点(对应的线程)在等待某种条件static final int CONDITION = -2;/*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/static final int PROPAGATE = -3;// 等待状态,取值范围:-3、-2、-1、0、1volatile int waitStatus;volatile Node prev; // 前驱节点volatile Node next; // 后继结点volatile Thread thread; // 节点对应的线程Node nextWaiter; // 等待队列里下一个等待条件的节点// 判断共享模式的方法final boolean isShared() {return nextWaiter == SHARED;}// 获取后继节点的方法final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node(Thread thread, Node mode) {     // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}
}

注意:通过Node我们可以实现两个队列,一是通过prev和next指针实现的CLH队列(线程同步队列,双向队列),二是nextWaiter实现Condition条件上的等待队列(单向队列),这个Condition主要用在ReentrantLock类中。

4、AQS源码解析

AQS的设计是基于模板方法设计模式,一些方法不做具体实现,抛出异常,业务逻辑交由子类做具体实现。这些方法主要是:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

父类方法不做具体实现,直接抛出异常:

protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}

这里父类使用protected来修饰而不是抽象方法,这样做的目的是避免子类要把所有方法都重写一遍,增加了很多的工作量,子类只需要重写自己需要的方法。

而AQS实现了一系列的主要逻辑,下面AQS的源码剖析获取资源和释放资源的主要逻辑。

4.1、获取资源

acquire(int arg)方法是获取资源的入口,arg参数表示获取资源的个数,在独占模式下,arg始终为1。下面是这个方法的源码:

public final void acquire(int arg) {// 尝试获取资源,成功返回true,失败falseif (!tryAcquire(arg) &&// 走到这里,说明获取资源失败。调用addWaiter方法将当前线程加入到等待队列acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 中断当前线程selfInterrupt();
}

首先调用tryAcquire(arg)方法尝试获取资源,前面也提到了,这个方法的逻辑是交由子类来具体实现。如果获取资源失败,就通过addWaiter(Node.EXCLUSIVE), arg)方法将当前线程加入到等待队列中,采用独占模式。这个方法的具体实现如下:

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// 快速入队Node pred = tail;if (pred != null) {node.prev = pred;// CAS操作将当前节点设置为新的尾节点(可能会失败)if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 如果快速入队失败,再走完整入队enq(node);return node;
}private Node enq(final Node node) {// 这里通过自旋的方式,确保CAS操作一定正确完成for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}

addWaiter方法先尝试能否快速入队,如果失败了,再通过完整入队的方式,将当前线程加入到等待队列。这样做的目的是,保证在线程安全的情况下提高性能。

ok,上面方法介绍完了,让我们回到最初的acquire(int arg)方法,当获取资源失败,并且将当前线程添加到等待队列的队尾。然后我们来看看AQS最后要做的事情是什么呢?我们来看看最后一个方法acquireQueued(final Node node, int arg),源码如下:

// node节点是当前获取资源失败的节点
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {// 当前线程阻塞是否被中断boolean interrupted = false;// 自旋操作for (;;) {// 前驱节点final Node p = node.predecessor();// 前驱节点是头节点,说明当前节点就是等待队列中第一个等待节点,可以尝试获取资源if (p == head && tryAcquire(arg)) {// 如果成功,设置当前节点为新的头节点setHead(node);// 回收旧的头节点p.next = null; // help GC// 失败标记置为falsefailed = false;// 返回是否中断标记return interrupted;}// 如果当前节点不是首个等待节点,判断是否应该阻塞。if (shouldParkAfterFailedAcquire(p, node) &&// 且判断是否应该阻塞当前线程,如果需要阻塞,调用parkAndCheckInterrupt()方法进行阻塞// 线程唤醒后,继续下一次循环parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)// 如果失败,将当前线程的状态设置为 CANCELLED,等待GC回收cancelAcquire(node);}
}

AQS将获取资源失败的线程成功添加到等待队列后,反复尝试获取锁,如果获取不到就阻塞(挂起),直到获取锁成功或阻塞中断。

上述流程就是独占方式获取资源的全部执行流程了。

这里parkAndCheckInterrupt方法内部使用到了LockSupport.park(this),顺便简单介绍一下park。

LockSupport类是Java 6 引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:

  • park(boolean isAbsolute, long time):阻塞当前线程
  • unpark(Thread jthread):唤醒指定的线程

现在用一张流程图总结上述过程:

在这里插入图片描述

4.2、释放资源

释放资源的逻辑比较简单,源码如下:

// 释放资源的主入口
public final boolean release(int arg) {// 尝试释放资源,具体的逻辑由子类实现if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)// 唤醒后继节点unparkSuccessor(h);return true;}return false;
}private void unparkSuccessor(Node node) {// 头节点的状态如果小于0,尝试设置为0int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 后继节点Node s = node.next;// 如果后继节点不存在或者状态大于0(大于0表示线程已被取消),从尾部向前遍历找到队列中第一个待唤醒的节点if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 如果待唤醒的后继节点存在,唤醒该节点对应的线程。if (s != null)LockSupport.unpark(s.thread);
}

5、小结

AQS是一个用来构建锁和同步器的框架,使用AQS能够很方便的构造出我们需要定制化的同步器,而且我们耳熟能详的并发包组件ReentrantLockSemaphoreReentrantReadWriteLockSynchronousQueueFutureTask等等皆是基于AQS实现的。

下面是一个示例(互斥锁,同一时刻,只允许一个线程获取):

import java.util.concurrent.locks.AbstractQueuedSynchronizer;public class Mutex {// 自定义内部类实现AQSprivate static class Sync extends AbstractQueuedSynchronizer {// 获取资源(独占模式)@Overrideprotected boolean tryAcquire(int arg) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}// 释放资源(独占模式)@Overrideprotected boolean tryRelease(int arg) {if (getState() == 0) {throw new IllegalMonitorStateException();}setExclusiveOwnerThread(null);setState(0);return true;}// 如果当前线程以独占方式获取资源,返回true@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}}private final Sync sync = new Sync();// 加锁public void lock() {sync.acquire(1);}// 释放锁public void unlock() {sync.release(1);}// 资源是否被占有public boolean isLocked() {return sync.isHeldExclusively();}
}
http://www.lryc.cn/news/614601.html

相关文章:

  • ClickHouse集群部署实践---3分片2副本集群
  • 【C#】掌握并发利器:深入理解 .NET 中的 Task.WhenAll
  • 宝龙地产债务化解解决方案一:基于资产代币化与轻资产转型的战略重构
  • MMBFJ310LT1G一款N沟道JFE 晶体管适用于高频放大器和振荡器等射频应用MMBFJ310LT1
  • 【vue】Vue 重要基础知识清单
  • 全面解析软件工程形式化说明技术
  • Vue 服务端渲染(SSR)详解
  • 页面tkinter
  • 初始化完数据库提示缺少server文件的处理方法
  • C 语言链表数据结构
  • 接口为什么要设计出v1和v2
  • 升级的MS9122S USB投屏控制芯片(HD输出)
  • Prometheus 通过读取文件中的配置来监控目标
  • 安科瑞EMS3.0:打造“零碳工厂”的智能能源神经中枢
  • 【Spring Boot 快速入门】八、登录认证(一)基础登录与认证校验
  • 用 “故事 + 价值观” 快速建立 IP 信任感
  • Shell脚本实现自动封禁恶意扫描IP
  • 後端開發技術教學(三) 表單提交、數據處理
  • vscode EIDE 无法编译,提示 “文件名、目录名或卷标语法不正确;
  • WPF 表格中单元格使用下拉框显示枚举属性的一种方式
  • 数据大集网:重构企业贷获客生态的线上获客新范式​
  • Ignite内部事件总线揭秘
  • Android 之 OOM的产生和解决办法
  • K-Means 聚类
  • 嵌入式第二十三课 !!!树结构与排序(时间复杂度)
  • AD布线时,如何设置线宽和线间距?简单
  • OpenAI 时隔多年再开源!GPT-OSS 120B/20B 发布,支持本地部署,消费级 GPU 即可运行
  • 五十六、【Linux系统nginx服务】nginx虚拟主机实现
  • InfluxDB 权限管理与安全加固(一)
  • leetcode热题——有效的括号