JCTools 无锁并发队列基础:ConcurrentCircularArrayQueue
ConcurrentCircularArrayQueue
ConcurrentCircularArrayQueue
是一个抽象类,它为基于数组的并发循环队列提供了基础功能。从其命名可以看出几个关键特性:
- Concurrent:常指无锁并发。
- Circular Array:内部使用循环数组作为其数据结构。这意味着它是一个有界队列,容量是固定的。
- Queue:遵循队列的先进先出(FIFO)原则。
这个类本身是抽象的,它定义了所有基于循环数组的并发队列的通用字段和方法,但将生产者和消费者的索引管理等具体并发策略留给子类实现(例如 SpscArrayQueue
、MpscArrayQueue
等)。
在 ConcurrentCircularArrayQueue.java
中,我们可以看到两个核心字段:
// ... existing code ...
abstract class
ConcurrentCircularArrayQueue<E> extends
ConcurrentCircularArrayQueueL0Pad<E>implements MessagePassingQueue<E>, IndexedQueue, QueueProgressIndicators, SupportsIterator
{protected final long mask;protected final E[] buffer;ConcurrentCircularArrayQueue(int capacity){// ... existing code ...
protected final E[] buffer;
这是存储队列元素的数组。它被声明为final
,一旦初始化后,其引用不可更改。数组的长度总是2的幂,这是为了通过位运算(&
)来高效地计算索引,从而实现循环数组。
protected final long mask;
掩码,它的值是capacity - 1
。通过index & mask
操作,可以将一个不断增长的索引值(如生产者或消费者索引)快速映射到buffer
数组的有效范围内,这比取模运算 (%
) 效率高得多。
构造函数 ConcurrentCircularArrayQueue(int capacity)
会将用户请求的 capacity
向上取整到最接近的2的幂次方,以满足循环数组的设计要求。
此外,ConcurrentCircularArrayQueueL0Pad
这个父类通过填充缓存行(Cache Line Padding)来防止伪共享(False Sharing),这是并发编程中一个重要的性能优化手段。
ConcurrentCircularArrayQueue
实现了一系列接口,这些接口定义了它的核心行为和能力。
MessagePassingQueue<E>
接口定义了队列作为消息传递工具的核心 API。它扩展了标准的 java.util.Queue
,并增加了一些针对高性能并发场景的方法,例如:
relaxedOffer(E e)
:一个宽松的入队操作,可能不会立即让其他线程看到新元素,但性能更高。relaxedPoll()
:一个宽松的出队操作。drain(Consumer<E> c, int limit)
:从队列中批量取出元素并由指定的消费者处理。
这个接口表明 ConcurrentCircularArrayQueue
不仅仅是一个普通的数据集合,更是一个为高性能线程间通信设计的工具。
IndexedQueue
这个接口提供了访问队列内部索引的能力,主要用于计算队列大小和容量。
lvProducerIndex()
:获取生产者索引的最新值(volatile read)。lvConsumerIndex()
:获取消费者索引的最新值(volatile read)。capacity()
:返回队列的容量。
通过这两个索引,IndexedQueueSizeUtil.size(this)
可以计算出当前队列中的元素数量。这种设计将索引的访问方式标准化,使得大小计算逻辑可以被重用。
QueueProgressIndicators
这个接口提供了对队列生产者和消费者进度的可见性,主要用于监控和调试。
currentProducerIndex()
:返回当前生产者索引。currentConsumerIndex()
:返回当前消费者索引。
这与 IndexedQueue
中的方法类似,但语义上更侧重于“进度”而非“索引”,通常用于外部观察者了解队列的活动状态。
SupportsIterator
这个接口表明该队列支持迭代器。ConcurrentCircularArrayQueue
提供了一个内部实现的 WeakIterator
。
// ... existing code ...@Overridepublic Iterator<E> iterator() {final long cIndex = lvConsumerIndex();final long pIndex = lvProducerIndex();return new WeakIterator(cIndex, pIndex, mask, buffer);}private static class WeakIterator<E> implements Iterator<E> {// ... existing code ...
这个迭代器是“弱一致性”的:
- 它提供了队列在创建迭代器那一刻的一个“快照”。
- 它不保证返回的元素严格按照队列顺序,并且可能会遗漏在迭代期间入队或出队的元素。
- 它不会抛出
ConcurrentModificationException
,是线程安全的。
这种设计是为了在不加锁的情况下提供一个尽力而为的迭代功能,适用于调试或监控等不要求强一致性的场景。
内部类 WeakIterator
private static class WeakIterator<E> implements Iterator<E>
首先,从类的声明来看:
private
: 这是一个私有内部类,意味着它只能被外部类ConcurrentCircularArrayQueue
访问。这是良好的封装,表明它的实现细节与外部类紧密相关,不希望被外部直接使用。static
: 这是一个静态内部类。静态内部类不持有外部类实例的隐式引用。这意味着WeakIterator
对象本身是一个独立的对象,不会意外地阻止ConcurrentCircularArrayQueue
实例被垃圾回收。它需要的所有信息(如数组缓冲区、索引等)都必须通过构造函数显式传入,这使得它的状态更加清晰和独立。implements Iterator<E>
: 它实现了标准的java.util.Iterator
接口,提供了hasNext()
、next()
和remove()
方法,可以用于标准的for-each
循环中。
核心定位:“弱一致性” (Weakly Consistent) 迭代器
这个迭代器最核心的特性是“弱一致性”或“尽力而为”(best-effort)。在类的注释中也明确提到了这一点:
The iterator provides a best-effort snapshot of the elements in the queue. The returned iterator is not guaranteed to return elements in queue order, and races with the consumer thread may cause gaps in the sequence of returned elements.
译:迭代器提供了队列中元素的一个尽力而为的快照。返回的迭代器不保证按队列顺序返回元素,并且与消费者线程的竞争可能会导致返回的元素序列中出现间隙。
这意味着:
- 快照性 (Snapshot): 迭代器在创建时会“拍下”当时队列的消费者索引 (
cIndex
) 和生产者索引 (pIndex
)。它只会尝试遍历这个索引范围内的元素,不会看到在它创建之后入队的任何新元素。 - 非阻塞/无锁 (Non-Blocking/Lock-Free): 迭代器在遍历时不会对队列加锁。生产者和消费者线程可以完全并发地对队列进行操作,这保证了高吞吐量。
- 可能存在间隙 (Gaps): 由于无锁,当迭代器正在遍历时,消费者线程可能已经取走(消费)了某个元素。当迭代器访问到那个位置时,会发现是
null
,于是它会跳过这个元素,继续向后寻找。这就造成了迭代结果中可能出现“间隙”。
成员变量
// ... existing code ...private static class WeakIterator<E> implements Iterator<E> {private final long pIndex;private final long mask;private final E[] buffer;private long nextIndex;private E nextElement;
// ... existing code ...
private final long pIndex;
: 迭代器创建时刻的生产者索引。final
关键字确保它在迭代器生命周期内不会改变,定义了遍历的上限。private final long mask;
: 用于将长整型的索引(会无限增长)映射到数组的实际下标,计算方式通常是offset = index & mask
。private final E[] buffer;
: 对队列底层数组的引用。private long nextIndex;
: 迭代器当前检查的索引位置。它从cIndex
开始,逐步增加到pIndex
。private E nextElement;
: 预取/前瞻的下一个元素。这是一种常见的迭代器实现模式,hasNext()
只需检查这个字段是否为null
,而next()
返回这个字段并再次预取下一个,简化了逻辑。
构造函数
// ... existing code ...WeakIterator(long cIndex, long pIndex, long mask, E[] buffer) {this.nextIndex = cIndex;this.pIndex = pIndex;this.mask = mask;this.buffer = buffer;nextElement = getNext();}
// ... existing code ...
- 构造函数接收创建迭代器那一刻的消费者索引 (
cIndex
)、生产者索引 (pIndex
)、掩码和缓冲区。 - 它将
cIndex
初始化为nextIndex
,将pIndex
存入final
字段。 - 最关键的一步是
nextElement = getNext();
。在构造函数里,它就立即调用getNext()
尝试获取第一个有效的元素。这使得第一次调用hasNext()
时就能立刻知道结果,而无需做任何计算。
hasNext()
和 next()
// ... existing code ...@Overridepublic boolean hasNext() {return nextElement != null;}@Overridepublic E next() {final E e = nextElement;if (e == null)throw new NoSuchElementException();nextElement = getNext();return e;}
// ... existing code ...
hasNext()
: 逻辑非常简单,直接判断预取的nextElement
是否为空。next()
:- 保存当前的
nextElement
。 - 如果它为
null
(意味着hasNext()
返回了false
),则抛出NoSuchElementException
,符合Iterator
接口规范。 - 调用
getNext()
去预取下一个元素,为下一次调用hasNext()
或next()
做准备。 - 返回之前保存的元素。
- 保存当前的
getNext()
- 迭代器的引擎
// ... existing code ...private E getNext() {while (nextIndex < pIndex) {long offset = calcCircularRefElementOffset(nextIndex++, mask);E e = lvRefElement(buffer, offset);if (e != null) {return e;}}return null;}
// ... existing code ...
这是迭代器最核心的逻辑所在。
while (nextIndex < pIndex)
: 循环的条件保证了迭代器只在创建时确定的索引范围内查找。nextIndex++
: 在计算偏移量后,nextIndex
会自增,为下一次循环做准备。E e = lvRefElement(buffer, offset);
: 这是关键的并发操作。lvRefElement
是Unsafe
类操作的封装,代表 Load Volatile,即以 volatile 内存语义读取数组中的元素。这确保了生产者线程对元素的写入对迭代器线程是可见的,避免了读到旧的、未初始化的值。if (e != null)
: 如果读取到的元素不为null
,说明找到了一个有效元素,立即返回它。- 如果
e
是null
,循环会继续。这正是处理“间隙”的地方:迭代器看到了一个空槽,它会认为这个元素已经被消费者线程取走,于是跳过它,继续寻找下一个。 return null;
: 如果while
循环结束(即nextIndex
到达了pIndex
),仍然没有找到非空元素,就返回null
,表示迭代结束。
remove()
// ... existing code ...@Overridepublic void remove() {throw new UnsupportedOperationException("remove");}
// ... existing code ...
- 直接抛出
UnsupportedOperationException
。在无锁的并发数据结构上安全地实现remove()
操作非常复杂,且容易引入竞争条件。因此,JCTools 和很多其他并发库(如ConcurrentLinkedQueue
的迭代器)一样,选择不支持此操作。在RELEASE-NOTES.md
中也提到了早期版本不支持iterator()
方法,后来虽然支持了,但remove()
依然是被禁止的,这是一种安全且常见的设计决策。
总结
WeakIterator
是一个精心设计的、用于并发环境的轻量级迭代器。它的核心特点可以概括为:
- 线程安全与高性能: 通过无锁设计和 volatile 读,实现了不阻塞生产者/消费者的线程安全遍历。
- 弱一致性快照: 它提供的是一个“尽力而为”的快照,不保证反映队列的精确状态或元素顺序,但足以用于调试、监控或某些特定场景下的批量读取。
- 实现简洁优雅: 采用“预取”(
lookahead
)模式,使得hasNext()
和next()
的逻辑非常清晰简单,将复杂性集中在getNext()
方法中。 - 内存友好: 作为静态内部类,它不持有外部类的引用,避免了潜在的内存泄漏问题。
总而言之,WeakIterator
是在追求极致性能的并发队列中,为“遍历”这一功能所做出的一个典型权衡:牺牲了强一致性,换取了高并发性和无锁的性能优势。
核心方法实现
size()
和isEmpty()
这两个方法都委托给IndexedQueueSizeUtil
工具类,通过比较生产者和消费者索引来计算,避免了在并发环境下维护一个单独的size
变量所带来的开销和竞争。
clear()
通过不断调用poll()
方法直到返回null
来清空队列。这是一个简单但有效的方法。
总结
ConcurrentCircularArrayQueue
是 JCTools 中各种有界并发数组队列的基石。它通过以下设计实现了高性能和线程安全:
- 循环数组与2的幂容量:使用位运算替代取模运算,提升索引计算效率。
- 缓存行填充:避免伪共享,减少多核 CPU 环境下的缓存争用。
- 分离索引管理:将生产者和消费者的索引管理逻辑分离到子类中,以支持不同的并发模型(SPSC、MPSC、SPMC、MPMC)。
- 弱一致性迭代器:提供一个轻量级、无锁的迭代器,适用于监控和调试。
- 标准化的接口:通过实现
MessagePassingQueue
、IndexedQueue
等接口,提供了丰富且明确的 API,使其不仅仅是一个集合,更是一个强大的消息传递工具。
ConcurrentSequencedCircularArrayQueue
这是一个在 JCTools 中非常核心的抽象基类,理解它对于理解 MPMC(多生产者多消费者)等高级队列的实现至关重要。
public abstract class ConcurrentSequencedCircularArrayQueue<E> extends ConcurrentCircularArrayQueue<E>
{
//...
}
extends ConcurrentCircularArrayQueue<E>
: 它继承自我们之前分析过的ConcurrentCircularArrayQueue<E>
。这意味着它天然就拥有了父类的所有特性,包括:- 一个用于存储元素的环形数组
protected final E[] buffer;
。 - 一个用于计算数组索引的掩码
protected final long mask;
。 - 基本的队列属性和方法,如容量
capacity()
、清空clear()
等。
- 一个用于存储元素的环形数组
sequenceBuffer
这个类最核心的扩展是增加了一个新的成员变量:
// ... existing code ...
public abstract class ConcurrentSequencedCircularArrayQueue<E> extends ConcurrentCircularArrayQueue<E>
{protected final long[] sequenceBuffer;public ConcurrentSequencedCircularArrayQueue(int capacity)
// ... existing code ...
protected final long[] sequenceBuffer;
: 这是一个long
类型的数组,名为“序列缓冲区”。它的长度与存储元素的buffer
数组相同。这个数组是实现高级并发控制算法的关键。
sequenceBuffer
为队列中的每一个槽(slot)都提供了一个对应的“序列号”或“票据”。生产者和消费者通过检查和更新这个序列号来协调对共享数据(即 buffer
数组中的元素)的访问,从而避免使用锁。
构造函数与初始化
构造函数的实现揭示了 sequenceBuffer
的工作原理。
// ... existing code ...public ConcurrentSequencedCircularArrayQueue(int capacity){super(capacity);int actualCapacity = (int) (this.mask + 1);// pad data on either end with some empty slots. Note that actualCapacity is <= MAX_POW2_INTsequenceBuffer = allocateLongArray(actualCapacity);for (long i = 0; i < actualCapacity; i++){soLongElement(sequenceBuffer, calcCircularLongElementOffset(i, mask), i);}}
}
super(capacity);
: 调用父类构造函数,初始化buffer
数组和mask
。sequenceBuffer = allocateLongArray(actualCapacity);
: 分配sequenceBuffer
数组。allocateLongArray
只是简单分配一个数组。for
循环初始化: 这是最关键的部分。循环遍历sequenceBuffer
的每一个槽位,并执行soLongElement(..., i)
。calcCircularLongElementOffset(i, mask)
: 计算第i
个槽在内存中的偏移量。soLongElement(..., i)
:so
是Store Ordered
的缩写,对应于Unsafe.putOrderedLong
。这是一个有内存屏障效果的写操作,它比普通的写要强,但比volatile
写要弱。它保证了在此操作之前的内存写入不会被重排序到它之后。- 初始化逻辑:
sequenceBuffer
的第i
个槽被初始化为值i
。即sequenceBuffer[i] = i
。
“序列号”并发控制机制
通过这个初始化,ConcurrentSequencedCircularArrayQueue
为子类(如 MpmcArrayQueue
)奠定了并发算法的基础。这个算法大致如下:
- 初始状态: 队列为空。
buffer
数组里都是null
。sequenceBuffer
的第i
个槽位的值是i
。 - 生产者入队 (offer):
- 一个生产者想要在逻辑索引
p
处放入一个元素。 - 它首先计算出该索引对应的数组位置
offset = p & mask
。 - 它会去检查
sequenceBuffer[offset]
的值。 - 如果
sequenceBuffer[offset]
的值正好等于p
,这就像一张“票据”,表明这个槽位是空闲的,并且轮到索引为p
的这次操作来使用它。 - 生产者获得许可,将元素写入
buffer[offset]
。 - 写入元素后,生产者会更新
sequenceBuffer[offset]
的值为p + 1
。这个新值p + 1
成为了给消费者的信号,表示“索引为p
的槽位已经准备好了,你可以来消费了”。
- 一个生产者想要在逻辑索引
- 消费者出队 (poll):
- 一个消费者想要消费逻辑索引
c
处的元素。 - 它计算出数组位置
offset = c & mask
。 - 它去检查
sequenceBuffer[offset]
的值。 - 如果
sequenceBuffer[offset]
的值等于c + 1
,这就表明生产者已经完成了在c
位置的写入,数据可供消费。 - 消费者获得许可,从
buffer[offset]
读取元素。 - 读取之后,消费者会更新
sequenceBuffer[offset]
的值为c + capacity
(即c + mask + 1
)。这个值将会是下一轮生产者在同一个物理槽位offset
上期望看到的序列号,从而完成一个循环。
- 一个消费者想要消费逻辑索引
总结
ConcurrentSequencedCircularArrayQueue
是一个巧妙的抽象,它在 ConcurrentCircularArrayQueue
的基础上,通过增加一个并行的 sequenceBuffer
,为无锁(Lock-Free)并发队列算法提供了核心机制。
- 关注点分离: 它将数据存储 (
buffer
) 和并发控制 (sequenceBuffer
) 分离开来。 - 奠定基础: 它不实现具体的
offer
和poll
逻辑,而是将sequenceBuffer
初始化好,交由MpmcArrayQueue
、MpscSequencedArrayQueue
等子类去实现具体的、基于序列号检查的入队和出队操作。 - 高性能设计: 使用
Unsafe
和putOrderedLong
等底层技术,旨在最大化吞吐量和最小化延迟。
因此,这个类是 JCTools 中实现高性能多线程队列(尤其是多生产者场景)的关键基石。
sequenceBuffer
:每个槽位的“状态指示器”
可以把 sequenceBuffer
想象成一个与主数据数组 buffer
并行存在的、专门用来标记状态的记分板。buffer
里的每个槽位(slot),都在 sequenceBuffer
的相同位置有一个对应的 long 类型数字,我们称之为 seq
。
这个 seq
的值不是随便写的,它遵循一个严格的协议,用来表示其对应槽位的状态。对于一个位于 i
位置的槽位,它的 seq
值有以下几种含义:
seq == i
: 槽位i
是空的,可以被生产者用来存放一个生产者序号(producerIndex)为i
的元素。seq == i + 1
: 槽位i
已被写入数据,可以被消费者消费。生产者在放入元素后,会将seq
更新为i + 1
,作为“已发布”的信号。seq == i + capacity
: 槽位i
已被消费,并且消费者已经将其标记为“可回收”。消费者取出元素后,会将seq
更新为i + capacity
。这个+ capacity
的操作,相当于把这个槽位的“版本号”推进了一轮,为下一圈的生产者做好了准备。
举个例子: 假设队列容量 capacity
是 8。
- 生产者想在
producerIndex = 0
的位置放东西。它会检查sequenceBuffer[0]
的值是不是0
。 - 如果是,它就把元素放入
buffer[0]
,然后把sequenceBuffer[0]
的值更新为1
。 - 消费者想从
consumerIndex = 0
的位置取东西。它会检查sequenceBuffer[0]
的值是不是0 + 1 = 1
。 - 如果是,它就从
buffer[0]
取出元素,然后把sequenceBuffer[0]
的值更新为0 + 8 = 8
。 - 现在,当生产者跑完一整圈,轮到
producerIndex = 8
时,它会检查sequenceBuffer[0]
(因为8 % 8 == 0
) 的值是不是8
。正好是消费者更新后的值!于是生产者可以安全地重用这个槽位。
这个 seq
就是生产者和消费者之间不见面就能沟通的“信物”。