当前位置: 首页 > news >正文

netty之处理连接源码分析

写在前面

在这篇文章看了netty服务是如何启动的,服务启动成功后,也就相当于是迎宾工作都已经准备好了,那么当客人来了怎么招待客人呢?也就是本文要看的处理连接的工作。

1:正文

先启动源码example模块的echoserver,后启动echoclient进行debug调试。

从哪里开始呢,是从EventLoop方法中开始,在EventLoop中有一个run方法(注意不是Thread的run方法),通过死循环的方式不断的轮询selector的事件,这里自然是轮询op_accept事件了,故事就从这里开始了,如下:

// io.netty.channel.nio.NioEventLoop#run
// 系循环方式处理事件,相当用户死循环selector.select
@Override
protected void run() {int selectCnt = 0;for (;;) {try {// ...if (ioRatio == 100) {// ...} else if (strategy > 0) {final long ioStartTime = System.nanoTime();try {// 处理select到的事件processSelectedKeys();} finally {// ...}} else {// ...}// ...}
}

processSelectedKeys如下:

// io.netty.channel.nio.NioEventLoop#processSelectedKeys
private void processSelectedKeys() {// processSelectedKeysOptimized是netty的优化版本(使用了反射等),更少的gc,1%~2%的效率提升等if (selectedKeys != null) {processSelectedKeysOptimized();} else {// 这里直接使用Java NIO的方式来获取发生发的事件了(一般不走)processSelectedKeysPlain(selector.selectedKeys());}
}

这里执行processSelectedKeysOptimized:

// io.netty.channel.nio.NioEventLoop#processSelectedKeysOptimized
private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i]; // 事件对应的selection key// null out entry in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.keys[i] = null;final Object a = k.attachment(); // 这里的attachment就是serversocketchannel了,在serversocketchannle绑定到selector时指定的if (a instanceof AbstractNioChannel) { // 这里自然是true了,因为我们使用的是Java NIO的方式processSelectedKey(k, (AbstractNioChannel) a);} else {// ...}// ...}
}

注意代码final Object a = k.attachment();获取到对应server socket channel,比较重要,接着看代码processSelectedKey(k, (AbstractNioChannel) a);:

// io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();// ...try {// ...// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loop 接受连接的话会执行到这里if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}
}

这里的readyOps自然就是op_read,1了,unsafe.read();,执行到io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages:

// io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel()); // 内部会accept连接生成socketchannel类 serverSocketChannel.accept();try {if (ch != null) {buf.add(new NioSocketChannel(this, ch)); // 存储到buf,后续会从buf中拿return 1; // 代表buf中元素的数量}} catch (Throwable t) {// ...}return 0;
}

SocketChannel ch = SocketUtils.accept(javaChannel());就accept了,接着执行代码:

// io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
public void read() {// ...try {// ...int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i)); // 通过pipeline开始处理连接请求,主要看ServerBootstrapAcceptor,其他的都是辅助的handler,如日志打印等}// ...} finally {// ...}
}

pipeline.fireChannelRead(readBuf.get(i));主要看ServerBootstrapAcceptor,其他的都是辅助的handler,如日志打印等:

// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {// ...try {// 完成注册,不管是serversocketchannel还是socketchannel,都是注册到selector中childGroup.register(child).addListener(new ChannelFutureListener() {// ...});} catch (Throwable t) {forceClose(child, t);}
}

childGroup.register:

// io.netty.channel.AbstractChannel.AbstractUnsafe#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// ...// eventLoop.inEventLoop()判断是否为eventgrou线程,这里不是,如果是main启动的话,则此时是main threadlogger.info("当前的线程是: " + Thread.currentThread().getName());if (eventLoop.inEventLoop()) {register0(promise);} else { // 使用eventloop的线程执行注册到seletor的工作try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// ...}}
}

此处在这篇文章已经看过了,只不过这里是将socketchannel注册到selector而已。因为注册op事件比较重要,所以这里再来看下:

// io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {try {// ...// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {// ...}}} catch (Throwable t) {// ...}
}

pipeline.fireChannelActive();最终执行到:

// io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;// 完成op事件的注册,对于serversocketchannel就是op_acceptfinal int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {logger.info("注册op事件:{}", (interestOps | readInterestOp));selectionKey.interestOps(interestOps | readInterestOp);}
}

自然这里注册的op_code就是1,op_read了。

2:流程总结

1:event loop死循环执行selectfor (;;) {}
2:监听到op_accept事件,acceept连接,创建socketchannelSocketChannel ch = SocketUtils.accept(javaChannel())
3:绑定op_read事件等待读取数据selectionKey.interestOps(interestOps | readInterestOp);

3:两个线程

其中一个是boss group对应的eventloop中的线程,另一个是worker group对应的event loop中的线程:

boss线程:轮询selector,accept连接,创建socket channel为读写客户端数据做准备初始化socket channel,从worker group中选择一个event loop
worker线程:绑定socketchannel到selector中注册socketchannel的op_read事件到selector中,准备读取数据

写在后面

参考文章列表

netty之是如何做好服务准备的。

http://www.lryc.cn/news/476540.html

相关文章:

  • Dockerfile文件编写
  • Oracle SQL 使用 ROWNUM 分页查询速度太慢的问题及解决方案!
  • Django3 + Vue.js 前后端分离书籍添加项目Web开发实战
  • 楼梯区域分割系统:Web效果惊艳
  • Day10加一
  • UTF-8简介
  • 基于Openwrt系统架构,实现应用与驱动的实例。
  • SQL进阶技巧:如何利用三次指数平滑模型预测商品零售额?
  • HTB:Cicada[WriteUP]
  • 小张求职记四
  • 适用于 c++ 的 wxWidgets框架源码编译SDK-windows篇
  • flink 内存配置(二):设置TaskManager内存
  • 【C++ 算法进阶】算法提升八
  • 阿里云实时数据仓库HologresFlink
  • 生成式语言模型的文本生成评价指标(从传统的基于统计到现在的基于语义)
  • 【网安案例学习】暴力破解攻击(Brute Force Attack)
  • 时间序列预测(十八)——实现配置管理和扩展命令行参数解析器
  • Vue问题汇总解决
  • Spark学习
  • 一些小细节代码笔记汇总
  • L4.【LeetCode笔记】链表题的VS平台调试代码
  • JavaCV 之高斯滤波:图像降噪与细节保留的魔法
  • VsCode显示空格
  • .Net C# 基于EFCore的DBFirst和CodeFirst
  • w012基于springboot的社区团购系统设计
  • 笔记本降频超鬼锁屏0.39电脑卡到不行解决办法实操记录
  • 优选算法第四讲:前缀和模块
  • ubuntu20.04 加固方案-设置限制su命令用户组
  • TDengine数据备份与恢复
  • 2024最新的开源博客系统:vue3.x+SpringBoot 3.x 前后端分离