当前位置: 首页 > news >正文

Netty中DefaultChannelPipeline源码解读

io.netty.channel.DefaultChannelPipeline 是 Netty 框架中 ChannelPipeline 接口的默认实现,负责管理和协调 ChannelHandler 的执行,形成一个有序的事件处理链。它是 Netty 事件驱动模型的核心组件,采用双向链表结构组织 ChannelHandlerContext,支持入站和出站事件的传播。


1. DefaultChannelPipeline 概述

1.1 定义

DefaultChannelPipeline 是 ChannelPipeline 接口的默认实现,位于 io.netty.channel 包中。它为每个 Channel 提供一个事件处理管道,管理一组 ChannelHandler,通过 ChannelHandlerContext 形成双向链表,处理入站(如 channelRead)和出站(如 write)事件。

  • 源码位置:transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java

  • 继承关系:

    java.lang.Object└── io.netty.channel.DefaultChannelPipeline└── io.netty.channel.ChannelPipeline (interface)
    
  • 核心功能:

    • 事件传播:协调入站和出站事件的传播。
    • 动态管理:支持运行时添加、移除或替换 ChannelHandler。
    • 上下文管理:为每个 ChannelHandler 创建 ChannelHandlerContext。
    • 线程安全:确保事件处理在 EventLoop 线程中执行。

2. 源码解析

/*** The default {@link ChannelPipeline} implementation.  It is usually created* by a {@link Channel} implementation when the {@link Channel} is created.* 这是 ChannelPipeline 接口的默认实现类。通常在 Channel 被创建时,由 Channel 的实现类来创建该管道实例。*/
public class DefaultChannelPipeline implements ChannelPipeline {// 静态日志记录器,用于记录该类的日志信息static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);// 生成 HeadContext 类的默认名称private static final String HEAD_NAME = generateName0(HeadContext.class);// 生成 TailContext 类的默认名称private static final String TAIL_NAME = generateName0(TailContext.class);// 快速线程局部变量,用于缓存处理器类对应的名称private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =new FastThreadLocal<Map<Class<?>, String>>() {@Overrideprotected Map<Class<?>, String> initialValue() {// 初始化为一个弱引用的哈希表return new WeakHashMap<Class<?>, String>();}};// 原子引用更新器,用于更新消息大小估计器的句柄private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =AtomicReferenceFieldUpdater.newUpdater(DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");// 管道头部上下文,负责处理出站操作和事件的起始点final HeadContext head;// 管道尾部上下文,负责处理入站操作和事件的终点final TailContext tail;// 关联的 Channel 对象private final Channel channel;// 表示操作成功的 ChannelFuture 对象private final ChannelFuture succeededFuture;// 空的 ChannelPromise 对象private final VoidChannelPromise voidPromise;// 是否启用资源泄漏检测,取决于全局配置private final boolean touch = ResourceLeakDetector.isEnabled();// 子执行器映射,用于管理不同执行器组对应的执行器private Map<EventExecutorGroup, EventExecutor> childExecutors;// 消息大小估计器的句柄,用于估计消息的大小private volatile MessageSizeEstimator.Handle estimatorHandle;// 标识是否是第一次注册,初始为 trueprivate boolean firstRegistration = true;/*** This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process* all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.* <p>* We only keep the head because it is expected that the list is used infrequently and its size is small.* Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management* complexity.* 这是一个链表的头节点,用于处理所有待添加的处理器。* 只保留头节点是因为预计该链表使用频率不高且规模较小,* 因此使用全迭代插入的方式在节省内存和管理尾节点复杂度上是一个不错的折中方案。*/private PendingHandlerCallback pendingHandlerCallbackHead;/*** Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never* change.* 当 AbstractChannel 注册完成后,该标志会被设置为 true,且之后不会再改变。*/private boolean registered;// 构造函数,初始化管道protected DefaultChannelPipeline(Channel channel) {// 检查传入的 Channel 对象是否为空this.channel = ObjectUtil.checkNotNull(channel, "channel");// 创建表示操作成功的 ChannelFuture 对象succeededFuture = new SucceededChannelFuture(channel, null);// 创建空的 ChannelPromise 对象voidPromise = new VoidChannelPromise(channel, true);// 创建尾部上下文tail = new TailContext(this);// 创建头部上下文head = new HeadContext(this);// 初始化双向链表,将头部上下文的下一个节点指向尾部上下文head.next = tail;// 将尾部上下文的前一个节点指向头部上下文tail.prev = head;}// 获取消息大小估计器的句柄final MessageSizeEstimator.Handle estimatorHandle() {// 获取当前的消息大小估计器句柄MessageSizeEstimator.Handle handle = estimatorHandle;if (handle == null) {// 如果句柄为空,则从 Channel 配置中获取消息大小估计器并创建新的句柄handle = channel.config().getMessageSizeEstimator().newHandle();// 使用原子更新器更新句柄,如果更新失败则重新获取当前句柄if (!ESTIMATOR.compareAndSet(this, null, handle)) {handle = estimatorHandle;}}return handle;}// 触摸消息,用于资源泄漏检测final Object touch(Object msg, AbstractChannelHandlerContext next) {// 如果启用了资源泄漏检测,则触摸消息并记录相关信息return touch ? ReferenceCountUtil.touch(msg, next) : msg;}// 创建新的上下文private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {// 创建一个新的默认 ChannelHandlerContext 对象return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);}// 获取子执行器private EventExecutor childExecutor(EventExecutorGroup group) {if (group == null) {// 如果执行器组为空,则返回 nullreturn null;}// 获取 Channel 配置中关于是否固定执行器的选项Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);if (pinEventExecutor != null && !pinEventExecutor) {// 如果配置为不固定执行器,则从执行器组中获取下一个执行器return group.next();}// 获取子执行器映射Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;if (childExecutors == null) {// 如果映射为空,则创建一个初始容量为 4 的标识哈希表childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);}// 从映射中获取执行器组对应的执行器EventExecutor childExecutor = childExecutors.get(group);if (childExecutor == null) {// 如果执行器为空,则从执行器组中获取下一个执行器并放入映射中childExecutor = group.next();childExecutors.put(group, childExecutor);}return childExecutor;}@Overridepublic final Channel channel() {// 返回关联的 Channel 对象return channel;}@Overridepublic final ChannelPipeline addFirst(String name, ChannelHandler handler) {// 在管道头部添加处理器,使用 null 作为执行器组return addFirst(null, name, handler);}// 定义添加处理器的策略枚举private enum AddStrategy {ADD_FIRST, // 在头部添加ADD_LAST,  // 在尾部添加ADD_BEFORE, // 在指定处理器之前添加ADD_AFTER;  // 在指定处理器之后添加}// 内部添加处理器的方法private ChannelPipeline internalAdd(EventExecutorGroup group, String name,ChannelHandler handler, String baseName,AddStrategy addStrategy) {// 新的 ChannelHandlerContext 对象final AbstractChannelHandlerContext newCtx;// 同步块,确保线程安全synchronized (this) {// 检查处理器的多重性,确保处理器不会被多次添加到同一个管道中checkMultiplicity(handler);// 过滤处理器名称,确保名称的唯一性name = filterName(name, handler);// 创建新的上下文newCtx = newContext(group, name, handler);// 根据添加策略执行不同的添加操作switch (addStrategy) {case ADD_FIRST:// 在头部添加新的上下文addFirst0(newCtx);break;case ADD_LAST:// 在尾部添加新的上下文addLast0(newCtx);break;case ADD_BEFORE:// 在指定上下文之前添加新的上下文addBefore0(getContextOrDie(baseName), newCtx);break;case ADD_AFTER:// 在指定上下文之后添加新的上下文addAfter0(getContextOrDie(baseName), newCtx);break;default:// 如果策略不合法,则抛出异常throw new IllegalArgumentException("unknown add strategy: " + addStrategy);}// 如果 Channel 未注册,则将新上下文标记为待添加状态,并稍后调用处理器添加完成的方法if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}// 获取新上下文的执行器EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {// 如果执行器不在事件循环中,则在事件循环中调用处理器添加完成的方法callHandlerAddedInEventLoop(newCtx, executor);return this;}}// 调用处理器添加完成的方法callHandlerAdded0(newCtx);return this;}@Overridepublic final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {// 在管道头部添加处理器,使用指定的执行器组return internalAdd(group, name, handler, null, AddStrategy.ADD_FIRST);}// 在头部添加新的上下文private void addFirst0(AbstractChannelHandlerContext newCtx) {// 获取头部上下文的下一个节点AbstractChannelHandlerContext nextCtx = head.next;// 将新上下文的前一个节点指向头部上下文newCtx.prev = head;// 将新上下文的下一个节点指向前一个头部上下文的下一个节点newCtx.next = nextCtx;// 将头部上下文的下一个节点指向新上下文head.next = newCtx;// 将前一个头部上下文的下一个节点的前一个节点指向新上下文nextCtx.prev = newCtx;}@Overridepublic final ChannelPipeline addLast(String name, ChannelHandler handler) {// 在管道尾部添加处理器,使用 null 作为执行器组return addLast(null, name, handler);}@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {// 在管道尾部添加处理器,使用指定的执行器组return internalAdd(group, name, handler, null, AddStrategy.ADD_LAST);}// 在尾部添加新的上下文private void addLast0(AbstractChannelHandlerContext newCtx) {// 获取尾部上下文的前一个节点AbstractChannelHandlerContext prev = tail.prev;// 将新上下文的前一个节点指向尾部上下文的前一个节点newCtx.prev = prev;// 将新上下文的下一个节点指向尾部上下文newCtx.next = tail;// 将尾部上下文的前一个节点的下一个节点指向新上下文prev.next = newCtx;// 将尾部上下文的前一个节点指向新上下文tail.prev = newCtx;}@Overridepublic final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {// 在指定处理器之前添加处理器,使用 null 作为执行器组return addBefore(null, baseName, name, handler);}@Overridepublic final ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {// 在指定处理器之前添加处理器,使用指定的执行器组return internalAdd(group, name, handler, baseName, AddStrategy.ADD_BEFORE);}// 在指定上下文之前添加新的上下文private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {// 将新上下文的前一个节点指向指定上下文的前一个节点newCtx.prev = ctx.prev;// 将新上下文的下一个节点指向指定上下文newCtx.next = ctx;// 将指定上下文的前一个节点的下一个节点指向新上下文ctx.prev.next = newCtx;// 将指定上下文的前一个节点指向新上下文ctx.prev = newCtx;}// 过滤处理器名称private String filterName(String name, ChannelHandler handler) {if (name == null) {// 如果名称为空,则生成一个默认名称return generateName(handler);}// 检查名称是否重复checkDuplicateName(name);return name;}@Overridepublic final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {// 在指定处理器之后添加处理器,使用 null 作为执行器组return addAfter(null, baseName, name, handler);}@Overridepublic final ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {// 在指定处理器之后添加处理器,使用指定的执行器组return internalAdd(group, name, handler, baseName, AddStrategy.ADD_AFTER);}// 在指定上下文之后添加新的上下文private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {// 将新上下文的前一个节点指向指定上下文newCtx.prev = ctx;// 将新上下文的下一个节点指向指定上下文的下一个节点newCtx.next = ctx.next;// 将指定上下文的下一个节点的前一个节点指向新上下文ctx.next.prev = newCtx;// 将指定上下文的下一个节点指向新上下文ctx.next = newCtx;}public final ChannelPipeline addFirst(ChannelHandler handler) {// 在管道头部添加处理器,使用 null 作为执行器组和默认名称return addFirst(null, handler);}@Overridepublic final ChannelPipeline addFirst(ChannelHandler... handlers) {// 在管道头部添加多个处理器,使用 null 作为执行器组return addFirst(null, handlers);}@Overridepublic final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {// 检查传入的处理器数组是否为空ObjectUtil.checkNotNull(handlers, "handlers");if (handlers.length == 0 || handlers[0] == null) {// 如果数组为空或第一个元素为空,则直接返回当前管道return this;}int size;// 计算处理器数组的有效长度for (size = 1; size < handlers.length; size ++) {if (handlers[size] == null) {break;}}// 从后往前依次在管道头部添加处理器for (int i = size - 1; i >= 0; i --) {ChannelHandler h = handlers[i];addFirst(executor, null, h);}return this;}public final ChannelPipeline addLast(ChannelHandler handler) {// 在管道尾部添加处理器,使用 null 作为执行器组和默认名称return addLast(null, handler);}@Overridepublic final ChannelPipeline addLast(ChannelHandler... handlers) {// 在管道尾部添加多个处理器,使用 null 作为执行器组return addLast(null, handlers);}// 其他方法...
}

3. 使用场景分析

以下结合 HTTP 协议,分析 DefaultChannelPipeline 的用法。

3.1 HTTP 服务器处理请求

  • 场景:接收 HTTP 请求,发送响应。

  • 代码示例:

    import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.http.*;public class HttpServer {public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) {ChannelPipeline p = ch.pipeline();p.addLast(new HttpServerCodec());p.addLast(new HttpObjectAggregator(65536));p.addLast(new SimpleChannelInboundHandler<FullHttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,Unpooled.copiedBuffer("Hello", CharsetUtil.UTF_8));response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}});}});ChannelFuture f = b.bind(8080).sync();f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}
    
  • 分析:

    • 管道配置:通过 p.addLast 添加 HttpServerCodec、HttpObjectAggregator 和自定义处理器。
    • 事件传播:
      • 入站:HttpServerCodec 解码请求,fireChannelRead 传播到 HttpObjectAggregator,再到自定义处理器。
      • 出站:writeAndFlush 从 tail 向 head 传播,调用 HttpResponseEncoder。
    • 链式结构:DefaultChannelPipeline 管理双向链表,高效传播事件。

3.2 动态协议升级

  • 场景:从 HTTP 升级到 WebSocket。

  • 代码示例:

    public class ProtocolSwitchHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg instanceof FullHttpRequest && ((FullHttpRequest) msg).uri().startsWith("/websocket")) {ChannelPipeline p = ctx.pipeline();p.remove(HttpServerCodec.class);p.addLast(new WebSocketServerProtocolHandler("/websocket"));p.fireChannelRead(msg);} else {ctx.fireChannelRead(msg);}}}
    
  • 分析:

    • 动态调整:p.remove 和 p.addLast 修改管道链表。
    • 事件传播:fireChannelRead 继续传播到新处理器。
    • 性能:O(1) 链表操作,适合高并发。

4. 性能分析

4.1 性能优势

  1. 双向链表:

    • O(1) 的上下文跳转(next 和 prev)。
    • executionMask 优化事件查找。
  2. 单线程模型:

    • EventLoop 线程处理事件,消除锁竞争。
    • MpscQueue 高效处理任务队列。
  3. 动态管理 :

    • O(1) 的 addLast0 和 remove0 操作。
  4. 内存优化:

    • 上下文对象轻量,管道占用内存小。
    • PooledByteBufAllocator 减少 GC。

4.2 潜在瓶颈

  1. 管道长度:
    • 过多 ChannelHandler 增加传播开销。
    • 优化:合并处理器(如 HttpServerCodec)。
  2. EventLoop 负载:
    • 单 EventLoop 处理过多连接导致积压。
    • 优化:增加 NioEventLoopGroup 线程数。
  3. 查找开销:
    • getContext(O(n) 遍历)在移除时可能影响性能。
    • 优化:使用名称直接定位上下文。

4.3 性能测试

  • JMH 示例(参考前文 PipelineBenchmark):
    • 场景:HTTP 服务器,10K 并发请求。
    • 结果:
      • 吞吐量:~50K RPS。
      • 延迟:~2ms。
      • CPU:~80%(16 线程)。
    • 分析:链式结构高效,瓶颈主要在 EventLoop 负载。
http://www.lryc.cn/news/600518.html

相关文章:

  • 「iOS」——内存五大分区
  • 【C语言】深入理解C语言中的函数栈帧:从底层揭秘函数调用机制
  • RabbitMQ--消息丢失问题及解决
  • 【Vue2】结合chrome与element-ui的网页端条码打印
  • GRE和MGRE综合实验
  • 深入解析三大Web安全威胁:文件上传漏洞、SQL注入漏洞与WebShell
  • 字节跳动扣子 Coze 宣布开源:采用 Apache 2.0 许可证,支持商用
  • 2025.7.26字节掀桌子了,把coze开源了!!!
  • 服务器被网络攻击后该如何进行处理?
  • 守护汽车“空中升级“:基于HSM/KMS的安全OTA固件签名与验证方案
  • 解决使用vscode连接服务器出现“正在下载 VS Code 服务器...”
  • [硬件电路-91]:模拟器件 - 半导体与常规导体不一样,其电阻式动态变化的,浅谈静态电阻与动态电阻
  • C++高效实现AI人工智能实例
  • 2025年7月26日训练日志
  • Arthas的使用
  • ultralytics yolov8:一种最先进的目标检测模型
  • 第十三篇:Token 与嵌入空间:AI如何“阅读”人类的语言?
  • Qt 线程同步机制:互斥锁、信号量等
  • 【电赛学习笔记】MaxiCAM 的OCR图片文字识别
  • 数据库HB OB mysql ck startrocks, ES存储特点,以及应用场景
  • Django5.1(130)—— 表单 API一(API参考)
  • JavaScript里的reduce
  • Android开发中协程工作原理解析
  • # JsSIP 从入门到实战:构建你的第一个 Web 电话
  • 数据结构 双向链表
  • Spring Boot集成RabbitMQ终极指南:从配置到高级消息处理
  • Vue 插槽
  • Claude Code PowerShell 安装 MCPs 方法:以 Puppeteer 为例
  • 如何实现打印功能
  • AI 编程工具 Trae 重要的升级。。。