当前位置: 首页 > news >正文

Netty 的 Select/Poll 机制核心实现主要在 NioEventLoop 的事件循环

Netty 的 Select/Poll 机制核心实现主要在 NioEventLoop 的事件循环中,其设计结合了 I/O 事件处理、任务调度和资源管理。以下是关键机制分析:


1. 事件循环核心流程 (run())

java

for (;;) {int strategy = selectStrategy.calculateStrategy(...);switch (strategy) {case SelectStrategy.SELECT:long deadline = nextScheduledTaskDeadlineNanos();if (!hasTasks()) {strategy = select(deadline); // 阻塞式 Select}nextWakeupNanos.lazySet(AWAKE); // 重置唤醒状态break;// ... 其他策略处理}// I/O 事件与任务处理if (ioRatio == 100) {processSelectedKeys();   // 处理 I/O 事件runAllTasks();           // 运行所有任务} else {long ioStartTime = System.nanoTime();processSelectedKeys();long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 按比例运行任务}// 异常与关闭处理if (isShuttingDown()) {closeAll();if (confirmShutdown()) return;}
}
  • Select 策略

    • SELECT:无任务时阻塞 Select,超时时间取最近定时任务的截止时间(deadline),避免错过定时任务。

    • BUSY_WAIT:Netty 未实现(NIO 不支持)。

    • CONTINUE:跳过本次循环(内部状态触发)。

  • I/O 与任务平衡

    • ioRatio 控制 I/O 事件与任务执行的时间比例(默认 50)。

    • 若 ioRatio=100,先处理所有 I/O 事件再处理任务。

    • 否则按比例分配任务执行时间:任务时间 = I/O 时间 * (100 - ioRatio) / ioRatio


2. Select 优化机制

2.1 唤醒与阻塞控制
  • nextWakeupNanos:记录下一次唤醒时间,用于避免无效的 Selector.wakeup() 调用。

  • 阻塞释放:Select 结束后设置 nextWakeupNanos=AWAKE,标识线程已唤醒。

2.2 重建 Selector

java

catch (IOException e) {rebuildSelector0(); // 重建 SelectorselectCnt = 0;continue;
}
  • 当捕获 IOException 时(如 JDK Selector 空轮询 Bug),重建 Selector 并迁移 Channel。

2.3 规避无效唤醒

java

if (unexpectedSelectorWakeup(selectCnt)) {selectCnt = 0; // 重置连续 Select 次数
}
  • selectCnt:记录连续 Select 返回的次数。

  • MIN_PREMATURE_SELECTOR_RETURNS:阈值(默认 256),超过则判定为无效唤醒,可能触发 Selector 重建。


3. I/O 事件处理 (processSelectedKeys)

3.1 优化 SelectedKeys 遍历

java

private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {SelectionKey k = selectedKeys.keys[i];selectedKeys.keys[i] = null; // 显式置空加速 GCObject a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);}// ... 处理 NioTask}
}
  • 优化点:使用数组 (SelectedSelectionKeySet) 替代 JDK 的 HashSet,减少迭代器开销。

  • 显式置空:避免 Channel 关闭后内存滞留(Netty 修复 JDK 内存泄漏问题)。

3.2 事件分发逻辑 (processSelectedKey)

java

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {k.interestOps(ops & ~SelectionKey.OP_CONNECT); // 移除 CONNECT 监听unsafe.finishConnect(); // 完成连接
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush(); // 处理写事件
}
if ((readyOps & (OP_READ | OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read(); // 处理读/接受连接事件
}
  • 优先级:先处理 OP_CONNECT > OP_WRITE > OP_READ/OP_ACCEPT

  • 特殊 case:处理 JDK Bug(readyOps=0 时仍可能触发读事件)。


4. 读事件处理 (NioByteUnsafe.read())

java

do {byteBuf = allocHandle.allocate(allocator);int bytesRead = doReadBytes(byteBuf);if (bytesRead <= 0) {if (bytesRead < 0) close = true; // EOF 关闭连接break;}pipeline.fireChannelRead(byteBuf); // 触发事件
} while (allocHandle.continueReading());
  • 自适应缓冲区:通过 RecvByteBufAllocator 动态调整缓冲区大小(如 AdaptiveRecvByteBufAllocator)。

  • 循环读取:持续读取直到无数据或达到上限(避免饿死其他 Channel)。

  • 异常处理

    • 读异常时释放缓冲区,触发 exceptionCaught

    • 若需关闭,根据配置决定半关闭或全关闭。


5. 任务调度与线程模型

5.1 任务提交 (execute())

java

public void execute(Runnable task) {execute(task, wakesUpForTask(task));
}
  • 唤醒控制:若任务需立即执行(非 LazyRunnable),调用 wakeup() 唤醒阻塞的 Selector。

5.2 线程生命周期 (doStartThread())
  • 单线程执行:通过 executor.execute() 启动唯一事件循环线程。

  • 优雅关闭

    1. 标记状态为 ST_SHUTTING_DOWN

    2. 执行剩余任务和关闭钩子。

    3. 拒绝新任务(状态置为 ST_SHUTDOWN)。

    4. 清理资源(如 FastThreadLocal),通知终止 Future。


总结:Netty Select/Poll 机制特点

  1. 高效事件循环

    • 融合 I/O 事件、任务调度、定时任务。

    • 按 ioRatio 平衡 I/O 与任务执行时间。

  2. Select 优化

    • 超时时间绑定定时任务。

    • 规避 JDK Selector 空轮询(重建机制)。

    • 优化 SelectedKeys 遍历(数组 + 显式置空)。

  3. 健壮性设计

    • 异常处理分离(I/O 异常 vs 循环异常)。

    • 连接/读写事件分层处理。

  4. 资源管理

    • 读缓冲区自适应分配。

    • Channel 生命周期绑定事件循环。

    • 优雅关闭确保任务不丢失。

此机制使 Netty 在高并发下保持低延迟,同时避免传统 NIO 的常见陷阱(如空轮询、资源泄漏)。 

##源码

@Overrideprotected void run() {int selectCnt = 0;for (;;) {try {int strategy;try {strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT:long curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE; // nothing on the calendar}nextWakeupNanos.set(curDeadlineNanos);try {if (!hasTasks()) {strategy = select(curDeadlineNanos);}} finally {// This update is just to help block unnecessary selector wakeups// so use of lazySet is ok (no race condition)nextWakeupNanos.lazySet(AWAKE);}// fall throughdefault:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();selectCnt = 0;handleLoopException(e);continue;}selectCnt++;cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;boolean ranTasks;if (ioRatio == 100) {try {if (strategy > 0) {processSelectedKeys();}} finally {// Ensure we always run tasks.ranTasks = runAllTasks();}} else if (strategy > 0) {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}} else {ranTasks = runAllTasks(0); // This will run the minimum number of tasks}if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}selectCnt = 0;} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)selectCnt = 0;}} catch (CancelledKeyException e) {// Harmless exception - log anywayif (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = state;if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {if (logger.isErrorEnabled()) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +"be called before run() implementation terminates.");}}try {// Run all remaining tasks and shutdown hooks. At this point the event loop// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for// graceful shutdown with quietPeriod.for (;;) {if (confirmShutdown()) {break;}}// Now we want to make sure no more tasks can be added from this point. This is// achieved by switching the state. Any new tasks beyond this point will be rejected.for (;;) {int oldState = state;if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {break;}}// We have the final set of tasks in the queue now, no more can be added, run all remaining.// No need to loop here, this is the final pass.confirmShutdown();} finally {try {cleanup();} finally {// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify// the future. The user may block on the future and once it unblocks the JVM may terminate// and start unloading classes.// See https://github.com/netty/netty/issues/6596.FastThreadLocal.removeAll();STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.countDown();int numUserTasks = drainTasks();if (numUserTasks > 0 && logger.isWarnEnabled()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + numUserTasks + ')');}terminationFuture.setSuccess(null);}}}}});}private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}}private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];// null out entry in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.keys[i] = null;final Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {// null out entries in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.reset(i + 1);selectAgain();i = -1;}}}private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop == this) {// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());}return;}try {int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}protected class NioByteUnsafe extends AbstractNioUnsafe {private void closeOnRead(ChannelPipeline pipeline) {if (!isInputShutdown0()) {if (isAllowHalfClosure(config())) {shutdownInput();pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);} else {close(voidPromise());}} else {inputClosedSeenErrorOnRead = true;pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);}}private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,RecvByteBufAllocator.Handle allocHandle) {if (byteBuf != null) {if (byteBuf.isReadable()) {readPending = false;pipeline.fireChannelRead(byteBuf);} else {byteBuf.release();}}allocHandle.readComplete();pipeline.fireChannelReadComplete();pipeline.fireExceptionCaught(cause);if (close || cause instanceof IOException) {closeOnRead(pipeline);}}@Overridepublic final void read() {final ChannelConfig config = config();if (shouldBreakReadReady(config)) {clearReadPending();return;}final ChannelPipeline pipeline = pipeline();final ByteBufAllocator allocator = config.getAllocator();final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;try {do {byteBuf = allocHandle.allocate(allocator);allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {// nothing was read. release the buffer.byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;if (close) {// There is nothing left to read as we received an EOF.readPending = false;}break;}allocHandle.incMessagesRead(1);readPending = false;pipeline.fireChannelRead(byteBuf);byteBuf = null;} while (allocHandle.continueReading());allocHandle.readComplete();pipeline.fireChannelReadComplete();if (close) {closeOnRead(pipeline);}} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}@Overridepublic void execute(Runnable task) {ObjectUtil.checkNotNull(task, "task");execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));}  

http://www.lryc.cn/news/622668.html

相关文章:

  • Horse3D游戏引擎研发笔记(六):在QtOpenGL环境下,仿Unity的材质管理Shader绘制四边形
  • Nginx域名和IP兼容双方的API地址
  • JavaScript forEach() 与 for 循环 return 行为全解析
  • 1083. 数列极差问题
  • 2025暑期—10ROS系统实现-计算图
  • Linux sar命令详细使用指南
  • 【CV 目标检测】Fast RCNN模型①——与R-CNN区别
  • 【洛谷刷题】用C语言和C++做一些入门题,练习洛谷IDE模式:分支机构(一)
  • VUE+SPRINGBOOT从0-1打造前后端-前后台系统-用户管理
  • 基于Python的课程作业管理系统 Python+Django+Vue.js
  • .net印刷线路板进销存PCB材料ERP财务软件库存贸易生产企业管理系统
  • 《Python 单例模式(Singleton)深度解析:从实现技巧到争议与最佳实践》
  • pytest tmpdir fixture介绍(tmpdir_factory)(自动在测试开始前创建一个临时目录,并在测试结束后删除该目录)
  • C#单元测试(xUnit + Moq + coverlet.collector)
  • STM32 软件I2C读写MPU6050
  • 云服务平台主流架构的相关知识体系剖析
  • 完整设计 之 智能合约系统:主题约定、代理协议和智能合约 (临时命名)----PromptPilot (助手)答问之2
  • 智能合约:区块链时代的“数字契约革命”
  • C++ STL-string类底层实现
  • 《WebPages 数据库:构建高效网络信息管理平台的关键技术解析》
  • RK3568 NPU RKNN(四):RKNN-ToolKit2性能和内存评估
  • Vue3从入门到精通:5.2 Vue3构建工具与性能优化深度解析
  • 微软Wasm学习-创建一个最简单的c#WebAssembly测试工程
  • PHP域名授权系统网站源码_授权管理工单系统_精美UI_附教程
  • 【C 学习】06-算法程序设计举例
  • [1Prompt1Story] 注意力机制增强 IPCA | 去噪神经网络 UNet | U型架构分步去噪
  • 智慧景区导览系统:基于WebGL的手绘地图导览设计与应用,DeepSeek大模型赋能精准游客引导服务
  • OBOO鸥柏丨75寸/86平板企业办公会议触控一体机核心国产化品牌招投标参数
  • eChart饼环pie中间显示总数_2个以上0值不挤掉
  • VS Code配置MinGW64编译非线性优化库NLopt