当前位置: 首页 > news >正文

springboot框架使用Netty依赖中解码器的作用及实现详解

在项目开发 有需求 需要跟硬件通信
也没有mqtt 作为桥接 也不能http 请求 api 所以也不能 json字符串这么爽传输

所以要用tcp 请求 进行数据交互 数据还是16进制的 写法 有帧头 什么的
对于这种物联网的这种对接
我的理解就是 我们做的工作就像翻译
把这些看不懂的 字节流 变成 java 认识的数据

就了解了一下 netty 这个依赖
在springboot 项目种使用这个依赖进行 连接

但是这个依赖 如果不熟悉 使用不当 有可能有内存溢出的风险
所以大家如果自己用这个netty 依赖 做物联网的调用
要小心 最好能找一下 物联网中间件 基于netty封装的

这是我后续 找的一个中间件 也不算很成熟 如果大家有更好用的 欢迎留言 一起分享

http://www.iteaj.com/#/course

但是我发现还是要根据 对接的物联网硬件文档 写自定义解码器 或者编码器
这篇文章主要分享一下 我了解到的 netty 下的 四个解码器 使用方案

概念:
拆包和沾包
是典型的拆包和沾包问题,俗话说就是两端通信,一端发送一端接收,接收的那一端怎么知道是否已经完整的接收了数据?

假设服务端连续发送了两条消息:hello world! / hello client!

由于客户端不知道怎么才算一条消息,怎么才算两条消息,所以读取会有以下几种情况:

1.分两次读取消息,第一次是hello world!,第二次是hello client! 这是正常情况

2.一次就读取完成,hello world!hello client! 这种情况就叫沾包

3.分两次读取消息,第一次是hello ,第二次是world!hello client! 这第一次读取就是拆包,第二次就是沾包

总之就是读取到的信息不完整就是拆包,读取到的信息有额外多的信息就是沾包

解码器就可以来解决这个问题

依赖引入就不说了
先分享代码

netty配置类

package com.netty.server.tpcServer;import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;/*** User:Json* Date: 2024/4/12**/
@Component
@Slf4j
public class NettyServer {private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);// 服务端NIO线程组private final EventLoopGroup bossGroup = new NioEventLoopGroup();private final EventLoopGroup workGroup = new NioEventLoopGroup();public ChannelFuture start(String host, int port) {ChannelFuture channelFuture = null;try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 1024 就是规定传入的 字节长度的大小// 对于字节 一个字符串占多少字节 int 占多多少字节 long 占多多少字节 如果明白这个 应该很好理解//第一种:LineBasedFrameDecoder:传入的参数是消息最大长度,发送消息的大小必须小于设置值// 行分隔符解码器(结尾根据 “\n” 作为结束标识)//socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));//第二种:DelimiterBasedFrameDecoder 自定义分割器解码器,结尾根据什么作为结束标识可以自定义 比如用 |// socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("|".getBytes())));// 第三种: FixedLengthFrameDecoder // 固定长度解码器,发送的消息需要定长//  socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(7));// 上面三个解码器明显的看到不够灵活  所以还有第四种//第四种:LengthFieldBasedFrameDecoder:基于长度的自定义解码器,比较灵活 这个// 这个 解码器 一共有 5个参数
//                            maxFrameLength:最大帧长度。也就是可以接收的数据的最大长度。如果超过,此次数据会被丢弃
//                            lengthFieldOffset:长度域偏移量。存储数据长度的一个偏移量
//                            lengthFieldLength:长度域字节数。存储数据长度的一个大小
//                            lengthAdjustment:数据长度修正。因为长度既可以代表data的长度,也可以是整个消息的长度
//                            initialBytesToStrip:跳过的字节数。可以选择舍弃一部分数据// 可以理解为 我们给数据定义传输规则 就像http请求 定义规则 比如 请求头 数据结构 json 等等// 所以我们作为接收数据的服务端 可以在开发接收数据的时候 定义规则 然后让客户端按照规则来传输数据// 比如 发送一个消息,我们定义的结构为:消息头+数据长度+数据// 我们怎么定义规则呢 比如:// 整体字节为 15个字节// 要发送的数据 "Message" 长度为 7个字节//我们规定 maxFrameLength :1024 接收的最大数据,// lengthFieldOffset:长度域偏移量 一般用来定义消息头 4个字节、// lengthFieldLength:长度域字节数  用于定义数据长度(length)的【值】的大小 4个字节// lengthAdjustment : 0 数据长度修正// 假设我设置的数据长度是20,代表了整个消息体的长度,但是我数据却只有12个字节,这往后读20个字节无疑是错的,所以我们需要修正,怎么修正? 减8 就行// 所以如果你需要修正你的 数据长度,那么lengthAdjustment就是用来修正的。// initialBytesToStrip :8 ,如果在我们业务层只需要 消息体 像消息头 和 数据长度都不需要 那么 initialBytesToStrip就是用来跳过的字节数。// 就比如 消息头规定 4个字节 数据长度 4个字节 所以我们如果只要 消息体里的内容 就需要跳过 8个字节 所以设置8 即可// 获取全部数据//  socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,4,4,0,0));// 只获取消息体里的数据socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,4,4,0,8));// 自定义服务处理 所以再进行服务处理之前 需要先经历 解码器   既然有解码器 也有编码器// 如果我们作为服务端 那就需要编码器 如果我们作为客户端 那就需要解码器// 解码器主要用于 将接收到的字节数据转换成有意义的业务对象。比如java 认识的 实体类对象// 在网络编程中,数据在传输过程中通常是以字节数组的形式进行传递的,// 而业务处理通常需要对这些字节数组进行解析,转换为具体的业务对象。解码器正是完成这一工作的组件// 而且解码器 还需要 处理拆包和沾包 等  所以我们在使用 tcp 进行通信的时候 到业务层处理 的时候 需要先经过解码器socketChannel.pipeline().addLast(new ServerHandler());}});// 绑定端口并同步等待channelFuture = bootstrap.bind(host, port).sync();log.info("======Start Up Success!=========");} catch (Exception e) {e.printStackTrace();}return channelFuture;}public void close() {workGroup.shutdownGracefully();bossGroup.shutdownGracefully();log.info("======Shutdown Netty Server Success!=========");}//测试 mainpublic static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());}});ChannelFuture channelFuture = bootstrap.connect("192.168.0.209", 17070).sync();Channel channel = channelFuture.channel();// 连续发送多条消息for (int i = 0; i < 100; i++) {//System.getProperty("line.separator") 获取系统换行符  测试一种情况// String message = "Message " + i + System.getProperty("line.separator");// String message = "Message " + i+"|";  //测试第二种情况// String message = "Message"; // 第三种 固定长度情况//  channel.writeAndFlush(message);//第四种 自定义解码器//[4个字节 + 4个字节 + 数据]// 我们传输数据 就要这样拼接数据 消息头 +数据长度+数据String message = "Message"+i;//消息头 4个字节ByteBuf byteBuf = Unpooled.buffer();byteBuf.writeInt(102); //【lengthFieldOffset】  4个字节// 102 测试随便写的 只要是占4个字节的数字就行 为啥写个102就4个字节 因为102 是int类型 int类型的字节数是4个字节// 这个应该是 数据类型基础  int string long  等等 占几个字节的问题//数据长度 4个字节  长度也是 int型 所以跟消息头 一样 所以在设置解码器长度参数的时候 【lengthFieldLength】 4个字节就够用byteBuf.writeInt(message.getBytes().length);//发送的数据byteBuf.writeBytes(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));channel.writeAndFlush(byteBuf);// Thread.sleep(100); // 可能需要调整间隔时间}channel.closeFuture().sync();} finally {group.shutdownGracefully();}}
}

回调类

package com.netty.server.tpcServer;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;import java.text.SimpleDateFormat;
import java.util.Date;/*** User:Json* Date: 2024/4/12**/
public class ServerHandler extends ChannelInboundHandlerAdapter {/*** 客户端数据到来时触发*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;//第一种到第三种解码器测试
//        System.out.println("client request: " + buf.toString(CharsetUtil.UTF_8)+"======");
//        SimpleDateFormat sf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
//        String callback = sf.format(new Date());
//        ctx.write(Unpooled.copiedBuffer(callback.getBytes()));//第四种 自定义解码器测试System.out.println("========================");// 获取全部数据
//        System.out.println("消息头:"+buf.readInt());
//        int i = buf.readInt();
//        System.out.println("数据长度:"+i);
//        System.out.println("消息体:"+buf.readBytes(i).toString(CharsetUtil.UTF_8));//只获取消息体里的数据System.out.println("只获取消息体: "+buf.toString(CharsetUtil.UTF_8));}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// 将发送缓冲区的消息全部写到SocketChannel中ctx.flush();}/*** 发生异常时触发*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println(cause.getMessage());cause.printStackTrace();// 释放与ChannelHandlerContext相关联的资源ctx.close();}
}

springboot 启动时运行

package com.netty.server.init;import com.netty.server.tpcServer.NettyServer;
import io.netty.channel.ChannelFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** User:Json* Date: 2024/4/12**/
@Component
@Slf4j
@Order(2)
public class NettyServerInit implements CommandLineRunner {@Resourceprivate NettyServer nettyServer;// 这个 线程  后续优化    比如关闭 等等@Overridepublic void run(String... args) throws Exception {// 开启服务ChannelFuture future = nettyServer.start("192.168.0.209", 17070);// 在JVM销毁前关闭服务Runtime.getRuntime().addShutdownHook(new Thread() {@Overridepublic void run() {nettyServer.close();}});future.channel().closeFuture().sync();}
}

核心测试代码 在 NettyServer 类里

下面逐个分享一下解码器的 意思 :
第一种:

new LineBasedFrameDecoder(1024)

1024 就是规定传入的 字节长度的大小
对于字节 一个字符串占多少字节 int 占多多少字节 long 占多多少字节 如果明白这个 应该很好理解
LineBasedFrameDecoder:传入的参数是消息最大长度,发送消息的大小必须小于设置值
行分隔符解码器(结尾根据 “\n” 作为结束标识)

第二种:

new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("|".getBytes()))

DelimiterBasedFrameDecoder 自定义分割器解码器,结尾根据什么作为结束标识可以自定义 比如用 |

第三种:

new FixedLengthFrameDecoder(7)

// 固定长度解码器,发送的消息需要定长

第四种:重点

new LengthFieldBasedFrameDecoder(1024,4,4,0,0)

第四种:LengthFieldBasedFrameDecoder:基于长度的自定义解码器,比较灵活 这个
这个 解码器 一共有 5个参数
maxFrameLength:最大帧长度。也就是可以接收的数据的最大长度。如果超过,此次数据会被丢弃
lengthFieldOffset:长度域偏移量。存储数据长度的一个偏移量
lengthFieldLength:长度域字节数。存储数据长度的一个大小
lengthAdjustment:数据长度修正。因为长度既可以代表data的长度,也可以是整个消息的长度
initialBytesToStrip:跳过的字节数。可以选择舍弃一部分数据
可以理解为 我们给数据定义传输规则 就像http请求 定义规则 比如 请求头 数据结构 json 等等
所以我们作为接收数据的服务端 可以在开发接收数据的时候 定义规则 然后让客户端按照规则来传输数据
比如 发送一个消息,我们定义的结构为:消息头+数据长度+数据

对于字节 一个字符串占多少字节 int 占多多少字节 long 占多多少字节 如果明白这个 应该很好理解

我们怎么定义规则呢 比如:
整体字节为 15个字节
要发送的数据 “Message” 长度为 7个字节
我们规定 maxFrameLength :1024 接收的最大数据,
lengthFieldOffset:长度域偏移量 一般用来定义消息头 4个字节、
lengthFieldLength:长度域字节数 用于定义数据长度(length)的【值】的大小 4个字节
lengthAdjustment : 0 数据长度修正
假设我设置的数据长度是20,代表了整个消息体的长度,但是我数据却只有12个字节,这往后读20个字节无疑是错的,所以我们需要修正,怎么修正? 减8 就行
所以如果你需要修正你的 数据长度,那么lengthAdjustment就是用来修正的。
initialBytesToStrip :8 ,如果在我们业务层只需要 消息体 像消息头 和 数据长度都不需要 那么 initialBytesToStrip就是用来跳过的字节数。
就比如 消息头规定 4个字节 数据长度 4个字节 所以我们如果只要 消息体里的内容 就需要跳过 8个字节 所以设置8 即可
结合发送端和 接收数据端 一个例子 整体

接收数据端的 定义

 socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,4,4,0,8));

发送端

     //第四种 自定义解码器//[4个字节 + 4个字节 + 数据]// 我们传输数据 就要这样拼接数据 消息头 +数据长度+数据String message = "Message"+i;//消息头 4个字节ByteBuf byteBuf = Unpooled.buffer();byteBuf.writeInt(102); //【lengthFieldOffset】  4个字节// 102 测试随便写的 只要是占4个字节的数字就行 为啥写个102就4个字节 因为102 是int类型 int类型的字节数是4个字节// 这个应该是 数据类型基础  int string long  等等 占几个字节的问题//数据长度 4个字节  长度也是 int型 所以跟消息头 一样 所以在设置解码器长度参数的时候 【lengthFieldLength】 4个字节就够用byteBuf.writeInt(message.getBytes().length);//发送的数据byteBuf.writeBytes(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));channel.writeAndFlush(byteBuf);

测试结果
没有解码器的情况
在这里插入图片描述

有了之后:
在这里插入图片描述

http://www.lryc.cn/news/385120.html

相关文章:

  • Python爬虫实战之爬取京东商品数据
  • 浅析Resource Quota中limits计算机制
  • 《数据结构与算法基础 by王卓老师》学习笔记——1.4算法与算法分析
  • 运维团队如何加强安全设备监控与日志管理
  • 仓库管理系统13--物资设置
  • 机器人控制系列教程之URDF文件语法介绍
  • Arathi Basin (AB) PVP15
  • Ubuntu/Linux SSH 端口转发
  • flask的locked_cached_property
  • OSI七层模型TCP/IP四层面试高频考点
  • Swagger2及常用校验注释说明
  • 【项目实训】各种反爬策略及爬虫困难点总结
  • 能量智慧流转:全面升级储能电站的智能网关解决方案
  • 【金融研究】6月,对冲基金狂卖美国科技股 短期乐观,长期悲观?“油价最大空头”花旗:明年跌到60
  • GroundingDINO1.5突破开放式物体检测界限:介绍与应用
  • centos编译内核ko模块
  • Android13 WMS窗口层级树
  • 计算机毕业设计Python+LSTM+Tensorflow股票分析预测 基金分析预测 股票爬虫 大数据毕业设计 深度学习 机器学习 数据可视化 人工智能
  • 仓库管理系统14--仓库设置
  • Python 算法交易实验73 QTV200第二步: 数据清洗并写入ClickHouse
  • 记录:有趣的C#多元运算符 ? : 表达式写法
  • 华宽通中标长沙市政务共性能力建设项目,助力智慧政务建设新飞跃
  • [面试题]计算机网络
  • 企业级低代码开发效率变革赋能业务增长
  • 2024最新总结:1500页金三银四面试宝典 记录35轮大厂面试(都是面试重点)
  • 使用Spring Boot和Thymeleaf构建动态Web页面
  • 扫盲之webSocket
  • 一些硬件知识(十二)
  • Adobe Acrobat编辑器最新版下载安装 Adobe Acrobat版本齐全!
  • k8s如何使用 HPA 实现自动扩展