当前位置: 首页 > news >正文

记一次netty客户端的开发

背景

近日要开发一个tcp客户端程序去对接上游厂商的数据源,决定使用netty去处理,由于很久没有开发过netty了,顺便学习记录下

netty搭建

考虑到我们需要多个client去对接server服务,所以我们定义一个公共的AbstractNettyClient父类,定义一些公共的方法,比如,连接,重试等。以达到代码复用
我这里采用的是三层结构的设计,因为对接的上游数据厂商的不止一家,每家厂商会存在一定的定制化逻辑,所以在此进行封装

  1. 公共的nettyClient父类,所有netty 子类继承
  2. 具体上游厂商的父类,实现厂商对接的一些公共处理
  3. 真正实现的子类,有多少个需要对接的,就实现多少
    在这里插入图片描述

需要哪些公共方法

对于最上层的netty,我们应该定义有哪些全局的公共方法,这里给出几个通用示例

public void start(){// 启动操作
}public void stop(){// 停止操作
}public void restart(){// 重启操作
}

需要哪些Handler?

了解过netty基础的都知道,netty中有inbound和outbound两个出入站的链路处理器供我们处理接受与发送的消息,那么作为全局公共的父类,自然要提供给子类可扩展的handler自选,同时也要维护全局公共的handler,那么这里定义一个公共模板的handler获取方法

    public ChannelInitializer getChannelHandler(AbstractNettyConnector connector) {return new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new ReadTimeoutHandler(15, TimeUnit.SECONDS));pipeline.addLast(new LoggingHandler());addPipeline(pipeline);}};}public void addPipeline(){// 子类覆盖时间该方法
}

通过这样的方式我们在定义公共的handler同时也能支持子类实现自定义的handler处理特定的事件。
下边给出一些常见的通用handler

  1. ReconnectHandler:重连处理,我们在连接服务端,可能由于网络或者其他问题,会导致连接断开,这个时候我们就需要一个handler去处理重连的情况
  2. ReadTimeoutHandler:因为客户端只做读取,所以这边还需要一个超时读取的handler,用于检测服务读取通道的状态,如果超时未读取数据,那么我们可以做一些操作
  3. DecodeHandler:解码器,在处理真正的消息之前我们需要先解码数据
  4. MessageHandler:消息处理器,解析完解码的数据后,我们真正对消息进行处理的地方

我们整个通用链路的handler,大概就如下
在这里插入图片描述

解码器的选择

由于TCP连接中存在粘包黏包的现象,发过来的消息不一定是个完整的包,所以我们在我们自己的解码器之前还需要定一个解码器处理粘包黏包的问题,对于我自己这边使用的是,定长的解码器
new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, 1500, 0, 2, -2, 0, true)
对于该解码器构造参数的解析

  1. 大端和小端模式的选择,这个需要具体询问上游的消息格式
  2. 定长长度
  3. lengthFieldOffset:表示这个包长度的字段,是从第几个字节开始读
  4. lengthFieldLength:长度字段所占用的字节数
  5. lengthAdjustment:用来修正长度字段,比如说你的长度读出来的字节数,并不包含自己,那么在该值里 你就得填这个长度字段的长度

举例:假设现在有个2个字节的长度字段,读出来的值是152,这个值是数据包的长度不包含长度字段,那么lengthAdjustment就应该填-2,减去长度字段 得到真正的数据长度

连接监听器

在启动netty客户端时,可能会产生连接失败等情况,这个时候我们可以在启动处增加一个连接监听器,用来监控启动情况,这个不同于ReconnectHandler,那个是用于处理连接建立和断开时的重连器

ChannelFuture connect = bootstrap.connect(serverAddress, serverPort);
connect.addListener(new ConnectListener(this));public class ConnectListener implements ChannelFutureListener {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {Throwable cause = future.cause();if (cause instanceof ConnectTimeoutException) {// do something} else {log.error("连接异常", cause);}future.channel().close();}}
http://www.lryc.cn/news/361926.html

相关文章:

  • 策略模式结合Spring使用
  • 基于 RNNs 对 IMDB 电影评论进行情感分类
  • Midjourney绘画参数设置详解
  • 计算机毕业设计 | springboot养老院管理系统 老人社区管理(附源码)
  • 事务与并发控制
  • spring boot 中的异步@Async
  • 【C++/STL】list(常见接口、模拟实现、反向迭代器)
  • wms中对屏幕进行修改wm size设置屏幕宽高原理剖析
  • java面试题及答案2024,java2024最新面试题及答案(之一)
  • Go Modules 使用
  • 结账和反结账
  • k8s怎么监听资源的变更
  • Cobaltstrike常用功能
  • UWP与WPF:微软两大UI框架
  • 【面试】字节码文件是跨平台的吗?
  • SpringCloud中注册中心Nacos的下载与使用步骤
  • 心缘Hub小程序
  • 攻防世界maze做法(迷宫题)
  • PID——调参的步骤
  • Deno入门:Node.js的现代替代品
  • WIFI 万[néng]钥匙 v5.0.10/v4.9.80 SVIP版!
  • JCR一区级 | Matlab实现TCN-BiLSTM-MATT时间卷积双向长短期记忆神经网络多特征分类预测
  • redis之发布与订阅
  • LLM主流开源代表模型
  • Openharmony的usb从框架到hdf驱动流程梳理
  • Apache Doris 基础 -- 数据表设计(数据模型)
  • “雪糕刺客”爆改“红薯刺客”,钟薛高给了消费品牌哪些启示?
  • 多输入多输出非线性对象的模型预测控制—Matlab实现
  • 多项分布模拟及 Seaborn 可视化教程
  • 学计算机,我错了吗?