当前位置: 首页 > news >正文

07-netty基础-自定义编解码器

1 自定义协议格式

自定义协议中包含  Length|HeaderLength|HeaderData|Body,其中headerData中会包含很多请求头中的信息版本、协议、序列化方式、请求类型;
Length、HeaderLength占用4个字节

2 代码目录结构说明

分为三个子模块分别是:
netty-message-common:公共使用的类,主要包含协议相关类,自定义编解码
netty-message-client: netty客户端,负责连接服务器,并发送数据到服务端
netty-message-server:netty服务端,负责启动服务,接收客户端请求

netty-message-common:
        协议:BonnieHeader、BonnieHeaderData、BonnieMessageRecord
        编解码:BonnieEncoder、BonnieDecoder
         相关枚举:LanguageCodeEnum、ReqTypeEnum、SerializableTypeEnum

netty-message-client:
        NettyClient、ClientHandler
netty-message-server:
        NettyServer、ServerHandler


3 代码git地址

下载地址:https://gitee.com/huyanqiu6666/netty  分支:20250729-custom-message-protocol-01

4 netty-message-common

4.1 定义协议

package com.bonnie.netty.common.entity;import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;import java.io.Serializable;@Slf4j
@Data
@ToString
public class BonnieHeader implements Serializable {// header长度  4个字节private Integer headerLength;// 头内容private BonnieHeaderData headerData;
}
package com.bonnie.netty.common.entity;import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.io.Serializable;@Slf4j
@Data
public class BonnieHeaderData implements Serializable {// 版本private Integer version;// 语言private byte languageCode;// 序列化方式  1个字节private byte serializableType;// 请求类型   1个字节private byte reqType;
}
package com.bonnie.netty.common.entity;import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;import java.io.Serializable;@Slf4j
@Data
@ToString
public class BonnieMessageRecord implements Serializable {// 长度  4个字节private Integer length;// 请求头信息private BonnieHeader header;// 消息体private Object body;
}

4.2 netty-message-server

package com.bonnie.server.netty;import com.bonnie.netty.common.coder.BonnieDecoder;
import com.bonnie.netty.common.coder.BonnieEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new BonnieEncoder()).addLast(new BonnieDecoder()).addLast(new ServerHandler());}});try {ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}}
package com.bonnie.server.netty;import com.bonnie.netty.common.domain.User;
import com.bonnie.netty.common.entity.BonnieHeader;
import com.bonnie.netty.common.entity.BonnieHeaderData;
import com.bonnie.netty.common.entity.BonnieMessageRecord;
import com.bonnie.netty.common.enums.LanguageCodeEnum;
import com.bonnie.netty.common.enums.ReqTypeEnum;
import com.bonnie.netty.common.enums.SerializableTypeEnum;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {BonnieMessageRecord record = (BonnieMessageRecord) msg;System.out.println("收到客户端消息:"+record);// 给客户端返回数据BonnieMessageRecord messageRecord = new BonnieMessageRecord();BonnieHeader header = new BonnieHeader();BonnieHeaderData headerData = new BonnieHeaderData();User user = (User) record.getBody();Integer age = user.getAge();headerData.setVersion(2);headerData.setLanguageCode(LanguageCodeEnum.JAVA.getCode());headerData.setSerializableType(SerializableTypeEnum.JAVA.getCode());headerData.setReqType(ReqTypeEnum.RES.getCode());header.setHeaderData(headerData);messageRecord.setBody("客户端消息接收成功,用户的年龄是"+age);ctx.writeAndFlush(record);super.channelRead(ctx, msg);}
}

4.3 netty-message-client

package com.bonnie.client.netty;import com.bonnie.netty.common.coder.BonnieDecoder;
import com.bonnie.netty.common.coder.BonnieEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;import java.net.InetSocketAddress;public class NettyClient {public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new BonnieEncoder()).addLast(new BonnieDecoder()).addLast(new ClientHandler());}});try {ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {worker.shutdownGracefully();}}}
package com.bonnie.client.netty;import com.bonnie.netty.common.domain.User;
import com.bonnie.netty.common.entity.BonnieHeader;
import com.bonnie.netty.common.entity.BonnieHeaderData;
import com.bonnie.netty.common.entity.BonnieMessageRecord;
import com.bonnie.netty.common.enums.LanguageCodeEnum;
import com.bonnie.netty.common.enums.ReqTypeEnum;
import com.bonnie.netty.common.enums.SerializableTypeEnum;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {for (int i=0; i<10; i++) {// 构建报文内容 length | headerLength | headerData | bodyDataBonnieMessageRecord messageRecord = new BonnieMessageRecord();// 请求头信息BonnieHeader header = new BonnieHeader();BonnieHeaderData bonnieHeaderData = new BonnieHeaderData();bonnieHeaderData.setVersion(0);bonnieHeaderData.setLanguageCode(LanguageCodeEnum.JAVA.getCode());bonnieHeaderData.setReqType(ReqTypeEnum.REQ.getCode());bonnieHeaderData.setSerializableType(SerializableTypeEnum.JAVA.getCode());// 请求内容User user = new User();user.setName("我是请求数据:"+i);user.setAge(i);//            header.setHeaderLength();header.setHeaderData(bonnieHeaderData);//            messageRecord.setLength();messageRecord.setHeader(header);messageRecord.setBody(user);//            System.out.println("客户端发送数据:"+messageRecord);ctx.writeAndFlush(messageRecord);}super.channelActive(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 将数据写入到服务端后,服务端回写数据到客户端,在这个方法中去接收数据BonnieMessageRecord record = (BonnieMessageRecord) msg;System.out.println("接收服务端消息:" + record);super.channelRead(ctx, msg);}}

4.4 编解码代码

编码
 

package com.bonnie.netty.common.coder;import com.bonnie.netty.common.entity.BonnieMessageRecord;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.Objects;/*** 编码器: 发送的信息构建成BonnieMessageRecord, 补充length以及headerLength*/
public class BonnieEncoder extends MessageToByteEncoder<BonnieMessageRecord> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, BonnieMessageRecord messageRecord, ByteBuf out) throws Exception {// 对头部数据进行序列化,并设置一些头部数据的值ByteBuffer byteBuffer = encoderHeader(messageRecord);out.writeBytes(byteBuffer);// 序列化body数据byte[] bodyByteArray = serializable(messageRecord.getBody());if (Objects.nonNull(bodyByteArray)) {out.writeBytes(bodyByteArray);}}private ByteBuffer encoderHeader(BonnieMessageRecord messageRecord) {// 构建报文内容 length | headerLength(4个字节) | headerData | bodyData// 头部length长度int length = 4;// headerLengthbyte[] headerByteArray = serializable(messageRecord.getHeader());int headerLength = headerByteArray.length;// bodyLengthint bodyLength = serializable(messageRecord.getBody()).length;// 所以定义一个length + headerLength +headerData 这些数据的bufferByteBuffer result = ByteBuffer.allocate(length + headerLength + bodyLength);// 设置 headerLength + headerData + bodyLength的长度result.putInt(length + headerLength + bodyLength);// 设置headerData的长度result.putInt(headerLength);// 设置headerDataresult.put(headerByteArray);// 切换成读模式result.flip();return result;}// 序列化private <T> byte[] serializable(T Object) {try {ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(Object);return bos.toByteArray();} catch (IOException e) {e.printStackTrace();}return null;}}

解码:

package com.bonnie.netty.common.coder;import com.bonnie.netty.common.domain.User;
import com.bonnie.netty.common.entity.BonnieHeader;
import com.bonnie.netty.common.entity.BonnieMessageRecord;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.ByteBuffer;
import java.util.Objects;/*** 解码:长度域解码器,将数据还原成对象BonnieMessageRecord的形式*/
@Slf4j
public class BonnieDecoder extends LengthFieldBasedFrameDecoder {public BonnieDecoder() {super(Integer.MAX_VALUE, 0, 4, 0, 4);}@Overrideprotected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {// 调用父类的decode方法,主要是根据规则获取到我们真正需要的数据封装成byteBufByteBuf byteBuf = (ByteBuf) super.decode(ctx, in);if (Objects.isNull(byteBuf)) {return null;}// 开始解码ByteBuffer byteBuffer = byteBuf.nioBuffer();return decoder(byteBuffer);}/*** 解码* @param byteBuffer* @return*/private BonnieMessageRecord decoder(ByteBuffer byteBuffer) {// 获取byteBuffer的长度int length = byteBuffer.limit();// 头长度int headerLength = byteBuffer.getInt();// 头字节数组byte[] headerDataByte = new byte[headerLength];byteBuffer.get(headerDataByte);BonnieMessageRecord messageRecord = new BonnieMessageRecord();messageRecord.setLength(length);BonnieHeader header = deserializable(BonnieHeader.class, headerDataByte);header.setHeaderLength(headerLength);// 反序列化body首先需要知道body的长度int bodyLength = length - 4 - headerLength;byte[] bodyData = null;if (bodyLength > 0) {bodyData = new byte[bodyLength];// 从byteBuffer中读取数据到bodyData里面byteBuffer.get(bodyData);}User userBody = deserializable(User.class, bodyData);messageRecord.setHeader(header);messageRecord.setBody(userBody);return messageRecord;}/*** 反序列化* @param clazz* @param data* @return*/private <T> T deserializable(Class<T> clazz, byte[] data) {ObjectInputStream ois = null;try {ois = new ObjectInputStream(new ByteArrayInputStream(data));return (T)ois.readObject();} catch (IOException | ClassNotFoundException e) {e.printStackTrace();}return null;}
}

http://www.lryc.cn/news/606887.html

相关文章:

  • Linux信号捕捉与穿插中断
  • linux中posix消息队列的使用记录
  • 鸿蒙系统下的动态负载均衡实战:让分布式任务调度更智能
  • 等保2.0指南:从系统等级划分到测评全流程攻略
  • 【PyTorch✨】01 初识PyTorch
  • 算法提升之数学(唯一分解定理)
  • 【unity小技巧】封装unity适合2D3D进行鼠标射线检测,获取鼠标位置信息检测工具类
  • Linux通用SPI作为Master——回环测试
  • 多屏混合KVM Dock扩展坞 如何打造极致高效生产力
  • 9.1无法恢复的错误与 panic!
  • Codeforces Round 1040 (Div. 2) A - D题详细题解
  • 第13届蓝桥杯Python青少组中/高级组选拔赛(STEMA)2021年10月24日真题
  • 项目上传到github中
  • Web3.0如何塑造互联网的未来
  • Spring AI MCP:解锁大模型应用开发新姿势
  • GitLab Docker Compose 迁移后 Redis 权限问题排查与解决
  • Linux中Docker Swarm介绍和使用
  • 深度学习-梯度爆炸与梯度消失
  • 宝塔服务器挂载数据盘
  • Hive SQL (HQL) 编辑指南
  • Jupyter Notebook 使用指南
  • 深度解析:Nginx的卓越性能
  • Java 24 新特性解析与代码示例
  • 理想I8对撞乘龙卡车,AI基于数学和物理的角度如何看?
  • macOS卸载.net core 8.0
  • 基于OpenCV的cv2.solvePnP方法实现头部姿态估计
  • STM32-ESP8266Wi-Fi模块使用USART实现通信/创建AP和STA模式配置教程(寄存器版)
  • 预测性维护之温振传感器选型与应用秘籍
  • ubuntu22.04系统入门 linux入门(二) 简单命令 多实践以及相关文件管理命令
  • Node.js的用途和安装方法