
Caffeine Cache Analysis (3): A Brief Look at the BoundedBuffer and MpscGrowableArrayQueue Source Code

Continued from Caffeine Cache Analysis (1): Interface Design and TinyLFU

Continued from Caffeine Cache Analysis (2): drainStatus State Transitions Across Threads

BoundedBuffer and MpscGrowableArrayQueue

multiple-producer / single-consumer

Here, "multiple" and "single" refer to how many threads produce or consume concurrently.

  • BoundedBuffer: the implementation behind Caffeine's readBuffer field. A circular ring buffer.
  • MpscGrowableArrayQueue: the implementation behind Caffeine's writeBuffer field. An MPSC array queue which starts at initialCapacity and grows to maxCapacity in linked chunks of the initial size. The queue grows only when the current buffer is full and elements are not copied on resize, instead a link to the new buffer is stored in the old buffer for the consumer to follow.

BoundedBuffer

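BoundedBuffer extends StripedBuffer. "Stripe" means a strip or band: StripedBuffer splits the buffer into segments to raise write concurrency. StripedBuffer acts as a facade; the data actually lives in its field volatile Buffer<E> @Nullable[] table;, and the Buffer implementation stored there is RingBuffer, an inner class of BoundedBuffer.
StripedBuffer is modeled on the JDK's java.util.concurrent.atomic.Striped64. For example, java.util.concurrent.atomic.LongAdder extends Striped64 and is a concurrent accumulator: it keeps several long cells that can be incremented concurrently, and sum() adds all the cells together. Besides allowing concurrent writes, StripedBuffer also avoids copying buffered elements when it grows: expanding the table with Arrays.copyOf copies only the references to the existing RingBuffers, never their contents.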
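To make the Striped64 analogy concrete, here is a small demo built on the JDK's real LongAdder: several threads increment one adder concurrently, contended updates land in different internal cells, and sum() combines them. The class name LongAdderDemo and the thread and iteration counts are arbitrary illustration choices.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

final class LongAdderDemo {
  // Starts `threads` workers that each call increment() `perThread` times,
  // waits for all of them, then returns the combined total from sum().
  static long countConcurrently(int threads, int perThread) {
    LongAdder adder = new LongAdder();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.execute(() -> {
        for (int i = 0; i < perThread; i++) {
          adder.increment(); // under contention, updates are spread across internal cells
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("workers did not finish");
      }
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(ex);
    }
    return adder.sum(); // base value plus every cell
  }

  public static void main(String[] args) {
    System.out.println(countConcurrently(4, 100_000)); // prints 400000
  }
}
```

sum() is exact here only because every writer has finished; the LongAdder documentation notes that it is not an atomic snapshot while updates are still in flight.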

The UML diagram and a schematic of how StripedBuffer works are shown below:

[Figure: StripedBuffer UML diagram and schematic]

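Leaving out the padding code that guards against false sharing, RingBuffer likewise uses two counters (read and write) to cycle through a fixed-size array. A simplified version: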

```java
static final class RingBuffer<E> extends ... implements Buffer<E> {
  static final VarHandle BUFFER = MethodHandles.arrayElementVarHandle(Object[].class);
  static final VarHandle READ, WRITE;

  static {
    ...
    READ = lookup.findVarHandle(BBHeader.ReadCounterRef.class, "readCounter", long.class);
    WRITE = lookup.findVarHandle(BBHeader.ReadAndWriteCounterRef.class, "writeCounter", long.class);
    ...
  }

  // Each RingBuffer holds a fixed 16 elements
  static final int BUFFER_SIZE = 16;
  // Mask used to turn a counter into an array index
  static final int MASK = BUFFER_SIZE - 1;

  // Not accessed directly; always through BUFFER
  final Object[] buffer;

  // Plays the role of a write pointer, expressed as a monotonically increasing counter;
  // always >= readCounter. Not accessed directly; always through WRITE
  volatile long writeCounter;

  // Plays the role of a read pointer, expressed as a monotonically increasing counter;
  // always <= writeCounter. Not accessed directly; always through READ
  volatile long readCounter;

  public RingBuffer(E e) {
    buffer = new Object[BUFFER_SIZE];
    // Write the first element into buffer[0]
    BUFFER.set(buffer, 0, e);
    // writeCounter = 1
    WRITE.set(this, 1);
  }

  @Override
  public int offer(E e) {
    long head = readCounter;
    long tail = writeCounterOpaque();
    long size = (tail - head);
    // Ring buffer: it is full once writeCounter - readCounter >= 16
    if (size >= BUFFER_SIZE) {
      return Buffer.FULL;
    }
    // Not full: CAS writeCounter first; if the CAS fails, return FAILED immediately
    if (casWriteCounter(tail, tail + 1)) {
      // Compute the index.
      // Extreme example: tail = 18, so head is at least 3 (size must be < 16).
      // 18 = 10010, and 10010 & 01111 = 00010, so the element goes to index 2.
      // Index 2 last held the element with counter 2, which is below head = 3 and already
      // consumed, so it can be overwritten: this is how the array is reused circularly.
      int index = (int) (tail & MASK);
      BUFFER.setRelease(buffer, index, e);
      return Buffer.SUCCESS;
    }
    return Buffer.FAILED;
  }

  @Override
  @SuppressWarnings("Varifier")
  public void drainTo(Consumer<E> consumer) {
    @Var long head = readCounter;
    long tail = writeCounterOpaque();
    long size = (tail - head);
    // Fast path: nothing to drain
    if (size == 0) {
      return;
    }
    do {
      // Read the next element
      int index = (int) (head & MASK);
      var e = (E) BUFFER.getAcquire(buffer, index);
      if (e == null) {
        // not published yet: a writer CASes writeCounter first and stores the element
        // afterwards, so a claimed slot can still be null here
        break;
      }
      BUFFER.setRelease(buffer, index, null);
      consumer.accept(e);
      head++;
    } while (head != tail);
    // Publish the new readCounter
    setReadCounterOpaque(head);
  }
  ...
}
```
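To see the counter arithmetic in isolation, here is a hypothetical, simplified ring buffer (SimpleRingBuffer is not a Caffeine class). It swaps the VarHandles for AtomicLong and AtomicReferenceArray and drops the padding, but keeps the same protocol: a producer CASes the write counter first and publishes the element afterwards, and the single reader stops at the first claimed-but-unpublished (null) slot. The result codes mirror the 0 / -1 / 1 values described for Buffer in the StripedBuffer code.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;

// Hypothetical simplified RingBuffer: same offer/drainTo protocol, no padding, no VarHandles.
final class SimpleRingBuffer<E> {
  static final int SUCCESS = 0;
  static final int FAILED = -1;
  static final int FULL = 1;

  static final int BUFFER_SIZE = 16; // must be a power of two for the mask trick
  static final int MASK = BUFFER_SIZE - 1;

  private final AtomicReferenceArray<E> buffer = new AtomicReferenceArray<>(BUFFER_SIZE);
  private final AtomicLong writeCounter = new AtomicLong(); // advanced by producers via CAS
  private final AtomicLong readCounter = new AtomicLong();  // advanced only by the drainer

  // Safe to call from many threads.
  int offer(E e) {
    long head = readCounter.get();
    long tail = writeCounter.get();
    if (tail - head >= BUFFER_SIZE) {
      return FULL; // 16 slots claimed but not yet drained
    }
    if (writeCounter.compareAndSet(tail, tail + 1)) {
      buffer.lazySet((int) (tail & MASK), e); // publish only after claiming the slot
      return SUCCESS;
    }
    return FAILED; // lost the CAS race
  }

  // Must be called by one thread at a time.
  void drainTo(Consumer<E> consumer) {
    long head = readCounter.get();
    long tail = writeCounter.get();
    while (head != tail) {
      int index = (int) (head & MASK);
      E e = buffer.get(index);
      if (e == null) {
        break; // slot claimed, element not published yet: pick it up on a later drain
      }
      buffer.lazySet(index, null);
      consumer.accept(e);
      head++;
    }
    readCounter.lazySet(head); // frees the drained slots for producers
  }

  public static void main(String[] args) {
    SimpleRingBuffer<String> rb = new SimpleRingBuffer<>();
    rb.offer("a");
    rb.offer("b");
    rb.drainTo(System.out::println); // prints a, then b
  }
}
```

Because readCounter only advances after the slots are cleared, a producer can never claim a slot the drainer has not finished with, which is what makes overwriting old indexes safe.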

So how does StripedBuffer handle concurrent offer calls?

```java
abstract class StripedBuffer<E> implements Buffer<E> {

  @Override
  public int offer(E e) {
    // Compute a hash h that selects the slot in Buffer<E>[] for this write
    long z = mix64(Thread.currentThread().getId());
    int increment = ((int) (z >>> 32)) | 1;
    int h = (int) z;

    int mask;   // mask used to turn the hash into an index
    int result; // result: 0 = success, -1 = failed, 1 = full
    BoundedBuffer.RingBuffer<E> buffer;
    boolean uncontended = true; // false if the RingBuffer CAS failed (contention)
    Buffer<E>[] buffers = table;
    if ((buffers == null)                    // first call: table not initialized, go to expandOrRetry
        || ((mask = buffers.length - 1) < 0) // i.e. buffers.length == 0, initialization not finished
        || ((buffer = (BoundedBuffer.RingBuffer<E>) buffers[h & mask]) == null) // RingBuffer at the slot
        // Actually insert into the RingBuffer: FAILED -> expandOrRetry;
        // FULL is returned as-is, and the caller then triggers drainTo
        || !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
      // "Expand" or retry
      return expandOrRetry(e, h, increment, uncontended);
    }
    return result;
  }

  /**
   * CAS flag held while resizing or creating Buffers
   */
  volatile int tableBusy;

  // Handles initialization, expansion, and Buffer creation
  final int expandOrRetry(E e, int h, int increment, boolean wasUncontended) {
    int result = Buffer.FAILED;
    boolean collide = false; // True if last slot nonempty
    for (int attempt = 0; attempt < ATTEMPTS; attempt++) { // ATTEMPTS = 3, for a reason explained below
      Buffer<E>[] buffers;
      Buffer<E> buffer;
      int n;
      if (((buffers = table) != null) && ((n = buffers.length) > 0)) { // table is initialized
        if ((buffer = buffers[(n - 1) & h]) == null) { // the selected RingBuffer is null
          if ((tableBusy == 0) && casTableBusy()) {   // acquired the tableBusy flag
            boolean created = false;
            try { // Recheck under lock
              Buffer<E>[] rs;
              int mask;
              int j;
              // Check again that the slot is still empty
              if (((rs = table) != null) && ((mask = rs.length) > 0)
                  && (rs[j = (mask - 1) & h] == null)) {
                // Create a RingBuffer via BoundedBuffer#create
                rs[j] = create(e);
                created = true;
              }
            } finally {
              tableBusy = 0;
            }
            if (created) {
              result = Buffer.SUCCESS;
              break;
            }
            continue; // Slot is now non-empty
          }
          collide = false;
        } else if (!wasUncontended) {
          // Only reached when buffer.offer inside StripedBuffer#offer failed:
          // clear the flag and retry with a new hash
          wasUncontended = true;
        } else if ((result = buffer.offer(e)) != Buffer.FAILED) { // [retry]
          // The RingBuffer returned SUCCESS or FULL: return it
          break;
        } else if ((n >= MAXIMUM_TABLE_SIZE) || (table != buffers)) {
          // The table is at its maximum size, or is stale (already replaced via Arrays.copyOf)
          collide = false; // At max size or stale
        } else if (!collide) { // [retry] above failed: mark a collision
          collide = true;
        } else if ((tableBusy == 0) && casTableBusy()) {
          // Reached when [retry] failed once (setting collide = true above)
          // and failed again on the next iteration: expand the table
          try {
            if (table == buffers) {
              // Double the table
              table = Arrays.copyOf(buffers, n << 1);
            }
          } finally {
            tableBusy = 0;
          }
          collide = false;
          // No offer right after expanding; the next loop iteration retries,
          // which is why ATTEMPTS is 3
          continue; // Retry with expanded table
        }
        // Move to a different slot
        h += increment;
      }
      // Initialize the table
      // if tableBusy is 0 and we manage to CAS it to 1
      else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) {
        boolean init = false;
        try { // Initialize table
          if (table == buffers) {
            Buffer<E>[] rs = new Buffer[1];
            // create actually calls BoundedBuffer#create, which returns a RingBuffer
            rs[0] = create(e);
            table = rs;
            init = true;
          }
        } finally {
          tableBusy = 0;
        }
        if (init) {
          result = Buffer.SUCCESS;
          break;
        }
      }
    }
    return result;
  }
  ...
}
```
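The slot selection and growth steps above can be replayed without any threads. The sketch below is hypothetical (StripeProbeDemo is not in Caffeine), and its mix64 is a SplitMix64-style finalizer that I am assuming as a stand-in for StripedBuffer's helper. It checks two properties the code above relies on: with an odd increment, h += increment visits every slot of a power-of-two table before repeating, and Arrays.copyOf(buffers, n << 1) copies only references, so the existing buffers are shared and the new slots start as null until create(e) fills them.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class StripeProbeDemo {
  // SplitMix64-style finalizer; an assumed stand-in for StripedBuffer's mix64.
  static long mix64(long z) {
    z = (z ^ (z >>> 30)) * 0xbf58476d1ce4e5b9L;
    z = (z ^ (z >>> 27)) * 0x94d049bb133111ebL;
    return z ^ (z >>> 31);
  }

  // Counts the distinct slots hit by `tableSize` probes that start from this thread id's hash.
  static int distinctSlotsVisited(long threadId, int tableSize) {
    long z = mix64(threadId);
    int increment = ((int) (z >>> 32)) | 1; // forced odd, as in offer()
    int h = (int) z;
    int mask = tableSize - 1;               // tableSize must be a power of two
    Set<Integer> seen = new HashSet<>();
    for (int i = 0; i < tableSize; i++) {
      seen.add(h & mask);
      h += increment;                       // rehash, as at the end of each expandOrRetry iteration
    }
    return seen.size();
  }

  public static void main(String[] args) {
    System.out.println(distinctSlotsVisited(42L, 8)); // prints 8

    Object[] table = {new StringBuilder("stripe-0"), new StringBuilder("stripe-1")};
    Object[] grown = Arrays.copyOf(table, table.length << 1);
    System.out.println(grown[0] == table[0]); // prints true: same buffer object, nothing copied
    System.out.println(grown[2]);             // prints null: filled later by create(e)
  }
}
```

The full-coverage result holds for any starting hash because an odd step is coprime with any power of two, so the low bits of h cycle through every residue before repeating.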

MpscGrowableArrayQueue

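The advantage of MpscGrowableArrayQueue is that growing never copies the existing array: it only allocates a new array and links to it from the old one.
First, a few terms: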

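  • producerBuffer: the array that producers write elements into
  • consumerBuffer: the array the consumer reads from; when the consumer keeps up, it can be the same array as producerBuffer
  • producerIndex (pIndex): the number of elements written × 2. Why × 2? The lowest bit flags a resize in progress (an odd value means a producer is resizing), so many values in the code are doubled; keep this in mind when reading it
  • consumerIndex (cIndex): the number of elements read × 2
  • offset: the actual index of an element in buffer[], derived from pIndex or cIndex and the mask as (index & mask) >> 1
  • mask: used to compute the offset and the current buffer's capacity; it is n one-bits followed by a trailing 0, e.g. 6 = 110, 14 = 1110, 30 = 11110
  • JUMP: a static sentinel stored in buffer[]; it tells the consumer to continue in nextBuffer at the same logical index (consumerIndex), with the offset recomputed using the next buffer's mask
  • maxQueueCapacity: the maximum capacity × 2, i.e. in the same doubled units as pIndex and cIndex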
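A quick worked example of the doubled-index encoding, using the offset formula from the list above with a 4-slot array (mask 6) that grows to an 8-slot array (mask 14). IndexEncodingDemo and the chosen numbers are purely illustrative. The last line shows why, after seeing JUMP, the consumer must recompute the offset from the same index with the new mask instead of reusing the old offset.

```java
final class IndexEncodingDemo {
  // offset = (index & mask) >> 1, the same formula as modifiedCalcElementOffset
  static long offset(long index, long mask) {
    return (index & mask) >> 1;
  }

  // An odd pIndex means some producer is in the middle of a resize
  static boolean isResizing(long pIndex) {
    return (pIndex & 1) == 1;
  }

  public static void main(String[] args) {
    long oldMask = 6;  // p2capacity 4 -> (4 - 1) << 1
    long newMask = 14; // p2capacity 8 -> (8 - 1) << 1
    for (long pIndex = 0; pIndex <= 8; pIndex += 2) {
      System.out.println("pIndex=" + pIndex + " offset=" + offset(pIndex, oldMask));
    }
    // pIndex 0, 2, 4, 6, 8 -> offsets 0, 1, 2, 3, 0 (wraps around the 4 slots)
    System.out.println(isResizing(14 + 1)); // prints true
    // The same logical index lands at different offsets in the old and new arrays
    System.out.println(offset(14, oldMask) + " vs " + offset(14, newMask)); // prints 3 vs 7
  }
}
```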

The data structure of MpscGrowableArrayQueue and the way it grows are shown in the figures below:

[Figures: MpscGrowableArrayQueue data structure; growth by linking a new buffer from the old one]

Here are the sizing rules:

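```java
// initialCapacity is passed to the constructor
// initialCapacity -> p2capacity: 6 -> 8, 7 -> 8, 8 -> 8, 9 -> 16
int p2capacity = ceilingPowerOfTwo(initialCapacity);
// Mask whose lowest bit is 0
// p2capacity -> mask (binary): 4 -> 110, 8 -> 1110, 16 -> 11110
long mask = (p2capacity - 1L) << 1;
// The extra +1 slot holds the pointer to nextBuffer[]
E[] buffer = allocate(p2capacity + 1);

// When growing, the new buffer[] length is derived from the current buffer[] length
int newSize = 2 * (buffer.length - 1) + 1;

// Note: buffer[]'s length and its capacity are different things.
// length is the size of the array;
// capacity is how many elements it can hold (excluding the JUMP slot and the nextBuffer pointer)
protected long getCurrentBufferCapacity(long mask) {
  // From the constructor rules:
  //   mask = (p2capacity - 1L) << 1 = 2 * p2capacity - 2, where p2capacity is the real (not doubled)
  //   capacity, i.e. initialCapacity rounded up to the next power of two
  //   curBufferLength = p2capacity + 1
  // From the growth rule:
  //   nextBufferLength = 2 * (curBufferLength - 1) + 1 = 2 * p2capacity + 1 = (mask + 2) + 1
  // So each resize doubles p2capacity and adds one slot for the pointer.
  // Once p2capacity * 2 (= mask + 2) reaches maxQueueCapacity, this buffer is the last one and
  // will never JUMP onward, so the slot otherwise kept free for JUMP can hold an element as well,
  // adding one to the capacity
  return (mask + 2 == maxQueueCapacity) ? maxQueueCapacity : mask;
}
```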
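Plugging numbers into these rules makes the growth sequence easier to follow. The sketch below assumes initialCapacity = 6 and maxQueueCapacity = 64 (a maximum of 32 elements, doubled). GrowthSequenceDemo is hypothetical, and its ceilingPowerOfTwo is reimplemented with Integer.numberOfLeadingZeros, so it may differ from Caffeine's helper at the edges of the int range. Capacities are reported in real (halved) units.

```java
import java.util.ArrayList;
import java.util.List;

final class GrowthSequenceDemo {
  // Smallest power of two >= x, valid for 2 <= x <= 2^30 (hypothetical stand-in).
  static int ceilingPowerOfTwo(int x) {
    return 1 << (32 - Integer.numberOfLeadingZeros(x - 1));
  }

  // Same formula as getCurrentBufferCapacity; the result is in doubled units.
  static long currentBufferCapacity(long mask, long maxQueueCapacity) {
    return (mask + 2 == maxQueueCapacity) ? maxQueueCapacity : mask;
  }

  // Walks the chain of buffers and returns each buffer's usable capacity in real units.
  static List<Long> usableCapacities(int initialCapacity, long maxQueueCapacity) {
    List<Long> result = new ArrayList<>();
    int length = ceilingPowerOfTwo(initialCapacity) + 1; // +1 slot for the nextBuffer link
    while (true) {
      long mask = (long) (length - 2) << 1;              // equals (p2capacity - 1) << 1
      result.add(currentBufferCapacity(mask, maxQueueCapacity) / 2);
      if (mask + 2 >= maxQueueCapacity) {
        return result;                                   // largest buffer: never resized again
      }
      length = 2 * (length - 1) + 1;                     // getNextBufferSize
    }
  }

  public static void main(String[] args) {
    System.out.println(usableCapacities(6, 64)); // prints [7, 15, 32]
  }
}
```

Each non-final buffer can hold one element fewer than its p2capacity (7 of 8, 15 of 16) because one slot is kept free for JUMP, while the final 32-slot buffer uses all of its element slots.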

```java
class MpscGrowableArrayQueue<E> extends ...BaseMpscLinkedArrayQueue... {
  // Think of it as (number of elements produced so far) * 2; it maps to an index in
  // producerBuffer. Accessed through a VarHandle
  protected long producerIndex;
  protected long producerMask;
  // Buffer that producers write into
  protected E[] producerBuffer;
  protected volatile long producerLimit;

  // Think of it as (number of elements consumed so far) * 2; it maps to an index in consumerBuffer
  protected long consumerIndex;
  protected long consumerMask;
  // Buffer the consumer reads from; can be the same array as producerBuffer when consumption keeps up
  protected E[] consumerBuffer;

  // Means: continue at the same logical index in nextBuffer
  private static final Object JUMP = new Object();
  protected final long maxQueueCapacity;
  ...
}
```
```java
abstract class BaseMpscLinkedArrayQueue<E> ... {
  ...
  BaseMpscLinkedArrayQueue(int initialCapacity) {
    if (initialCapacity < 2) {
      throw new IllegalArgumentException("Initial capacity must be 2 or more");
    }
    // 6 -> 8, 7 -> 8, 8 -> 8, 9 -> 16
    int p2capacity = ceilingPowerOfTwo(initialCapacity);
    // leave lower bit of mask clear
    // 8 = 1000 -> 1000 - 1 = 0111 -> 0111 << 1 = 1110 = 14
    long mask = (p2capacity - 1L) << 1;
    // need extra element to point at next array
    E[] buffer = allocate(p2capacity + 1);
    producerBuffer = buffer;
    producerMask = mask;
    consumerBuffer = buffer;
    consumerMask = mask;
    soProducerLimit(this, mask); // we know it's all empty to start with
  }

  @Override
  public boolean offer(E e) {
    if (e == null) {
      throw new NullPointerException();
    }
    long mask; // mask of the current producer buffer
    E[] buffer;
    // pIndex carries two pieces of information:
    // 1. its lowest bit flags a resize: odd (lowest bit 1) means a resize is in progress
    // 2. pIndex >> 1, i.e. pIndex / 2, is the number of elements produced; the array index of
    //    the write is (pIndex & mask) >> 1, called offset below
    long pIndex;

    while (true) {
      // lv = volatile load (load + LoadLoad barrier)
      long producerLimit = lvProducerLimit();
      pIndex = lvProducerIndex(this);
      // lower bit is indicative of resize, if we see it we spin until it's cleared
      // Odd means a resize is in progress: spin
      if ((pIndex & 1) == 1) {
        continue;
      }
      // mask/buffer may get changed by resizing -> only use for array access after successful CAS.
      mask = this.producerMask;
      buffer = this.producerBuffer;
      // a successful CAS ties the ordering, lv(pIndex)-[mask/buffer]->cas(pIndex)

      // Fast check: while pIndex is below producerLimit, skip the slow path
      // and write straight into the current buffer[]
      if (producerLimit <= pIndex) {
        // Slow path
        int result = offerSlowPath(mask, pIndex, producerLimit);
        switch (result) {
          case 0:
            break;        // go on to the pIndex CAS below
          case 1:
            continue;     // retry
          case 2:
            return false; // queue is full
          case 3:
            resize(mask, buffer, pIndex, e);
            return true;
        }
      }

      // CAS pIndex first
      if (casProducerIndex(this, pIndex, pIndex + 2)) {
        break;
      }
    }
    // Only after the pIndex CAS succeeds is the element stored; offset is the actual array index
    long offset = modifiedCalcElementOffset(pIndex, mask);
    // so = store ordered (an ordered/release store)
    soElement(buffer, offset, e);
    return true;
  }

  /**
   * We do not inline resize into this method because we do not resize on fill.
   */
  // Called when pIndex >= producerLimit
  private int offerSlowPath(long mask, long pIndex, long producerLimit) {
    int result;
    long cIndex = lvConsumerIndex(this);
    long bufferCapacity = getCurrentBufferCapacity(mask);
    result = 0; // 0 - goto pIndex CAS
    if (cIndex + bufferCapacity > pIndex) {
      // The consumer has freed room in the current buffer: raise producerLimit
      if (!casProducerLimit(this, producerLimit, cIndex + bufferCapacity)) {
        // 1 means retry
        result = 1;
      }
    }
    // pIndex - cIndex has reached maxQueueCapacity, i.e. the queue is full.
    // maxQueueCapacity is in the same doubled units as pIndex and cIndex
    else if (availableInQueue(pIndex, cIndex) <= 0) {
      // 2 means failure
      result = 2;
    }
    // Make pIndex odd to signal a resize in progress
    else if (casProducerIndex(this, pIndex, pIndex + 1)) {
      // resize
      result = 3;
    } else {
      // Another producer won the CAS: retry
      result = 1;
    }
    return result;
  }

  @Override
  protected long getCurrentBufferCapacity(long mask) {
    // Same rule as in the sizing rules above: a non-final buffer keeps one slot free for JUMP,
    // while the largest buffer (mask + 2 == maxQueueCapacity) can use every element slot
    return (mask + 2 == maxQueueCapacity) ? maxQueueCapacity : mask;
  }

  protected long availableInQueue(long pIndex, long cIndex) {
    return maxQueueCapacity - (pIndex - cIndex);
  }

  /**
   * poll may only be called from a single thread
   */
  @Override
  @SuppressWarnings({"CastCanBeRemovedNarrowingVariableType", "unchecked"})
  public E poll() {
    E[] buffer = consumerBuffer;
    long index = consumerIndex;
    long mask = consumerMask;

    long offset = modifiedCalcElementOffset(index, mask);
    Object e = lvElement(buffer, offset); // LoadLoad
    if (e == null) {
      if (index != lvProducerIndex(this)) {
        // poll() returns null if and only if the queue is empty,
        // but e == null alone does not prove that: compare producerIndex with consumerIndex.
        // consumerIndex != producerIndex with e == null means a producer is mid-insert
        // (it CASes pIndex first and stores the element afterwards), so spin
        do {
          e = lvElement(buffer, offset);
        } while (e == null);
      } else {
        // consumerIndex == producerIndex: everything has been polled, the queue is empty
        return null;
      }
    }

    if (e == JUMP) {
      // JUMP to the next linked buffer
      E[] nextBuffer = getNextBuffer(buffer, mask);
      // Read the element at the same logical index there, then clear it and advance consumerIndex
      return newBufferPoll(nextBuffer, index);
    }

    // Clear the slot
    soElement(buffer, offset, null);
    // Advance consumerIndex
    soConsumerIndex(this, index + 2);
    return (E) e;
  }

  /**
   * This method assumes index is actually (index << 1) because lower bit is used for resize. This
   * is compensated for by reducing the element shift. The computation is constant folded, so
   * there's no cost.
   */
  // index = 2 -> offset = 1, index = 4 -> offset = 2
  static long modifiedCalcElementOffset(long index, long mask) {
    return (index & mask) >> 1;
  }

  private void resize(long oldMask, E[] oldBuffer, long pIndex, E e) {
    // Growth rule
    int newBufferLength = getNextBufferSize(oldBuffer);
    E[] newBuffer = allocate(newBufferLength);

    producerBuffer = newBuffer;
    int newMask = (newBufferLength - 2) << 1;
    producerMask = newMask;

    // Array indexes (offsets) of pIndex in the old and the new buffer[]
    long offsetInOld = modifiedCalcElementOffset(pIndex, oldMask);
    long offsetInNew = modifiedCalcElementOffset(pIndex, newMask);

    // Store the element
    soElement(newBuffer, offsetInNew, e); // element in new array
    // Store the link to newBuffer[] in the old buffer's pointer slot
    soElement(oldBuffer, nextArrayOffset(oldMask), newBuffer); // buffer linked
    ...
    // (availableInQueue below is a local variable defined in the elided code)

    // Invalidate racing CASs
    // We never set the limit beyond the bounds of a buffer
    soProducerLimit(this, pIndex + Math.min(newMask, availableInQueue));

    // make resize visible to the other producers
    soProducerIndex(this, pIndex + 2);

    // INDEX visible before ELEMENT, consistent with consumer expectation

    // make resize visible to consumer
    soElement(oldBuffer, offsetInOld, JUMP);
  }

  // Growth rule
  @Override
  protected int getNextBufferSize(E[] buffer) {
    long maxSize = maxQueueCapacity / 2;
    // maxQueueCapacity is twice the real capacity, so buffer.length should never exceed maxSize here
    if (buffer.length > maxSize) {
      throw new IllegalStateException();
    }
    int newSize = 2 * (buffer.length - 1);
    return newSize + 1;
  }
}
```
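The core offer/poll handshake can be isolated in a much smaller queue. The hypothetical TinyMpscQueue below is fixed-size (no resize, no JUMP, no cached producerLimit) and counts elements directly instead of using doubled indices, but it keeps the two ideas the code above depends on: producers claim a slot by CASing the producer index and only then store the element, and the single consumer, seeing null while consumerIndex != producerIndex, spins until the element becomes visible.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical fixed-capacity MPSC queue illustrating CAS-then-publish and the consumer spin.
final class TinyMpscQueue<E> {
  private final AtomicReferenceArray<E> buffer;
  private final int mask;
  private final AtomicLong producerIndex = new AtomicLong(); // slots claimed by producers
  private final AtomicLong consumerIndex = new AtomicLong(); // written only by the consumer

  TinyMpscQueue(int capacityPowerOfTwo) {
    buffer = new AtomicReferenceArray<>(capacityPowerOfTwo);
    mask = capacityPowerOfTwo - 1;
  }

  // Safe to call from many threads.
  boolean offer(E e) {
    if (e == null) {
      throw new NullPointerException();
    }
    while (true) {
      long pIndex = producerIndex.get();
      if (pIndex - consumerIndex.get() >= buffer.length()) {
        return false; // full
      }
      if (producerIndex.compareAndSet(pIndex, pIndex + 1)) {
        buffer.lazySet((int) (pIndex & mask), e); // store only after the CAS claimed the slot
        return true;
      }
    }
  }

  // Must only be called from a single consumer thread.
  E poll() {
    long cIndex = consumerIndex.get();
    int offset = (int) (cIndex & mask);
    E e = buffer.get(offset);
    if (e == null) {
      if (cIndex == producerIndex.get()) {
        return null; // genuinely empty
      }
      do {
        e = buffer.get(offset); // slot claimed but element not visible yet: spin
      } while (e == null);
    }
    buffer.lazySet(offset, null);
    consumerIndex.lazySet(cIndex + 1);
    return e;
  }

  public static void main(String[] args) throws InterruptedException {
    TinyMpscQueue<String> q = new TinyMpscQueue<>(8);
    Thread p1 = new Thread(() -> q.offer("from-p1"));
    Thread p2 = new Thread(() -> q.offer("from-p2"));
    p1.start();
    p2.start();
    p1.join();
    p2.join();
    String s;
    while ((s = q.poll()) != null) {
      System.out.println(s); // two lines; their order depends on which CAS won
    }
  }
}
```

The growable queue layers the odd-pIndex resize flag, producerLimit, and the JUMP link on top of exactly this handshake.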