当前位置: 首页 > news >正文

【JAVA多线程】AQS,JAVA并发包的核心

目录

1.概述

1.1.什么是AQS

1.2.AQS和BlockQueue的区别

1.3.AQS的结构

2.源码分析

2.1.CLH队列

2.2.模板方法的实现

2.2.1.独占模式

1.获取资源

2.释放资源

2.2.2.共享模式


1.概述

1.1.什么是AQS

AQS非常非常重要,可以说是JAVA并发包(java.util.concurrent)的核心。

以下是一个常见的多线程场景:

当多个线程对同一个资源进行争抢时,没有抢到的线程怎么办?直接拒绝然后丢弃?最可行最合适的方式当然是让没有抢到资源的线程排队阻塞起来,这样即让出了CPU的资源,又方便当资源被释放、可争抢的时候唤醒线程来继续争抢。

我们发现一个信号量+一个线程安全的用来存放线程的队列,是并发编程体系的一个底座,这个底座有一个名字叫——同步器。

AQS(AbstractQueuedSynchronizer),一个JDK中提供的同步器,提供一系列线程的控制和同步机制,是JDK中诸多需要进行线程控制和同步的地方,诸如可重入锁ReentrantLock,以及诸多同步工具Semaphore、CountDownLatch、CyclicBarrier等,其底层都是使用的AQS来实现线程控制和同步。

1.2.AQS和BlockQueue的区别

阻塞队列(Blocking Queue)是一种特殊的队列,它的特殊之处在于它具有阻塞特性。当队列为空时,从队列中获取元素的线程会被阻塞,直到队列中有新的元素被加入;同样地,当队列满时,试图向队列中添加元素的线程也会被阻塞,直到队列中有空余的空间。这种特性使得阻塞队列非常适用于生产者-消费者模式中。

阻塞队列:

  • 元素的读写是线程安全的

  • 里面存的任务

  • 阻塞的是外部的生产者线程/消费者线程

AQS,同步器,是用来进行线程控制和同步的一个基础组件,用来让现场排队、阻塞、唤醒等操作。

AQS:

  • 元素的读写是安全的

  • 里面存的是线程

  • 阻塞的是内部存的线程

1.3.AQS的结构

AQS提供了什么?

一套API和一套数据结构。

可以看到AQS是一个抽象类:

既然是抽象类而不是直接是个接口就说明,其提供的不只是一个标准,而是一套资源。其底层用链表来组织其管理的线程,包含:

  • 链表的节点Node

  • 链表的头尾指针

  • 状态state,这是个volatile变量用于表示同步状态,在不同的实现类中,它的含义不同,有可能表示锁是否被持有、持有者持有锁的次数、信号量可用的许可数量等。

  • 条件变量ConditionObject

AQS提供了一套标准的API,来完成两方便的操作:

  • 状态控制

  • 资源争抢

状态控制:

  • getState():返回同步状态

  • setState(int newState):设置同步状态

  • compareAndSetState(int expect, int update):使用CAS设置同步状态

  • isHeldExclusively():当前线程是否持有资源 独占资源(不响应线程中断)

资源争抢和释放一共两种,共享模式、独占模式:

  • tryAcquire(int arg):独占式获取资源,子类实现

  • acquire(int arg):独占式获取资源模

  • tryRelease(int arg):独占式释放资源,子类实现

  • release(int arg):独占式释放资源模板 共享资源(不响应线程中断)

  • tryAcquireShared(int arg):共享式获取资源,返回值大于等于0则表示获取成功,否则获取失败,子类实现

  • acquireShared(int arg):共享式获取资源模板

  • tryReleaseShared(int arg):共享式释放资源,子类实现

  • releaseShared(int arg):共享式释放资源模板

独占模式出现在对一把锁进行争抢、释放的场景中,如ReentrantLock中;共享模式出现在对多个资源的争抢、释放的场景中,如ReadWriteLock、CountDownLatch中。

2.源码分析

2.1.CLH队列

在AQS的源码开头,就描述了其是使用CLH队列来组织没争抢到资源的线程的:

该队列是用链表实现的,没被争抢到资源的线程会被封装成Node,被挂入链表中。node的入队和出队都用的CAS(自旋锁)来保证效率和安全的兼得。

这个Node类源码很简单,主要是要注意Node是有状态的,用状态来控制该节点该不该被唤醒,用waitStatus来表示状态:

Node 类中的静态常量定义了以下状态:

  • SIGNAL (-1):表示当前节点的后续节点需要被唤醒。这是默认状态,当一个节点被添加到队列中时,它的 waitStatus 会被设置为 -1。

  • PROPAGATE (-3):类似于 SIGNAL,但在某些情况下,如释放共享模式下的锁时,可能会设置为 -3,以帮助传播唤醒信

  • CONDITION (-2):表示节点位于条件队列中,而不是同步队列中。这通常与 Condition 对象相关联。

  • 0:表示没有特别的状态。当节点被初始化时,它的 waitStatus 默认为 0。

  • CANCELLED (1):表示节点已经被取消。这通常发生在线程被中断或者尝试获取锁失败时。一旦节点被标记为 CANCELLED,它就会从队列中移除。

waitStatus 的使用:

  • 线程阻塞:当一个线程被插入到队列中时,它的 waitStatus 通常会被设置为 SIGNAL。这意味着当前节点的后续节点应该被唤醒,当当前节点释放时。

  • 线程唤醒:当一个线程释放锁或其他同步状态时,它会检查自己的 waitStatus,并根据情况唤醒后继节点。例如,在独占模式下,当线程释放锁时,它会检查其后继节点的 waitStatus,如果是 SIGNAL,则会唤醒该节点对应的线程。

  • 条件队列:当线程等待某个条件满足时,它会被移到条件队列中,并且其 waitStatus 会被设置为 CONDITION。当条件满足时,线程会被放回同步队列,并且 waitStatus 会被设置为 SIGNAL。

  • 取消节点:如果线程被中断或超时,或者由于其他原因无法继续等待,那么该节点会被标记为 CANCELLED,并且从队列中移除。

整个CLH队列可以理解为:

2.2.模板方法的实现

2.2.1.独占模式

1.获取资源

AQS的实现类很多,此处我们以ReentrantLock的FairSync(公平锁)为例。

lock()其实是去,调用AQS的acquire():

AQS的acquire()会去调用实现类的tryAcquire()来判断当前线程是否能抢到线程,如果抢不到就入队CLH,并且阻塞:

至此我们就能体会到了,AQS实现了核心的阻塞+入队以及唤醒+出队的一些列对CLH中线程的操作,但具体的争抢策略作为模板开放出去了,也就是上文我们说的一系列模板方法。

我们来看看FairSync的tryAcquire()实现:

protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();//获取锁的状态if (c == 0) {//如果锁没有被持有//就去判断CLH队列中还有没有线程,没有的话就去CAS获取锁if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//如果当前线程是锁的持有者就去增加锁的持有次数else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
2.释放资源

释放资源应该做些什么?这里我们想一下就能想出来:

  • 首先释放锁

  • 然后唤醒后面的线程。

释放锁:

可以看到ReentrantLock的unlock调用的AQS的release()去释放锁,AQS的release里会先去tryRelease()尝试释放锁,tryRelease()是交给子类重写的:

所以调用的就是reentrantLock的释放策略:

ReentrantLock的尝试释放的过程很简单,就是线程安全的去减少锁的持有次数+清除锁的持有者的信息:

protected final boolean tryRelease(int releases) {int c = getState() - releases;//检查当前线程是否是锁的所有者,确保只有锁的持有者能释放锁if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;//清除锁的持有者setExclusiveOwnerThread(null);}//刷新状态setState(c);return free;}

唤醒后面的线程:

继续回到AQS的release,继续往下看,tryRelease()后确定可以释放锁后,unparkSuccessor()唤醒CLH队列中需要被唤醒的第一个线程。为什么说是第一个需要被唤醒的喃?前文已经说过了Node节点有状态,状态用来表示该节点是否需要被唤醒,有些状态是表示节点不需要被唤醒的。

public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);//唤醒后面的需要被唤醒的线程return true;}return false;}

unparkSuccessor()很简单,修改当前节点状态,唤醒后面需要唤醒的状态:

private void unparkSuccessor(Node node) {//将当前的节点从待唤醒状态变为已经唤醒int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);
​//找到后续第一个需要被唤醒的节点,然后唤醒它Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

2.2.2.共享模式

之所以有共享模式,是因为有时候资源不一定是唯一的,比如除了锁之外,可能要求能有10条线程同时工作,又或者读写之间不互斥等需求。在这种允许多条线程同时工作的情景中就要用到共享模式,标志位state不再表示一个意思(是否被持有),而是用来表示多个意思,有可能是线程的数量,也有可能是高低位分别表示是否允许读或者写。

共享模式下,资源的获取是通过acquireShared和tryAcquireShared来实现的,释放是通过releaseShared和tryReleaseShared来实现的。具体代码实现的话其实和独占模式大差不差,只是在对标志位state的操作上略有不同,以及可能会有一些位操作。以下是一些常见的使用共享模式的场景:

  • 读写锁 (ReadWriteLock): 读写锁是一种常用的同步工具,它允许多个线程同时读取共享资源,但在写入时需要独占资源。 读锁通常使用共享模式实现,因为多个读线程可以同时访问资源。 写锁则使用独占模式,因为写入操作通常需要独占资源以避免数据不一致。

  • 信号量 (Semaphore): 信号量用于限制可以同时访问共享资源的线程数量。 它可以通过减少或增加一个计数器来管理资源的可用性,这个计数器通常就是 AQS 的 state 字段。 当线程尝试获取信号量时,它会减少 state 的值;当线程释放信号量时,它会增加 state 的值。 因为多个线程可以同时获取信号量,所以信号量通常使用共享模式。

  • 循环屏障 (CyclicBarrier): 循环屏障允许一组线程相互等待,直到所有线程都到达了一个共同的屏障点。 当最后一个线程到达屏障点时,所有线程都会被释放继续执行。 循环屏障通常使用共享模式来管理线程的等待和释放。

  • CountDownLatch: CountDownLatch 是一种同步辅助类,它允许一个或多个线程等待其他线程完成操作。 它维护了一个计数器,每当一个操作完成时,计数器就会减少。 当计数器达到零时,所有等待的线程都会被释放。 CountDownLatch 通常使用共享模式来管理计数器的减少。

  • Exchanger: Exchanger 允许两个线程交换对象。 两个线程各自提供一个对象并等待另一个线程到达交换点。 一旦两个线程都到达交换点,它们就可以交换对象并继续执行。 Exchanger 通常使用共享模式来管理线程的交换过程。 示例代码

由于这是一个系列文章,后面几篇文章我们就将聊到CountDownLatch、Semaphore、CyclicBarrier之类的同步工具类、ReadWriteLock之类的读写锁,到时候分析源码的时候,再来看看各自的具体实现。

http://www.lryc.cn/news/417125.html

相关文章:

  • springcloud loadbalancer nacos无损发布
  • React原理
  • React-Native优质开源项目
  • Ajax-02
  • 供应商较多的汽车制造业如何选择供应商协同平台?
  • 【开端】JAVA Mono<Void>向前端返回没有登陆或登录超时 暂无权限访问信息组装
  • Python(模块---pandas+matplotlib+pyecharts)
  • 解决使用Navicat连接数据库时,打开数据库表很慢的问题
  • nginx重启报错nginx: [error] invalid PID number
  • 人工智能深度学习系列—深度学习中的相似性追求:Triplet Loss 全解析
  • 26. Hibernate 如何自动生成 SQL 语句
  • 预言机(Oracle machine)
  • 55、PHP实现插入排序、二分查找
  • [Git][分支设计规范]详细讲解
  • c#中winfrom需要了解的
  • 操作系统03:调度算法和文件系统
  • 大量中国高清地图,必须收藏!!
  • 无线领夹麦克风哪个品牌好,2024年收音麦哪个品牌好一点
  • 如何解决.NET8 类库Debug时,Debug文件夹中不包含Packages中引入的文件
  • 域名安全详解
  • 使用gstreamer命令行解析RTSP流
  • 如何基于离线包中“事件热点”进行二次开发
  • 使用继电器实现门电路(1)常用门电路的简化实现
  • 程序员常用单词分类
  • c语言11天笔记
  • 【C++刷题】优选算法——贪心第三辑
  • 9.2 grafana 上导入模板看图并讲解告警
  • python实现自动回复消息
  • Mysql 脚本转换为drawio ER 脚本
  • 基于babylonjs的小游戏 跳一跳