当前位置: 首页 > news >正文

Netty常用核心类说明

MessageToByteEncoder

MessageToByteEncoder是一个抽象编码器,子类可重写encode方法把对象编码为ByteBuf输出。

MessageToByteEncoder继承自ChannelOutboundHandlerAdapter,encode在出站是被调用。

public class MyMessageEncoder extends MessageToByteEncoder<MessagePO> {@Overrideprotected void encode(ChannelHandlerContext ctx, MessagePO msg, ByteBuf out) throws Exception {System.out.println("MyMessageEncoder.encode,被调用");String json = JSONObject.toJSONString(msg);out.writeInt(json.getBytes(StandardCharsets.UTF_8).length);out.writeBytes(json.getBytes(StandardCharsets.UTF_8));}
}

ByteToMessageDecoder

ByteToMessageDecoder是一种ChannelInboundHandler,可以称为解码器,负责将byte字节流(ByteBuf)转换成一种Message,Message是应用可以自己定义的一种Java对象。

ByteToMessageDecoder:用于将字节转为消息,需要检测缓冲区是否有足够的字节。

public class MyMessageDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println("MyMessageDecoder.decode,被调用");while (in.readableBytes() >= 4){int num = in.readInt();System.out.println("解码出一个整数:"+num);out.add(num);}}
}

ReplayingDecoder

ReplayingDecoder:继承自ByteToMessageDecoder,不需要检测缓冲区是否有足够的字节,但是ReplayingDecoder的速度略慢于ByteToMessageDecoder,而且并不是所有的ByteBuf都支持。

项目复杂度高用ReplayingDecoder,否则使用ByteToMessageDecoder。

public class MyMessageDecoder extends ReplayingDecoder<Void> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println("MyMessageDecoder.decode,被调用");int length = in.readInt();byte[] content = new byte[length];in.readBytes(content);String json = new String(content,StandardCharsets.UTF_8);MessagePO po = JSONObject.parseObject(json,MessagePO.class);out.add(po);}
}

MessageToMessageEncoder

用于从一种消息编码为另外一种消息,例如从POJO到POJO,是一种ChannelOutboundHandler

MessageToMessageDecoder

从一种消息解码为另一种消息,例如POJO到POJO,是一种ChannelInboundHandler

MessageToMessageCodec

整合了MessageToMessageEncoder 和 MessageToMessageDecoder

public class RequestMessageCodec extends MessageToMessageCodec<String, RequestData> {@Overrideprotected void encode(ChannelHandlerContext ctx, RequestData msg, List<Object> out) throws Exception {System.out.println("RequestMessageCodec.encode 被调用 " + msg);String json = JSONObject.toJSONString(msg);out.add(json);}@Overrideprotected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {System.out.println("RequestMessageCodec.decode 被调用 " + msg);RequestData po = JSONObject.parseObject(msg, RequestData.class);out.add(po);}
}

ChannelInitializer

ChannelInitializer是一种特殊的ChannelInboundHandler,可以通过一种简单的方式(调用initChannel方法)来初始化Channel。

通常在Bootstrap.handler(ChannelHandler)ServerBootstrap.handler(ChannelHandler)ServerBootstrap.childHandler(ChannelHandler)中给Channel设置ChannelPipeline。

注意:当initChannel被执行完后,会将当前的handler从Pipeline中移除。

Bootstrap bootstrap = new Bootstrap().group(group)//设置线程组.channel(NioSocketChannel.class)//设置客户端通道的实现类.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyClientHandler());//加入自己的处理器}});
ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器的通道实现.option(ChannelOption.SO_BACKLOG, 128)//设置线程队列等待连接的个数.childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接状态
//      .handler(null)//该Handler对应bossGroup.childHandler(new ChannelInitializer<SocketChannel>() {//给workerGroup的EventLoop对应的管道设置处理器@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyServerHandler());}});

SimpleChannelInboundHandler

SimpleChannelInboundHandler继承自ChannelInboundHandlerAdapter,可以通过泛型来规定消息类型。
处理入站的数据我们只需要实现channelRead0方法。
SimpleChannelInboundHandler在接收到数据后会自动release掉数据占用的Bytebuffer资源,ChannelInboundHandlerAdapter不会自动释放。

public class MyClientHandler extends SimpleChannelInboundHandler<MessagePO> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessagePO msg) throws Exception {System.out.println("收到服务端消息:" + msg);}
}

DefaultEventLoopGroup

在向pipline中添加ChannelHandler时,可以提供一个新的线程组,Handler业务会在该线程中执行。
当加ChannelHandler需要执行多线程并发业务时,DefaultEventLoopGroup可以派上大用处。
如果没有设置DefaultEventLoopGroup,默认使用的是EventLoopGroup workerGroup = new NioEventLoopGroup();

DefaultEventLoopGroup businessGroup = new DefaultEventLoopGroup(100);
...
addLast(businessGroup, new MyNettyServerHandler())
/*** 读取客户端发送过来的消息*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;System.out.println("收到客户信息:" + byteBuf.toString(CharsetUtil.UTF_8));System.out.println("客户端地址:" + ctx.channel().remoteAddress());System.out.println("处理线程:" + Thread.currentThread().getName());ctx.executor().parent().execute(()->{try {System.out.println("parent().execute Thread = " + Thread.currentThread().getName());TimeUnit.SECONDS.sleep(2L);System.out.println("parent任务执行完成1");} catch (InterruptedException e) {e.printStackTrace();}});ctx.executor().parent().execute(()->{try {System.out.println("parent().execute Thread = " + Thread.currentThread().getName());TimeUnit.SECONDS.sleep(2L);System.out.println("parent任务执行完成2");} catch (InterruptedException e) {e.printStackTrace();}});ctx.executor().parent().execute(()->{try {System.out.println("parent().execute Thread = " + Thread.currentThread().getName());TimeUnit.SECONDS.sleep(2L);System.out.println("parent任务执行完成3");} catch (InterruptedException e) {e.printStackTrace();}});
}

以上代码执行日志如下:

收到客户信息:Hello 服务端
客户端地址:/127.0.0.1:60345
处理线程:defaultEventLoopGroup-4-1
parent().execute Thread = defaultEventLoopGroup-4-2
parent().execute Thread = defaultEventLoopGroup-4-3
程序继续~~ defaultEventLoopGroup-4-1
parent().execute Thread = defaultEventLoopGroup-4-4
parent任务执行完成1
parent任务执行完成3
parent任务执行完成2

EventLoop定时任务

可以在Handler中通过方法ctx.channel().eventLoop().schedule()添加定时任务

ctx.channel().eventLoop().schedule(()->{try {System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());TimeUnit.SECONDS.sleep(2L);System.out.println("定时任务执行完成");} catch (InterruptedException e) {e.printStackTrace();}
},10L,TimeUnit.SECONDS);
http://www.lryc.cn/news/17484.html

相关文章:

  • ingress服务
  • java 抽象类 详解
  • MySQL的安装(详解)
  • 界面控件DevExpress WinForm——轻松构建类Visual Studio UI(二)
  • BabylonJS之放烟花
  • vue3 布局样式的原理
  • Qt程序使用路径方式和注意事项
  • 和日期相关的代码和bug——一道力扣题中的小发现
  • 如何在2023年学习编程并获得开发者工作
  • Python实战之小说下载神器(三)排行榜所有小说:最全热门小说合集,总有一款适合你,好多好多好多超赞的小说...(源码分享学习)
  • 前端监控之用户行为监控实践1(数据收集)
  • 【网络原理7】认识HTTP
  • SPI实验
  • 去基线处理
  • 模拟信号4-20mA /0-5V/0-75mV/0-100mV转RS-485/232,数据采集A/D转换模块 YL21
  • [USB]键盘数据格式以及按键键值
  • web客户端-websocket
  • mysql间隙锁
  • 华为OD机试 - 计算面积(Java) | 机试题+算法思路+考点+代码解析 【2023】
  • Python 之 Pandas 时间戳、通过时间间隔实现 datetime 加减、时间转化、时期频率转换和 shift() 时间频率进行移位)
  • 一篇文章搞定linux网络模型
  • 惠普庆祝在中国40年,强化中国发展战略
  • C++小作业
  • Python基础 — lambda匿名函数
  • MongoDB安装和使用过程常见问题
  • AWS攻略——使用中转网关(Transit Gateway)连接同区域(Region)VPC
  • Rouge | 自动文摘及机器翻译评价指标
  • 【Python入门第十五天】Python字典
  • java学习思路
  • MySQL操作数据库-------创建数据库