当前位置: 首页 > news >正文

【Netty】Netty中的超时处理与心跳机制(十九)

文章目录

  • 前言
  • 一、超时监测
  • 二、IdleStateHandler类
  • 三、ReadTimeoutHandler类
  • 四、WriteTimeoutHandler类
  • 五、实现心跳机制
    • 5.1. 定义心跳处理器
    • 5.2. 定义 ChannelInitializer
    • 5.3. 编写服务器
    • 5.4. 测试
  • 结语

前言

回顾Netty系列文章:

  • Netty 概述(一)
  • Netty 架构设计(二)
  • Netty Channel 概述(三)
  • Netty ChannelHandler(四)
  • ChannelPipeline源码分析(五)
  • 字节缓冲区 ByteBuf (六)(上)
  • 字节缓冲区 ByteBuf(七)(下)
  • Netty 如何实现零拷贝(八)
  • Netty 程序引导类(九)
  • Reactor 模型(十)
  • 工作原理详解(十一)
  • Netty 解码器(十二)
  • Netty 编码器(十三)
  • Netty 编解码器(十四)
  • 自定义解码器、编码器、编解码器(十五)
  • Future 源码分析(十六)
  • Promise 源码分析(十七)
  • 一行简单的writeAndFlush都做了哪些事(十八)

一、超时监测

Netty 的超时类型 IdleState 主要分为以下3类:

  • ALL_IDLE : 一段时间内没有数据接收或者发送。
  • READER_IDLE : 一段时间内没有数据接收。
  • WRITER_IDLE : 一段时间内没有数据发送。

针对上面的 3 类超时异常,Netty 提供了 3 类ChannelHandler来进行监测。

  • IdleStateHandler : 当 Channel 一段时间未执行读取、写入或者两者都未执行时,触发 -IdleStateEvent 事件。
  • ReadTimeoutHandler :在一定时间内未读取任何数据时,引发 ReadTimeoutEvent 事件。
  • WriteTimeoutHandler :当写操作在一定时间内无法完成时,引发 WriteTimeoutEvent 事件。

二、IdleStateHandler类

IdleStateHandler 包括了读\写超时状态处理,观察以下 IdleStateHandler 类的构造函数源码。

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {this.writeListener = new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {IdleStateHandler.this.lastWriteTime = IdleStateHandler.this.ticksInNanos();IdleStateHandler.this.firstWriterIdleEvent = IdleStateHandler.this.firstAllIdleEvent = true;}};this.firstReaderIdleEvent = true;this.firstWriterIdleEvent = true;this.firstAllIdleEvent = true;ObjectUtil.checkNotNull(unit, "unit");this.observeOutput = observeOutput;if (readerIdleTime <= 0L) {this.readerIdleTimeNanos = 0L;} else {this.readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);}if (writerIdleTime <= 0L) {this.writerIdleTimeNanos = 0L;} else {this.writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);}if (allIdleTime <= 0L) {this.allIdleTimeNanos = 0L;} else {this.allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);}}

在上述源码中,构造函数可以接收以下参数:

  • readerIdleTimeSecond:指定读超时时间,指定 0 表明为禁用。

  • writerIdleTimeSecond:指定写超时时间,指定 0 表明为禁用。

  • allIdleTimeSecond:在指定读写超时时间,指定 0 表明为禁用。

IdleStateHandler 使用示例:

public class MyChannelInitializer extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast("idleStateHandler",new IdleStateHandler(60,30,0));channel.pipeline().addLast("myHandler",new MyHandler());}
}public class MyHandler extends ChannelDuplexHandler {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if(evt instanceof IdleStateEvent){IdleStateEvent e = (IdleStateEvent) evt;if(e.state() == IdleState.READER_IDLE){ctx.close();}else if(e.state() == IdleState.WRITER_IDLE){ctx.writeAndFlush(new PingMessage());}}}
}

在上述示例中,IdleStateHandler 设置了读超时时间为 60 秒,写超时时间为 30 秒。MyHandler 是针对超时事件 IdleStateEvent 的处理。

  • 如果 30 秒内没有出站流量(写超时)时发送 ping 消息的示例。
  • 如果 60 秒内没有入站流量(读超时)时,连接关闭。

三、ReadTimeoutHandler类

ReadTimeoutHandler 类包括了读超时状态处理。ReadTimeoutHandler 类的源码如下:

public class ReadTimeoutHandler extends IdleStateHandler {private boolean closed;public ReadTimeoutHandler(int timeoutSeconds) {this((long)timeoutSeconds, TimeUnit.SECONDS);}public ReadTimeoutHandler(long timeout, TimeUnit unit) {super(timeout, 0L, 0L, unit);//禁用了写超时、读写超时}protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {assert evt.state() == IdleState.READER_IDLE;//只处理读超时this.readTimedOut(ctx);}protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {if (!this.closed) {ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);//引发异常ctx.close();this.closed = true;}}
}

从上述源码可以看出,ReadTimeoutHandler 继承自 IdleStateHandler,并在构造函数中禁用了写超时、读写超时,而且在处理超时时,只会针对 READER_IDLE状态进行处理,并引发 ReadTimeoutException 异常。
ReadTimeoutHandler 的使用示例如下:

public class MyChannelInitializer extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(30));channel.pipeline().addLast("myHandler",new MyHandler());}
}//处理器处理ReadTimeoutException 
public class MyHandler extends ChannelDuplexHandler {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if(cause instanceof ReadTimeoutException){//...}else {super.exceptionCaught(ctx,cause);}}
}

在上述示例中,ReadTimeoutHandler 设置了读超时时间是 30 秒。

四、WriteTimeoutHandler类

WriteTimeoutHandler 类包括了写超时状态处理。WriteTimeoutHandler 类的源码如下:

public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {private static final long MIN_TIMEOUT_NANOS;private final long timeoutNanos;private WriteTimeoutHandler.WriteTimeoutTask lastTask;private boolean closed;public WriteTimeoutHandler(int timeoutSeconds) {this((long)timeoutSeconds, TimeUnit.SECONDS);}public WriteTimeoutHandler(long timeout, TimeUnit unit) {ObjectUtil.checkNotNull(unit, "unit");if (timeout <= 0L) {this.timeoutNanos = 0L;} else {this.timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);}}public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {if (this.timeoutNanos > 0L) {promise = promise.unvoid();this.scheduleTimeout(ctx, promise);}ctx.write(msg, promise);}public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {WriteTimeoutHandler.WriteTimeoutTask task = this.lastTask;WriteTimeoutHandler.WriteTimeoutTask prev;for(this.lastTask = null; task != null; task = prev) {task.scheduledFuture.cancel(false);prev = task.prev;task.prev = null;task.next = null;}}private void scheduleTimeout(ChannelHandlerContext ctx, ChannelPromise promise) {WriteTimeoutHandler.WriteTimeoutTask task = new WriteTimeoutHandler.WriteTimeoutTask(ctx, promise);task.scheduledFuture = ctx.executor().schedule(task, this.timeoutNanos, TimeUnit.NANOSECONDS);if (!task.scheduledFuture.isDone()) {this.addWriteTimeoutTask(task);promise.addListener(task);}}private void addWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {if (this.lastTask != null) {this.lastTask.next = task;task.prev = this.lastTask;}this.lastTask = task;}private void removeWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {if (task == this.lastTask) {assert task.next == null;this.lastTask = this.lastTask.prev;if (this.lastTask != null) {this.lastTask.next = null;}} else {if (task.prev == null && task.next == null) {return;}if (task.prev == null) {task.next.prev = null;} else {task.prev.next = task.next;task.next.prev = task.prev;}}task.prev = null;task.next = null;}protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {if (!this.closed) {ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);ctx.close();this.closed = true;}}//...
}

从上述源码可以看出,WriteTimeoutHandler 在处理超时时,引发了 WriteTimeoutException 异常。
WriteTimeoutHandler 的使用示例如下:

public class MyChannelInitializer extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast("writeTimeoutHandler",new WriteTimeoutHandler(30));channel.pipeline().addLast("myHandler",new MyHandler());}
}//处理器处理ReadTimeoutException 
public class MyHandler extends ChannelDuplexHandler {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if(cause instanceof WriteTimeoutException ){//...}else {super.exceptionCaught(ctx,cause);}}
}

在上述示例中,WriteTimeoutHandler 设置了写超时时间是 30 秒。

五、实现心跳机制

针对超时的解决方案——心跳机制。
在程序开发中,心跳机制是非常常见的。其原理是,当连接闲置时可以发送一个心跳来维持连接。一般而言,心跳就是一段小的通信。

5.1. 定义心跳处理器

public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {// (1)心跳内容private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",CharsetUtil.UTF_8));  @Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {// (2)判断超时类型if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;String type = "";if (event.state() == IdleState.READER_IDLE) {type = "read idle";} else if (event.state() == IdleState.WRITER_IDLE) {type = "write idle";} else if (event.state() == IdleState.ALL_IDLE) {type = "all idle";}// (3)发送心跳ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);System.out.println( ctx.channel().remoteAddress()+"超时类型:" + type);} else {super.userEventTriggered(ctx, evt);}}
}

对上述代码说明:

  • 定义了心跳时,要发送的内容。

  • 判断是不是 IdleStateEvent 事件,是则处理。

  • 将心跳内容发送给客户端。

5.2. 定义 ChannelInitializer

HeartbeatHandlerInitializer用于封装各类ChannelHandler,代码如下:

public class HeartbeatHandlerInitializer extends ChannelInitializer<Channel> {private static final int READ_IDEL_TIME_OUT = 4; // 读超时private static final int WRITE_IDEL_TIME_OUT = 5;// 写超时private static final int ALL_IDEL_TIME_OUT = 7; // 所有超时@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS)); // (1)pipeline.addLast(new HeartbeatServerHandler()); // (2)}
}

对上述代码说明如下:

  • 添加了一个IdleStateHandler到 ChannelPipeline,并分别设置了读、写超时的时间。为了方便演示,将超时时间设置的比较短。
  • 添加了HeartbeatServerHandler,用来处理超时时,发送心跳。

5.3. 编写服务器

服务器代码比较简单,启动后侦听 8083 端口。

public final class HeartbeatServer {static final int PORT = 8083;public static void main(String[] args) throws Exception {// 配置服务器EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new HeartbeatHandlerInitializer());// 启动ChannelFuture f = b.bind(PORT).sync();f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

5.4. 测试

首先启动 HeartbeatServer,客户端用操作系统自带的 Telnet 程序即可:

telnet 127.0.0.1 8083

可以看到客户端与服务器的交互效果如下图。
在这里插入图片描述

结语

文章如果对你有帮助,看完记得点赞、关注、收藏。

http://www.lryc.cn/news/91636.html

相关文章:

  • 尚硅谷大数据hadoop教程_mapReduce
  • 一键启停脚本
  • 20230604_Hadoop命令操作练习
  • hashCode 与 equals(重要)?
  • 华为OD机试(2023.5新题) 需要打开多少监控器(java,py,c++,js)
  • 209.长度最小的子数组
  • react antd Modal里Form设置值不起作用
  • idea连接Linux服务器
  • 在windows环境下使用winsw将jar包注册为服务(实现开机自启和配置日志输出模式)
  • 汽车通用款一键启动舒适进入拓展蓝牙4G网络手机控车系统
  • QSettings Class
  • 【vue】关于vue中的插槽
  • Springboot整合Mybatis Plus【超详细】
  • 接口测试-使用mock生产随机数据
  • Kohl‘s百货的EDI需求详解
  • 二叉树part6 | ● 654.最大二叉树 ● 617.合并二叉树 ● 700.二叉搜索树中的搜索 ● 98.验证二叉搜索树
  • Linux命令记录
  • eBPF 入门实践教程十五:使用 USDT 捕获用户态 Java GC 事件耗时
  • Linux :: vim 编辑器的初次体验:三种 vim 常用模式 及 使用:打开编辑、退出保存关闭vim
  • Linux内核进程创建流程
  • 【03.04】大数据教程--HTTP协议和静态Web服务器
  • 数据共享传输:台式机和笔记本同步文件!
  • java设计模式(十二)代理模式
  • Umi微前端水印踩坑以及解决方案
  • Android RK3588-12 hdmi-in Camera方式支持NV24格式
  • Hive窗口函数详细介绍
  • 牛客网【c语言练习】
  • C++类和对象(上)
  • JavaScript 数据透视表 DHTMLX Pivot Crack
  • QT链接库设置