JUC学习笔记-----LongAdder
LongAdder
关键域
public class LongAdder extends Striped64 implements Serializable {// 1. 累加单元数组(懒加载,竞争激烈时扩容)transient volatile Cell[] cells;// 2. 基础值(无竞争时直接累加,避免创建 Cell 数组的开销)transient volatile long base;// 3. cells 初始化/扩容的锁标记(0 = 未加锁;1 = 加锁中)transient volatile int cellsBusy;// Cell 内部类(分段累加的单元,避免竞争)static final class Cell {volatile long value; // 每个 Cell 的累加值(用 volatile 保证可见性)Cell(long x) { value = x; }}
}
1. cells
字段
transient volatile Cell[] cells;
实现分段累加(类似 ConcurrentHashMap
的分段锁思想 ),将竞争分散到不同 Cell
,降低锁竞争。
- 竞争激烈时,
cells
数组会扩容(默认 2 倍),每个线程哈希到不同Cell
累加,避免线程间直接竞争。 transient
:反序列化时重新初始化,避免持久化cells
状态(无意义)。volatile
:保证cells
数组的可见性,线程创建 / 扩容时能感知最新数组。
2. base
字段
transient volatile long base;
优化无竞争场景的性能,避免创建 cells
数组的开销。
- 当线程竞争小(或无竞争)时,直接通过
CAS
累加base
,无需操作cells
。 transient
:反序列化时重新计算base
(依赖cells
合并结果,无需持久化 )。volatile
:保证base
的可见性,多线程累加 / 读取时无歧义。
3. cellsBusy
字段
transient volatile int cellsBusy;
实现自旋锁,保证 cells
数组初始化 / 扩容时的线程安全。
- 0:表示
cells
可初始化 / 扩容。 - 1:表示
cells
正在初始化 / 扩容,其他线程需自旋等待。 - 类似
ReentrantLock
的非公平锁,通过CAS
尝试修改cellsBusy
实现加锁,避免重量级锁开销。
4. Cell
内部类
static final class Cell {volatile long value;Cell(long x) { value = x; }
}
- 每个
Cell
是一个独立的累加单元,线程哈希到某个Cell
后,直接修改Cell.value
,避免线程间竞争。 volatile long value
:保证value
的可见性,多线程修改同一Cell
时(极端情况),仍能通过CAS
保证原子性。- 分段累加的最小单位,
cells
数组扩容时,Cell
数量增加,进一步分散竞争。
伪共享
其中Cell即为累加单元
// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {volatile long value;Cell(long x) {value = x;}// 最重要的方法,用来 cas 方式进行累加,prev 表示旧值,next 表示新值final boolean cas(long prev, long next) {return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);}// 省略不重要代码// 这里需要注意,实际源码中还会有 valueOffset 的静态初始化等,如下(补充完整关键部分)private static final sun.misc.Unsafe UNSAFE;private static final long valueOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> ak = Cell.class;valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));} catch (Exception e) {throw new Error(e);}}
}
缓存与内存的速度比较
因为 CPU 与内存的速度差异很大,需要靠预读数据至缓存来提升效率。
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)
缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中
CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效
Cell
数组连续存储,一个缓存行(64B)可存 2 个Cell
(每个 24B )。- 不同 CPU 核心(Core-0/Core-1 )修改相邻
Cell
时,会因缓存行共享,导致对方缓存行失效,触发频繁缓存同步,降低性能。 @sun.misc.Contended
作用:- 在注解修饰的对象(如
Cell
)前后插入 128B 的 padding(填充),让每个Cell
独占缓存行。 - 避免 “修改一个
Cell
导致相邻Cell
所在缓存行失效” 的问题,提升多线程累加(如LongAdder
)的性能。
- 在注解修饰的对象(如
add
public void add(long x) {Cell[] as; long b, v; int m; Cell a; // 分支 1:无竞争(直接累加 base)或 轻度竞争(尝试 CAS 累加 Cell)if ((as = cells) != null || !casBase(b = base, b + x)) {boolean uncontended = true; // 分支 2:检查 cells 数组是否可用 + 哈希到具体 Cellif (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || // 核心:尝试 CAS 累加当前 Cell 的 value!(uncontended = a.cas(v = a.value, v + x))) { // 冲突升级:调用 longAccumulate 处理(扩容 cells/重试 CAS)longAccumulate(x, null, uncontended); }}
}
优先无锁累加:
尝试用CAS
直接累加base
→ 无竞争时性能最优。降级分段累加:
累加base
失败(竞争发生),尝试在cells
数组中哈希到Cell
,用CAS
累加 → 分散竞争。冲突升级处理:
若Cell
的CAS
仍失败(重度竞争 ),调用longAccumulate
处理:- 初始化
cells
(首次竞争 )。 - 扩容
cells
数组(竞争持续 )。 - 重试
CAS
或切换Cell
→ 最终保证累加成功。
- 初始化
longAccumulate
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {int h; // 初始化线程哈希值(用于分配 Cell)if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // 强制初始化探针h = getProbe(); wasUncontended = true; }boolean collide = false; // 自旋重试,直到累加成功for (;;) { Cell[] as; Cell a; int n; long v; // 分支 1:cells 数组可用(已初始化)if ((as = cells) != null && (n = as.length) > 0) { // 哈希到具体 Cellif ((a = as[(n - 1) & h]) == null) { // Cell 为空,尝试加锁初始化 Cellif (cellsBusy == 0 && casCellsBusy()) { try { if (cells != as) continue; // 检查 cells 是否被修改// 初始化空 Cellas[(n - 1) & h] = new Cell(x); } finally {cellsBusy = 0; // 解锁}continue; // 重试累加}collide = false; // 标记未冲突} // 之前累加 Cell 失败(wasUncontended = false)else if (!wasUncontended) { wasUncontended = true; h = getProbe(); // 重新哈希,换 Cell 重试} // 尝试 CAS 累加当前 Cellelse if (fn == null ? a.cas(v = a.value, v + x) : // 无函数,直接累加a.cas(v = a.value, fn.applyAsLong(v, x))) { // 有函数,应用函数break; // 累加成功,退出循环} // cells 长度小于最大容量(CPU 核心数),允许扩容else if (n >= NCPU || !cellsEquals(as, cells)) { collide = false; } // 标记冲突,准备扩容else if (!collide) { collide = true; } // 冲突持续,尝试扩容 cellselse if (cellsBusy == 0 && casCellsBusy()) { try {if (cells == as) { // 扩容为 2 倍cells = Arrays.copyOf(as, n << 1); }} finally {cellsBusy = 0; // 解锁}collide = false; continue; // 扩容后重试}h = advanceProbe(h); // 哈希值自增,换 Cell 重试} // 分支 2:cells 未初始化,尝试加锁初始化else if (cellsBusy == 0 && cells == as && casCellsBusy()) { try {if (cells != as) continue; // 检查 cells 是否被修改// 初始化 cells 数组(长度为 2)cells = new Cell[2]; // 哈希到第一个 Cell,初始化值cells[(n = 1) & h] = new Cell(x); } finally {cellsBusy = 0; // 解锁}continue; // 重试累加} // 分支 3:cells 不可用,尝试累加 baseelse if (fn == null ? casBase(v = base, v + x) : // 无函数,直接累加 basecasBase(v = base, fn.applyAsLong(v, x))) { // 有函数,应用函数break; // 累加成功,退出循环} }
}
cells数组不存在情况
检查初始化条件:
确认cells
未初始化且无其他线程持有cellsBusy
锁(cellsBusy == 0
),同时验证cells
引用未被修改(cells == as
)。尝试加锁:
通过casCellsBusy()
尝试将cellsBusy
从 0 置为 1,获取初始化cells
的权限。二次校验:
加锁后再次检查cells
引用是否变化(防止并发修改),若变化则放弃本次初始化,重新进入循环。初始化
cells
数组:
创建长度为 2 的cells
数组,根据线程哈希值(h
)计算索引,初始化对应位置的Cell
为x(1)
。释放锁:
将cellsBusy
重置为 0,释放初始化权限。重试累加流程:
初始化完成后,通过continue
重新进入循环,尝试在新创建的cells
数组中完成累加。降级处理(加锁失败时):
若加锁失败(其他线程正在初始化cells
),则尝试直接累加base
,成功则退出循环,失败则重新进入循环重试。
数组创建好了,但是线程对应的累加单元还没创建情况
- 定位目标 Cell:通过线程哈希值与数组长度取模,定位到线程对应的 Cell 位置。
- 检查 Cell 状态:发现目标 Cell 为 null(未创建)。
- 尝试加锁:检查 cellsBusy 为 0 时,通过 casCellsBusy () 加锁,获取 Cell 创建权限。
- 二次校验:加锁后再次确认 cells 数组未被修改,避免并发冲突。
- 创建 Cell:在目标位置初始化 Cell 并赋值。
- 释放锁:将 cellsBusy 重置为 0,释放权限。
- 重试累加:通过 continue 重新进入循环,尝试在新创建的 Cell 上完成累加。
- 冲突处理:若加锁失败(其他线程操作),标记无冲突后重新哈希换 Cell 重试。
cells存在&cell已创建情况
- 定位目标 Cell:通过线程哈希值与 cells 数组长度取模,定位到已创建的 Cell。
- 首次冲突标记重置:若之前累加失败(wasUncontended 为 false),重置标记并更新线程哈希值换 Cell 重试。
- 尝试 CAS 累加:对当前 Cell 的 value 执行 CAS 累加操作,成功则退出循环。
- 判断扩容条件:若 cells 数组长度≥CPU 核心数或数组已被修改,标记无需扩容。
- 标记冲突状态:首次冲突时标记 collide 为 true,为后续扩容做准备。
- 执行扩容操作:冲突持续且获取到 cellsBusy 锁时,将 cells 数组扩容为原来的 2 倍,释放锁后重试。
- 更新哈希重试:通过 advanceProbe 更新线程哈希值,切换到其他 Cell 重新尝试累加。
sum
public long sum() {Cell[] as = cells; // 引用当前 cells 数组Cell a; long sum = base; // 初始化为 base 的值// cells 数组已初始化(存在分段累加)if (as != null) { // 遍历所有 Cellfor (int i = 0; i < as.length; ++i) { // 跳过空 Cell(可能未初始化)if ((a = as[i]) != null) { sum += a.value; // 累加 Cell 的值}}}return sum; // 返回合并后的总和
}
初始化总和:
将sum
初始化为base
的值 → 包含无竞争场景的累加结果。合并
cells
数组:
遍历cells
数组,累加所有非空Cell
的值 → 包含分段竞争场景的累加结果。返回最终结果:
返回base + cells
合并后的总和 → 保证最终一致性(可能非实时,但结果正确 )。