当前位置: 首页 > news >正文

JCTools 无锁并发队列基础:ConcurrentCircularArrayQueue

ConcurrentCircularArrayQueue 

ConcurrentCircularArrayQueue 是一个抽象类,它为基于数组的并发循环队列提供了基础功能。从其命名可以看出几个关键特性:

  • ​Concurrent​​:常指无锁并发。
  • ​Circular Array​​:内部使用循环数组作为其数据结构。这意味着它是一个有界队列,容量是固定的。
  • ​Queue​​:遵循队列的先进先出(FIFO)原则。

这个类本身是抽象的,它定义了所有基于循环数组的并发队列的通用字段和方法,但将生产者和消费者的索引管理等具体并发策略留给子类实现(例如 SpscArrayQueueMpscArrayQueue 等)。

ConcurrentCircularArrayQueue.java 中,我们可以看到两个核心字段:

// ... existing code ...
abstract class 
ConcurrentCircularArrayQueue<E> extends 
ConcurrentCircularArrayQueueL0Pad<E>implements MessagePassingQueue<E>, IndexedQueue, QueueProgressIndicators, SupportsIterator
{protected final long mask;protected final E[] buffer;ConcurrentCircularArrayQueue(int capacity){// ... existing code ...
  • protected final E[] buffer;
    这是存储队列元素的数组。它被声明为 final,一旦初始化后,其引用不可更改。数组的长度总是2的幂,这是为了通过位运算(&)来高效地计算索引,从而实现循环数组。

  • protected final long mask;
    掩码,它的值是 capacity - 1。通过 index & mask 操作,可以将一个不断增长的索引值(如生产者或消费者索引)快速映射到 buffer 数组的有效范围内,这比取模运算 (%) 效率高得多。

构造函数 ConcurrentCircularArrayQueue(int capacity) 会将用户请求的 capacity 向上取整到最接近的2的幂次方,以满足循环数组的设计要求。

此外,ConcurrentCircularArrayQueueL0Pad 这个父类通过填充缓存行(Cache Line Padding)来防止伪共享(False Sharing),这是并发编程中一个重要的性能优化手段。

ConcurrentCircularArrayQueue 实现了一系列接口,这些接口定义了它的核心行为和能力。

MessagePassingQueue<E> 

接口定义了队列作为消息传递工具的核心 API。它扩展了标准的 java.util.Queue,并增加了一些针对高性能并发场景的方法,例如:

  • relaxedOffer(E e):一个宽松的入队操作,可能不会立即让其他线程看到新元素,但性能更高。
  • relaxedPoll():一个宽松的出队操作。
  • drain(Consumer<E> c, int limit):从队列中批量取出元素并由指定的消费者处理。

这个接口表明 ConcurrentCircularArrayQueue 不仅仅是一个普通的数据集合,更是一个为高性能线程间通信设计的工具。

IndexedQueue

这个接口提供了访问队列内部索引的能力,主要用于计算队列大小和容量。

  • lvProducerIndex():获取生产者索引的最新值(volatile read)。
  • lvConsumerIndex():获取消费者索引的最新值(volatile read)。
  • capacity():返回队列的容量。

通过这两个索引,IndexedQueueSizeUtil.size(this) 可以计算出当前队列中的元素数量。这种设计将索引的访问方式标准化,使得大小计算逻辑可以被重用。

QueueProgressIndicators

这个接口提供了对队列生产者和消费者进度的可见性,主要用于监控和调试。

  • currentProducerIndex():返回当前生产者索引。
  • currentConsumerIndex():返回当前消费者索引。

这与 IndexedQueue 中的方法类似,但语义上更侧重于“进度”而非“索引”,通常用于外部观察者了解队列的活动状态。

SupportsIterator

这个接口表明该队列支持迭代器。ConcurrentCircularArrayQueue 提供了一个内部实现的 WeakIterator

// ... existing code ...@Overridepublic Iterator<E> iterator() {final long cIndex = lvConsumerIndex();final long pIndex = lvProducerIndex();return new WeakIterator(cIndex, pIndex, mask, buffer);}private static class WeakIterator<E> implements Iterator<E> {// ... existing code ...

这个迭代器是“弱一致性”的:

  • 它提供了队列在创建迭代器那一刻的一个“快照”。
  • 它不保证返回的元素严格按照队列顺序,并且可能会遗漏在迭代期间入队或出队的元素。
  • 它不会抛出 ConcurrentModificationException,是线程安全的。

这种设计是为了在不加锁的情况下提供一个尽力而为的迭代功能,适用于调试或监控等不要求强一致性的场景。

内部类 WeakIterator

private static class WeakIterator<E> implements Iterator<E>

首先,从类的声明来看:

  • private: 这是一个私有内部类,意味着它只能被外部类 ConcurrentCircularArrayQueue 访问。这是良好的封装,表明它的实现细节与外部类紧密相关,不希望被外部直接使用。
  • static: 这是一个静态内部类。静态内部类不持有外部类实例的隐式引用。这意味着 WeakIterator 对象本身是一个独立的对象,不会意外地阻止 ConcurrentCircularArrayQueue 实例被垃圾回收。它需要的所有信息(如数组缓冲区、索引等)都必须通过构造函数显式传入,这使得它的状态更加清晰和独立。
  • implements Iterator<E>: 它实现了标准的 java.util.Iterator 接口,提供了 hasNext()next() 和 remove() 方法,可以用于标准的 for-each 循环中。

核心定位:“弱一致性” (Weakly Consistent) 迭代器

这个迭代器最核心的特性是“弱一致性”或“尽力而为”(best-effort)。在类的注释中也明确提到了这一点:

The iterator provides a best-effort snapshot of the elements in the queue. The returned iterator is not guaranteed to return elements in queue order, and races with the consumer thread may cause gaps in the sequence of returned elements.

译:迭代器提供了队列中元素的一个尽力而为的快照。返回的迭代器不保证按队列顺序返回元素,并且与消费者线程的竞争可能会导致返回的元素序列中出现间隙。

这意味着:

  1. 快照性 (Snapshot): 迭代器在创建时会“拍下”当时队列的消费者索引 (cIndex) 和生产者索引 (pIndex)。它只会尝试遍历这个索引范围内的元素,不会看到在它创建之后入队的任何新元素。
  2. 非阻塞/无锁 (Non-Blocking/Lock-Free): 迭代器在遍历时不会对队列加锁。生产者和消费者线程可以完全并发地对队列进行操作,这保证了高吞吐量。
  3. 可能存在间隙 (Gaps): 由于无锁,当迭代器正在遍历时,消费者线程可能已经取走(消费)了某个元素。当迭代器访问到那个位置时,会发现是 null,于是它会跳过这个元素,继续向后寻找。这就造成了迭代结果中可能出现“间隙”。

成员变量

// ... existing code ...private static class WeakIterator<E> implements Iterator<E> {private final long pIndex;private final long mask;private final E[] buffer;private long nextIndex;private E nextElement;
// ... existing code ...
  • private final long pIndex;: 迭代器创建时刻的生产者索引。final 关键字确保它在迭代器生命周期内不会改变,定义了遍历的上限
  • private final long mask;: 用于将长整型的索引(会无限增长)映射到数组的实际下标,计算方式通常是 offset = index & mask
  • private final E[] buffer;: 对队列底层数组的引用。
  • private long nextIndex;: 迭代器当前检查的索引位置。它从 cIndex 开始,逐步增加到 pIndex
  • private E nextElement;预取/前瞻的下一个元素。这是一种常见的迭代器实现模式,hasNext() 只需检查这个字段是否为 null,而 next() 返回这个字段并再次预取下一个,简化了逻辑。

构造函数

// ... existing code ...WeakIterator(long cIndex, long pIndex, long mask, E[] buffer) {this.nextIndex = cIndex;this.pIndex = pIndex;this.mask = mask;this.buffer = buffer;nextElement = getNext();}
// ... existing code ...
  • 构造函数接收创建迭代器那一刻的消费者索引 (cIndex)、生产者索引 (pIndex)、掩码和缓冲区。
  • 它将 cIndex 初始化为 nextIndex,将 pIndex 存入 final 字段。
  • 最关键的一步是 nextElement = getNext();。在构造函数里,它就立即调用 getNext() 尝试获取第一个有效的元素。这使得第一次调用 hasNext() 时就能立刻知道结果,而无需做任何计算。

hasNext() 和 next()

// ... existing code ...@Overridepublic boolean hasNext() {return nextElement != null;}@Overridepublic E next() {final E e = nextElement;if (e == null)throw new NoSuchElementException();nextElement = getNext();return e;}
// ... existing code ...
  • hasNext(): 逻辑非常简单,直接判断预取的 nextElement 是否为空。
  • next():
    1. 保存当前的 nextElement
    2. 如果它为 null(意味着 hasNext() 返回了 false),则抛出 NoSuchElementException,符合 Iterator 接口规范。
    3. 调用 getNext() 去预取下一个元素,为下一次调用 hasNext() 或 next() 做准备。
    4. 返回之前保存的元素。

getNext() - 迭代器的引擎

// ... existing code ...private E getNext() {while (nextIndex < pIndex) {long offset = calcCircularRefElementOffset(nextIndex++, mask);E e = lvRefElement(buffer, offset);if (e != null) {return e;}}return null;}
// ... existing code ...

这是迭代器最核心的逻辑所在。

  1. while (nextIndex < pIndex): 循环的条件保证了迭代器只在创建时确定的索引范围内查找。
  2. nextIndex++: 在计算偏移量后,nextIndex 会自增,为下一次循环做准备。
  3. E e = lvRefElement(buffer, offset);: 这是关键的并发操作。lvRefElement 是 Unsafe 类操作的封装,代表 Load Volatile,即以 volatile 内存语义读取数组中的元素。这确保了生产者线程对元素的写入对迭代器线程是可见的,避免了读到旧的、未初始化的值。
  4. if (e != null): 如果读取到的元素不为 null,说明找到了一个有效元素,立即返回它。
  5. 如果 e 是 null,循环会继续。这正是处理“间隙”的地方:迭代器看到了一个空槽,它会认为这个元素已经被消费者线程取走,于是跳过它,继续寻找下一个。
  6. return null;: 如果 while 循环结束(即 nextIndex 到达了 pIndex),仍然没有找到非空元素,就返回 null,表示迭代结束。

remove()

// ... existing code ...@Overridepublic void remove() {throw new UnsupportedOperationException("remove");}
// ... existing code ...
  • 直接抛出 UnsupportedOperationException。在无锁的并发数据结构上安全地实现 remove() 操作非常复杂,且容易引入竞争条件。因此,JCTools 和很多其他并发库(如 ConcurrentLinkedQueue 的迭代器)一样,选择不支持此操作。在 RELEASE-NOTES.md 中也提到了早期版本不支持 iterator() 方法,后来虽然支持了,但 remove() 依然是被禁止的,这是一种安全且常见的设计决策。

总结

WeakIterator 是一个精心设计的、用于并发环境的轻量级迭代器。它的核心特点可以概括为:

  • 线程安全与高性能: 通过无锁设计和 volatile 读,实现了不阻塞生产者/消费者的线程安全遍历。
  • 弱一致性快照: 它提供的是一个“尽力而为”的快照,不保证反映队列的精确状态或元素顺序,但足以用于调试、监控或某些特定场景下的批量读取。
  • 实现简洁优雅: 采用“预取”(lookahead)模式,使得 hasNext() 和 next() 的逻辑非常清晰简单,将复杂性集中在 getNext() 方法中。
  • 内存友好: 作为静态内部类,它不持有外部类的引用,避免了潜在的内存泄漏问题。

总而言之,WeakIterator 是在追求极致性能的并发队列中,为“遍历”这一功能所做出的一个典型权衡:牺牲了强一致性,换取了高并发性和无锁的性能优势。


核心方法实现

  • size()isEmpty()
    这两个方法都委托给 IndexedQueueSizeUtil 工具类,通过比较生产者和消费者索引来计算,避免了在并发环境下维护一个单独的 size 变量所带来的开销和竞争。

  • clear()
    通过不断调用 poll() 方法直到返回 null 来清空队列。这是一个简单但有效的方法。


总结

ConcurrentCircularArrayQueue 是 JCTools 中各种有界并发数组队列的基石。它通过以下设计实现了高性能和线程安全:

  1. ​循环数组与2的幂容量​​:使用位运算替代取模运算,提升索引计算效率。
  2. ​缓存行填充​​:避免伪共享,减少多核 CPU 环境下的缓存争用。
  3. ​分离索引管理​​:将生产者和消费者的索引管理逻辑分离到子类中,以支持不同的并发模型(SPSC、MPSC、SPMC、MPMC)。
  4. ​弱一致性迭代器​​:提供一个轻量级、无锁的迭代器,适用于监控和调试。
  5. ​标准化的接口​​:通过实现 MessagePassingQueueIndexedQueue 等接口,提供了丰富且明确的 API,使其不仅仅是一个集合,更是一个强大的消息传递工具。

ConcurrentSequencedCircularArrayQueue

这是一个在 JCTools 中非常核心的抽象基类,理解它对于理解 MPMC(多生产者多消费者)等高级队列的实现至关重要。

public abstract class ConcurrentSequencedCircularArrayQueue<E> extends ConcurrentCircularArrayQueue<E>
{
//...
}
  • extends ConcurrentCircularArrayQueue<E>: 它继承自我们之前分析过的 ConcurrentCircularArrayQueue<E>。这意味着它天然就拥有了父类的所有特性,包括:
    • 一个用于存储元素的环形数组 protected final E[] buffer;
    • 一个用于计算数组索引的掩码 protected final long mask;
    • 基本的队列属性和方法,如容量 capacity()、清空 clear() 等。

sequenceBuffer

这个类最核心的扩展是增加了一个新的成员变量:

// ... existing code ...
public abstract class ConcurrentSequencedCircularArrayQueue<E> extends ConcurrentCircularArrayQueue<E>
{protected final long[] sequenceBuffer;public ConcurrentSequencedCircularArrayQueue(int capacity)
// ... existing code ...
  • protected final long[] sequenceBuffer;: 这是一个 long 类型的数组,名为“序列缓冲区”。它的长度与存储元素的 buffer 数组相同。这个数组是实现高级并发控制算法的关键。

sequenceBuffer 为队列中的每一个槽(slot)都提供了一个对应的“序列号”或“票据”。生产者和消费者通过检查和更新这个序列号来协调对共享数据(即 buffer 数组中的元素)的访问,从而避免使用锁。

构造函数与初始化

构造函数的实现揭示了 sequenceBuffer 的工作原理。

// ... existing code ...public ConcurrentSequencedCircularArrayQueue(int capacity){super(capacity);int actualCapacity = (int) (this.mask + 1);// pad data on either end with some empty slots. Note that actualCapacity is <= MAX_POW2_INTsequenceBuffer = allocateLongArray(actualCapacity);for (long i = 0; i < actualCapacity; i++){soLongElement(sequenceBuffer, calcCircularLongElementOffset(i, mask), i);}}
}
  1. super(capacity);: 调用父类构造函数,初始化 buffer 数组和 mask
  2. sequenceBuffer = allocateLongArray(actualCapacity);: 分配 sequenceBuffer 数组。allocateLongArray 只是简单分配一个数组。
  3. for 循环初始化: 这是最关键的部分。循环遍历 sequenceBuffer 的每一个槽位,并执行 soLongElement(..., i)
    • calcCircularLongElementOffset(i, mask): 计算第 i 个槽在内存中的偏移量。
    • soLongElement(..., i)so 是 Store Ordered 的缩写,对应于 Unsafe.putOrderedLong。这是一个有内存屏障效果的写操作,它比普通的写要强,但比 volatile 写要弱。它保证了在此操作之前的内存写入不会被重排序到它之后。
    • 初始化逻辑sequenceBuffer 的第 i 个槽被初始化为值 i。即 sequenceBuffer[i] = i

 “序列号”并发控制机制

通过这个初始化,ConcurrentSequencedCircularArrayQueue 为子类(如 MpmcArrayQueue)奠定了并发算法的基础。这个算法大致如下:

  • 初始状态: 队列为空。buffer 数组里都是 nullsequenceBuffer 的第 i 个槽位的值是 i
  • 生产者入队 (offer):
    1. 一个生产者想要在逻辑索引 p 处放入一个元素。
    2. 它首先计算出该索引对应的数组位置 offset = p & mask
    3. 它会去检查 sequenceBuffer[offset] 的值。
    4. 如果 sequenceBuffer[offset] 的值正好等于 p,这就像一张“票据”,表明这个槽位是空闲的,并且轮到索引为 p 的这次操作来使用它。
    5. 生产者获得许可,将元素写入 buffer[offset]
    6. 写入元素后,生产者会更新 sequenceBuffer[offset] 的值为 p + 1。这个新值 p + 1 成为了给消费者的信号,表示“索引为 p 的槽位已经准备好了,你可以来消费了”。
  • 消费者出队 (poll):
    1. 一个消费者想要消费逻辑索引 c 处的元素。
    2. 它计算出数组位置 offset = c & mask
    3. 它去检查 sequenceBuffer[offset] 的值。
    4. 如果 sequenceBuffer[offset] 的值等于 c + 1,这就表明生产者已经完成了在 c 位置的写入,数据可供消费。
    5. 消费者获得许可,从 buffer[offset] 读取元素。
    6. 读取之后,消费者会更新 sequenceBuffer[offset] 的值为 c + capacity (即 c + mask + 1)。这个值将会是下一轮生产者在同一个物理槽位 offset 上期望看到的序列号,从而完成一个循环。

总结

ConcurrentSequencedCircularArrayQueue 是一个巧妙的抽象,它在 ConcurrentCircularArrayQueue 的基础上,通过增加一个并行的 sequenceBuffer,为无锁(Lock-Free)并发队列算法提供了核心机制。

  • 关注点分离: 它将数据存储 (buffer) 和并发控制 (sequenceBuffer) 分离开来。
  • 奠定基础: 它不实现具体的 offer 和 poll 逻辑,而是将 sequenceBuffer 初始化好,交由 MpmcArrayQueueMpscSequencedArrayQueue 等子类去实现具体的、基于序列号检查的入队和出队操作。
  • 高性能设计: 使用 Unsafe 和 putOrderedLong 等底层技术,旨在最大化吞吐量和最小化延迟。

因此,这个类是 JCTools 中实现高性能多线程队列(尤其是多生产者场景)的关键基石。

sequenceBuffer:每个槽位的“状态指示器”

可以把 sequenceBuffer 想象成一个与主数据数组 buffer 并行存在的、专门用来标记状态的记分板。buffer 里的每个槽位(slot),都在 sequenceBuffer 的相同位置有一个对应的 long 类型数字,我们称之为 seq

这个 seq 的值不是随便写的,它遵循一个严格的协议,用来表示其对应槽位的状态。对于一个位于 i 位置的槽位,它的 seq 值有以下几种含义:

  • seq == i: 槽位 i 是空的,可以被生产者用来存放一个生产者序号(producerIndex)为 i 的元素。
  • seq == i + 1: 槽位 i 已被写入数据,可以被消费者消费。生产者在放入元素后,会将 seq 更新为 i + 1,作为“已发布”的信号。
  • seq == i + capacity: 槽位 i 已被消费,并且消费者已经将其标记为“可回收”。消费者取出元素后,会将 seq 更新为 i + capacity。这个 + capacity 的操作,相当于把这个槽位的“版本号”推进了一轮,为下一圈的生产者做好了准备。

举个例子: 假设队列容量 capacity 是 8。

  • 生产者想在 producerIndex = 0 的位置放东西。它会检查 sequenceBuffer[0] 的值是不是 0
  • 如果是,它就把元素放入 buffer[0],然后把 sequenceBuffer[0] 的值更新为 1
  • 消费者想从 consumerIndex = 0 的位置取东西。它会检查 sequenceBuffer[0] 的值是不是 0 + 1 = 1
  • 如果是,它就从 buffer[0] 取出元素,然后把 sequenceBuffer[0] 的值更新为 0 + 8 = 8
  • 现在,当生产者跑完一整圈,轮到 producerIndex = 8 时,它会检查 sequenceBuffer[0] (因为 8 % 8 == 0) 的值是不是 8。正好是消费者更新后的值!于是生产者可以安全地重用这个槽位。

这个 seq 就是生产者和消费者之间不见面就能沟通的“信物”。

http://www.lryc.cn/news/623312.html

相关文章:

  • 深入解析C++ STL链表(List)模拟实现
  • 如何得知是Counter.razor通过HTTP回调处理的还是WASM处理的,怎么检测?
  • 基于Python的电影评论数据分析系统 Python+Django+Vue.js
  • qt vs2019编译QXlsx
  • Qt QDateTime时间部分显示为全0,QTime赋值后显示无效问题【已解决】
  • ML307C 4G通信板:工业级DTU固件,多协议支持,智能配置管理
  • 随机整数列表处理:偶数索引降序排序
  • 数据库索引视角:对比二叉树到红黑树再到B树
  • 《探索IndexedDB实现浏览器端UTXO模型的前沿技术》
  • 使用影刀RPA实现快递信息抓取
  • C++ 最短路Dijkstra
  • 9.从零开始写LINUX内核——设置中断描述符表
  • Python 类元编程(元类的特殊方法 __prepare__)
  • Flink Stream API 源码走读 - 总结
  • 楼宇自控系统赋能建筑全维度管理,实现环境、安全与能耗全面监管
  • STM32硬件SPI配置为全双工模式下不要单独使用HAL_SPI_Transmit API及HAL_SPI_TransmitReceive改造方法
  • 【时时三省】(C语言基础)共用体类型数据的特点
  • Langfuse2.60.3:独立数据库+docker部署及环境变量详细说明
  • Java 中重载与重写的全面解析(更新版)
  • Mybatis-3自己实现MyBatis底层机制
  • 从冒泡到快速排序:探索经典排序算法的奥秘(二)
  • PHP反序列化的CTF题目环境和做题复现第1集
  • 企业运维规划及Linux介绍虚拟环境搭建
  • python---包
  • 一文速通Python并行计算:14 Python异步编程-协程的管理和调度
  • CF每日3题(1500-1700)
  • P2169 正则表达式
  • w嵌入式分享合集66
  • 【Bluedroid】A2DP控制通道UIPC机制深度解析(btif_a2dp_control_init)
  • Java8~Java21重要新特性