当前位置: 首页 > news >正文

12-netty基础-手写rpc-编解码-04

 netty系列文章:

01-netty基础-socket
02-netty基础-java四种IO模型
03-netty基础-多路复用select、poll、epoll
04-netty基础-Reactor三种模型
05-netty基础-ByteBuf数据结构
06-netty基础-编码解码
07-netty基础-自定义编解码器
08-netty基础-自定义序列化和反序列化
09-netty基础-手写rpc-原理-01
10-netty基础-手写rpc-定义协议头-02
11-netty基础-手写rpc-支持多序列化协议-03
12-netty基础-手写rpc-编解码-04
13-netty基础-手写rpc-消费方生成代理-05
14-netty基础-手写rpc-提供方(服务端)-06

1 自定义编辑码

编解码都采用原生的ByteBuf,分别为MessageToByteEncoder、ByteToMessageDecoder;解决了拆包、粘包问题
编码:将需要发送的数据封装成RpcProtocol形式进行发送
解码:将接收到的数据解释成RpcProtocol形式然后处理相应的业务逻辑

2 代码

2.1 编码

package com.bonnie.protocol.code;import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.serializer.ISerializer;
import com.bonnie.protocol.serializer.SerializerManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;/*** 编码*/
@Slf4j
public class BonnieEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, RpcProtocol<Object> msg, ByteBuf out) throws Exception {log.info("============begin BonnieEncoder=========");Header header = msg.getHeader();// 魔数out.writeShort(header.getMagic());// 序列化类型out.writeByte(header.getSerialType());// 消息类型out.writeByte(header.getReqType());// 请求idout.writeLong(header.getRequestId());// 消息体序列化ISerializer serializer = SerializerManager.getSerializer(header.getSerialType());byte[] contentByteArray = serializer.serialize(msg.getContent());System.out.println("body长度"+contentByteArray.length);// 消息体长度,4个字节out.writeInt(contentByteArray.length);System.out.println("发送数据:"+JSONObject.toJSONString(msg));// 写入消息体out.writeBytes(contentByteArray);}}

2.2 解码

package com.bonnie.protocol.code;import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.enums.ReqTypeEnum;
import com.bonnie.protocol.enums.RpcConstant;
import com.bonnie.protocol.serializer.ISerializer;
import com.bonnie.protocol.serializer.SerializerManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;import java.util.List;
import java.util.Objects;/*** 解码*/
@Slf4j
public class BonnieDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {log.info("========begin BonnieDecoder==========");// 首先判断可读的字节是否小于头的长度,如果小于,说明没有body数据,甚至数据有问题,不解码if (in.readableBytes()<= RpcConstant.HEAD_TOTOAL_LEN) {return;}// 标记读取开始索引in.markReaderIndex();// 魔数short magic = in.readShort();if (!Objects.equals(magic, RpcConstant.MAGIC)) {throw new IllegalArgumentException("Illegal request parameter 'magic',"+magic);}// 序列化类型byte serialType = in.readByte();// 消息类型byte reqType = in.readByte();// 请求idlong requestId = in.readLong();// 报文长度int dataLength = in.readInt();// 可读字节是否小于body的长度,如果小于,则不读取,并且重置到读指针的地方,等下一次读if(in.readableBytes()<dataLength) {in.resetReaderIndex();return;}// 消息体byte[] bodyByteArray = new byte[dataLength];// body内容读取到body中in.readBytes(bodyByteArray);// 封装头信息Header header = new Header();header.setMagic(magic);header.setSerialType(serialType);header.setReqType(reqType);header.setRequestId(requestId);header.setLength(dataLength);// 拿到对应的序列化ISerializer serializer = SerializerManager.getSerializer(serialType);/*** 根据请求类型,比如客户端发送数据,就是REQUWST,服务端给客户端回复数据就是RESPONSE,当然都是* 相对的,每一段都会发送REQUEST请求,每一段也会发送RESPONSE请求*/ReqTypeEnum reqTypeEnum = ReqTypeEnum.findByCode(reqType);switch (reqTypeEnum) {// 如果是请求报文  反序列化得到数据,封装数据,继续传递case REQUEST:RpcProtocol rpcProtocol = dealRequest(bodyByteArray, serializer, header);out.add(rpcProtocol);break;case RESPONSE:RpcProtocol rpcProtocolResponse = dealResponse(bodyByteArray, serializer, header);out.add(rpcProtocolResponse);break;case HEARTBEAT:// TODObreak;}}private RpcProtocol dealResponse(byte[] bodyByteArray, ISerializer serializer, Header header) {RpcResponse rpcResponse = serializer.deserialize(bodyByteArray, RpcResponse.class);RpcProtocol<RpcResponse> rpcProtocol = new RpcProtocol<>();rpcProtocol.setHeader(header);rpcProtocol.setContent(rpcResponse);return rpcProtocol;}private RpcProtocol dealRequest(byte[] bodyByteArray, ISerializer serializer, Header header) {RpcRequest rpcRequest = serializer.deserialize(bodyByteArray, RpcRequest.class);RpcProtocol<RpcRequest> rpcProtocol = new RpcProtocol<>();rpcProtocol.setHeader(header);rpcProtocol.setContent(rpcRequest);return rpcProtocol;}}

http://www.lryc.cn/news/612925.html

相关文章:

  • web前端结合Microsoft Office Online 在线预览,vue实现(PPT、Word、Excel、PDF等)
  • 表单元素与美化技巧:打造用户友好的交互体验
  • 【LVGL自学笔记暂存】
  • LINUX-批量文件管理及vim文件编辑器
  • VBA之Word应用第四章第一节:段落集合Paragraphs对象(一)
  • 11-netty基础-手写rpc-支持多序列化协议-03
  • 从零开始构建情绪可视化日记平台 - React + TypeScript + Vite
  • 芯谷科技--高效噪声降低解决方案压缩扩展器D5015
  • 30-Hive SQL-DML-Load加载数据
  • 微算法科技(NASDAQ:MLGO)利用集成学习方法,实现更低成本、更稳健的区块链虚拟货币交易价格预测
  • 51单片机
  • 数据推荐|标贝科技方言自然对话数据集 构建语音交互新基建
  • 全球化2.0 | 泰国IT服务商携手云轴科技ZStack重塑云租赁新生态
  • 最新教程 | CentOS 7 内网环境 Nginx + ECharts 页面离线部署手册(RPM 安装方式)
  • 前端开发(HTML,CSS,VUE,JS)从入门到精通!第七天(Vue)(二)
  • 如何为WordPress启用LiteSpeed缓存
  • HTML已死,HTML万岁——重新思考DOM的底层设计理念
  • 炫酷圆形按钮调色器
  • Ubuntu 系统 Docker 启动失败(iptables/nf\_tables)
  • 应急响应复现
  • Android 原生与 Flutter 通信完整实现 (Kotlin 版)
  • JPA 分页查询与条件分页查询
  • 《深入理解 WSGI:解锁 Python Web 应用背后的奥秘》
  • Java+Vue合力开发固定资产条码管理系统,移动端+后台管理,集成资产录入、条码打印、实时盘点等功能,助力高效管理,附全量源码
  • 前端性能优化:从请求到资源的精细调控
  • Event Stream输出优化:Vue3节流函数的正确实现
  • 【大前端】vite忽略指定前缀的静态资源
  • 【插件式微服务架构系统分享】之 解耦至上:gateway 网关与APISIX 网关的不同分工
  • 一文解读“Performance面板”前端性能优化工具基础用法!
  • SpringAI