Netty中InternalThreadLocalMap的作用
io.netty.util.internal.InternalThreadLocalMap
是 Netty 内部工具类,用于管理高效的线程局部存储(Thread-Local Storage)。它替代了 Java 标准库的 ThreadLocal
,为 Netty 的高性能异步框架提供了优化的线程局部变量管理机制。该类在 Netty 的运行时环境中扮演关键角色,特别是在事件循环(EventLoop
)、对象池(Recycler
)、随机数生成和其他性能敏感场景中。
- 作用:
- 提供高效的线程局部存储,管理每个线程的独立数据(如缓存、计数器、上下文)。
- 优化
ThreadLocal
的性能,减少内存分配和访问开销。 - 支持 Netty 的核心功能模块,如
FastThreadLocal
、对象池(Recycler
)、随机数生成(ThreadLocalRandom
)、ChannelHandler
共享性检查等。
源码注释:
/*** InternalThreadLocalMap 是 Netty 的线程局部存储管理类,用于高效存储线程特定数据。* 每个线程拥有一个唯一的 InternalThreadLocalMap 实例,通过 ThreadLocal 或 FastThreadLocalThread 访问。* 支持 FastThreadLocal、对象池、随机数生成等功能。*/
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {/*** 慢路径的 ThreadLocal,用于普通线程存储 InternalThreadLocalMap。*/private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();/*** 用于分配线程局部变量的索引,递增生成唯一索引。*/private static final AtomicInteger nextIndex = new AtomicInteger();/*** 表示未设置的占位符对象。用于区分未设置值和值为null*/public static final Object UNSET = new Object();/*** 存储线程局部变量的数组,使用索引访问。*/private Object[] indexedVariables;// 核心字段//在 Netty 中,Future 对象可以添加多个监听器(FutureListener),当 Future 完成时,会通知所有添加的监听器。如果在监听器的 operationComplete 方法 中又触发了另一个 Future 的完成,并且这个新的 Future 也有监听器,就会形成递归调用。如果递归深度过深,就可能导致 StackOverflowError。//futureListenerStackDepth 字段用于记录当前线程中 Future 监听器通知的递归深度。Netty 通过配置一个最大递归深度(通过系统属性 io.netty.defaultPromise.maxListenerStackDepth 配置,默认值为 8),当递归深度超过这个最大值时,Netty 会将后续的监听器通知任务放到事件循环中异步执行,而不是继续在当前栈中递归调用,从而避免栈溢出。private int futureListenerStackDepth;//在 Netty 的本地传输中,当一个 LocalChannel 读取数据时,可能会触发一系列的操作,这些操作可能会递归调用读取逻辑。如果递归深度过深,就可能导致 StackOverflowError。localChannelReaderStackDepth 字段可以记录当前线程中 LocalChannel 读取操作的递归深度,当递归深度超过一定阈值时,可以采取相应的措施来避免栈溢出。private int localChannelReaderStackDepth; // Channel 读取栈深度//handlerSharableCache 把类和其对应的是否可共享的布尔值映射起来。当首次判断某个 ChannelHandler 类是否可共享时,会进行反射操作并把结果存入缓存;后续再判断同一类时,直接从缓存获取结果,无需再次反射,提高了判断效率。private Map<Class<?>, Boolean> handlerSharableCache; // ChannelHandler 共享缓存private IntegerHolder counterHashCode; // 计数器哈希码private ThreadLocalRandom random; // 线程局部随机数生成器//TypeParameterMatcher 用于判断一个对象是否是特定类型或其子类型,常用于处理消息类型匹配的场景。当需要判断某个对象是否为特定类型时,会使用 TypeParameterMatcher 来进行匹配。private Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache; // 类型匹配缓存private Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache; // 类型查找缓存/*** 获取当前线程的 InternalThreadLocalMap 实例。* @return 当前线程的 InternalThreadLocalMap*/public static InternalThreadLocalMap get() {Thread thread = Thread.currentThread();if (thread instanceof FastThreadLocalThread) {return fastGet((FastThreadLocalThread) thread);} else {return slowGet();}}/*** 适用于 FastThreadLocalThread 类型的线程。FastThreadLocalThread 是 Netty 自定义的线程类,它内部维护了一个 InternalThreadLocalMap 实 * 例,通过 threadLocalMap() 方法可以直接获取该实例。*/private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();if (threadLocalMap == null) {thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());}return threadLocalMap;}/*** 适用于普通的 Thread 类型的线程。对于普通线程,没有内置的 InternalThreadLocalMap 实例,因此需要使用 Java 标准的 ThreadLocal 来存储和获取 * InternalThreadLocalMap 实例。*/private static InternalThreadLocalMap slowGet() {InternalThreadLocalMap ret = slowThreadLocalMap.get();if (ret == null) {ret = new InternalThreadLocalMap();slowThreadLocalMap.set(ret);}return ret;}/*** 构造函数,初始化 indexedVariables 数组。*/private InternalThreadLocalMap() {super(newIndexedVariableTable());}/*** 创建初始大小为 32 的线程局部变量数组,填充 UNSET。* @return 初始化后的数组*/private static Object[] newIndexedVariableTable() {Object[] array = new Object[32];Arrays.fill(array, UNSET);return array;}/*** 分配一个新的线程局部变量索引。* @return 唯一索引* @throws IllegalStateException 如果索引溢出*/public static int nextVariableIndex() {int index = nextIndex.getAndIncrement();if (index < 0) {nextIndex.decrementAndGet();throw new IllegalStateException("too many thread-local indexed variables");}return index;}/*** 获取指定索引的线程局部变量值。* @param index 变量索引* @return 变量值,或 UNSET 如果未设置*/public Object indexedVariable(int index) {Object[] lookup = indexedVariables;return index < lookup.length ? lookup[index] : UNSET;}/*** 设置指定索引的线程局部变量值。* @param index 变量索引* @param value 要设置的值* @return true 如果是首次设置,false 如果覆盖已有值*/public boolean setIndexedVariable(int index, Object value) {Object[] lookup = indexedVariables;if (index < lookup.length) {Object oldValue = lookup[index];lookup[index] = value;return oldValue == UNSET;} else {expandIndexedVariableTableAndSet(index, value);return true;}}/*** 扩展 indexedVariables 数组以支持更大的索引,并设置值。* @param index 变量索引* @param value 要设置的值*/private void expandIndexedVariableTableAndSet(int index, Object value) {Object[] oldArray = indexedVariables;int oldCapacity = oldArray.length;int newCapacity = index;newCapacity |= newCapacity >>> 1;newCapacity |= newCapacity >>> 2;newCapacity |= newCapacity >>> 4;newCapacity |= newCapacity >>> 8;newCapacity |= newCapacity >>> 16;newCapacity++;Object[] newArray = Arrays.copyOf(oldArray, newCapacity);Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);newArray[index] = value;indexedVariables = newArray;}
}
3. 作用与功能
InternalThreadLocalMap
是 Netty 的线程局部存储核心,其主要作用包括:
-
高效线程局部存储:
- 使用数组(
indexedVariables
)存储线程局部变量,通过索引访问,性能优于 JavaThreadLocal
的HashMap
实现。 - 支持
FastThreadLocal
,Netty 的高性能ThreadLocal
替代品,通过nextVariableIndex
分配唯一索引。
- 使用数组(
-
支持 Netty 功能模块:
- 对象池(Recycler):存储对象池的上下文数据,减少对象分配开销(如
ByteBuf
缓存)。 - 随机数生成:
random
字段提供线程局部的ThreadLocalRandom
实例。 - ChannelHandler 共享性:
handlerSharableCache
缓存@Sharable
状态,优化ChannelPipeline
处理。 - 类型匹配缓存:
typeParameterMatcherGetCache
和typeParameterMatcherFindCache
减少反射开销。 - 栈深度跟踪:
futureListenerStackDepth
和localChannelReaderStackDepth
防止递归调用溢出。
- 对象池(Recycler):存储对象池的上下文数据,减少对象分配开销(如
-
性能优化:
- 快速路径:对于
FastThreadLocalThread
(Netty 自定义线程类),直接存储和访问InternalThreadLocalMap
,避免ThreadLocal
的开销。 - 慢路径:对于普通线程,使用
ThreadLocal
存储,保持兼容性。 - 数组存储:初始大小为 32 的数组,动态扩展,减少哈希操作。
- 快速路径:对于
-
线程隔离:
- 确保每个线程的
InternalThreadLocalMap
独立,避免数据干扰。 - 特别适合 Netty 的单线程
EventLoop
模型。
- 确保每个线程的
4. 关键方法解析
以下是 InternalThreadLocalMap
的核心方法及其作用:
4.1 get
- 签名:
public static InternalThreadLocalMap get()
- 作用:获取当前线程的
InternalThreadLocalMap
实例。 - 逻辑:
- 检查线程是否为
FastThreadLocalThread
:- 如果是,调用
fastGet
直接访问。 - 否则,调用
slowGet
使用ThreadLocal
。
- 如果是,调用
- 如果实例不存在,创建并存储。
- 检查线程是否为
- 使用场景:
FastThreadLocal
和其他模块通过get
获取线程局部存储。
4.2 nextVariableIndex
- 签名:
public static int nextVariableIndex()
- 作用:分配唯一索引,用于
FastThreadLocal
变量存储。 - 逻辑:
- 使用
AtomicInteger
递增生成索引。 - 防止索引溢出(负数时抛异常)。
- 使用
4.3 indexedVariable
- 签名:
public Object indexedVariable(int index)
- 作用:获取指定索引的线程局部变量值。
- 逻辑:
- 如果索引在数组范围内,返回值;否则返回
UNSET
。
- 如果索引在数组范围内,返回值;否则返回
4.4 setIndexedVariable
- 签名:
public boolean setIndexedVariable(int index, Object value)
- 作用:设置指定索引的线程局部变量值。
- 逻辑:
- 如果索引在数组范围内,设置值并返回是否首次设置。
- 如果索引超出范围,扩展数组并设置值。
4.5 expandIndexedVariableTableAndSet
- 签名:
private void expandIndexedVariableTableAndSet(int index, Object value)
- 作用:扩展
indexedVariables
数组以支持更大索引。 - 逻辑:
- 使用位运算计算新容量(接近 2 的幂)。
- 复制旧数组,填充新部分为
UNSET
,设置指定索引的值。
5. 与 EventLoop
的关系
InternalThreadLocalMap
在 Netty 的事件循环(EventLoop
)模型中发挥重要作用:
-
线程隔离:
- 每个
EventLoop
(如NioEventLoop
)绑定一个线程,InternalThreadLocalMap
为该线程提供独立的存储空间。 - 示例:
NioEventLoop
使用InternalThreadLocalMap
存储事件循环的上下文数据。
- 每个
-
性能优化:
EventLoop
线程通常是FastThreadLocalThread
,通过fastGet
直接访问InternalThreadLocalMap
,减少开销。- 示例:在
NioEventLoop#run
方法中,FastThreadLocal
可能用于存储临时状态。
-
支持异步操作:
InternalThreadLocalMap
的字段(如futureListenerStackDepth
)用于跟踪ChannelFuture
监听器的调用栈深度,防止递归溢出。- 示例:在
ChannelPipeline
的事件处理中,handlerSharableCache
优化ChannelHandler
的共享性检查。
-
对象池支持:
EventLoop
线程使用InternalThreadLocalMap
管理对象池(如Recycler
),提高ByteBuf
等对象的复用效率。
6. 使用场景
InternalThreadLocalMap
在 Netty 中的具体应用包括:
-
FastThreadLocal 存储:
- 支持
FastThreadLocal
,Netty 的高性能ThreadLocal
实现。 - 示例:
ByteBuf
的Recycler
使用FastThreadLocal
存储对象池上下文。
- 支持
-
对象池(Recycler):
- 管理线程局部的对象池实例,减少内存分配。
- 示例:
PooledByteBufAllocator
使用InternalThreadLocalMap
缓存ByteBuf
。
-
随机数生成:
random
字段提供线程局部的ThreadLocalRandom
实例。- 示例:负载均衡或随机选择
EventLoop
。
-
ChannelHandler 共享性:
handlerSharableCache
缓存ChannelHandler
的@Sharable
状态。- 示例:
ChannelPipeline
检查ChannelHandler
是否可共享。
-
类型匹配优化:
typeParameterMatcherGetCache
和typeParameterMatcherFindCache
缓存类型匹配结果,减少反射开销。- 示例:
ChannelHandler
的类型检查。