当前位置: 首页 > news >正文

Android netty的使用

导入netty依赖

 implementation 'io.netty:netty-all:4.1.107.Final'

使用netty

关闭netty
  /*** 关闭*/private void closeSocket() {LogUtils.i(TAG, "closeSocket");if (nettyManager != null) {nettyManager.close();nettyManager = null;}if (nettyExecutor != null) {try {nettyExecutor.shutdown();nettyExecutor.shutdownNow();} catch (Exception e) {}}}
创建netty
private void initSocket() {closeSocket(); //关闭之前的连接nettyManager = new NettyManager(ip, port, dataTimeOut, readTimeOut, whiteTimeOut);nettyExecutor = Executors.newSingleThreadExecutor();nettyExecutor.execute(new Runnable() {@Overridepublic void run() {nettyManager.connect();  //连接}});nettyManager.setOnNettyListener(new NettyManager.OnNettyListener() {@Overridepublic void onConnectSuccess() {LogUtils.i(TAG, "onConnectSuccess");}@Overridepublic void onConnectFail(int connectFailNum) {LogUtils.i(TAG, "onConnectFail >>" + connectFailNum);if (connectFailNum >= 5) {  //重连次数达到阈值  则重设nett的ipresetNeetyIpAndConnect();}}@Overridepublic void onReceiveData(byte[] bytes) {LogUtils.i(TAG, "onReceiveData");parseReceiveData(bytes);}@Overridepublic void onNeedReCreate() {initSocket();}});}/*** 重新设置ip地址  断开重设ip不需要调用  nettyManager.reConnect(); 因为会自动重连*/private void resetNeetyIpAndConnect() {ip = (String) MmkvUtils.getInstance().decode(Content.SP_KEY_SOCKET_TALK_IP, "");port = (int) MmkvUtils.getInstance().decode(Content.SP_KEY_SOCKET_TALK_PORT, 0);LogUtils.i(TAG, "resetNeetyIpAndConnect>>ip:" + ip + "  port:" + port);nettyManager.setIp(ip);nettyManager.setPort(port);}/**
连接中时 重设ip 需要调用nettyManager.reConnect();
*/
private void resetIp(){String ip = (String) MmkvUtils.getInstance().decode(Content.SP_KEY_SOCKET_TALK_IP, "");int port = (int) MmkvUtils.getInstance().decode(Content.SP_KEY_SOCKET_TALK_PORT, 0);nettyManager.setIp(ip);nettyManager.setPort(port);nettyManager.reConnect(); //重连ip}
}
有时候不需要netty进行数据粘包处理的情况,直接返回原始响应数据则使用 具体参数看完整代码
  nettyManager = new NettyManager(ip, port, dataTimeOut, readTimeOut, whiteTimeOut, 0, 0, 0, 0);

完整代码

package com.baolan.netty;import android.util.Log;import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.internal.StringUtil;public class NettyManager {private static final String TAG = "bl_netty";private ChannelFuture channelFuture;private String ip;private int port;private int dataTimeOut; //收发超时private int readTimeOut; // 读超时时间private int whiteTimeOut; // 写超时时间private ChannelFutureListener channelFutureListener;private NioEventLoopGroup nioEventLoopGroup;private Bootstrap bootstrap;private int connectFailNum = 0;  //重连次数private int lengthFieldOffset = 3; //长度域偏移。就是说数据开始的几个字节可能不是表示数据长度,需要后移几个字节才是长度域private int lengthFieldLength = 2; //长度域字节数。用几个字节来表示数据长度。private int lengthAdjustment = 1; //数据长度修正。因为长度域指定的长度可以使header+body的整个长度,也可以只是body的长度。如果表示header+body的整个长度,那么我们需要修正数据长度。private int initialBytesToStrip = 0; //跳过的字节数。如果你需要接收header+body的所有数据,此值就是0,如果你只想接收body数据,那么需要跳过header所占用的字节数。public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public int getPort() {return port;}public void setPort(int port) {this.port = port;}public NettyManager(String ip, int port, int dataTimeOut, int readTimeOut, int whiteTimeOut) {this(ip, port, dataTimeOut, readTimeOut, whiteTimeOut, 3, 2, 1, 0);}/*** @param ip  连接的地址* @param port  连接的端口* @param dataTimeOut  收发超时* @param readTimeOut  读超时时间* @param whiteTimeOut 写超时时间*/public NettyManager(String ip, int port, int dataTimeOut, int readTimeOut, int whiteTimeOut, int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip) {this.ip = ip;this.port = port;this.dataTimeOut = dataTimeOut;this.readTimeOut = readTimeOut;this.whiteTimeOut = whiteTimeOut;this.lengthFieldOffset = lengthFieldOffset;this.lengthFieldLength = lengthFieldLength;this.lengthAdjustment = lengthAdjustment;this.initialBytesToStrip = initialBytesToStrip;Log.i(TAG, "create ip>>" + ip);Log.i(TAG, "create port>>" + port);Log.i(TAG, "create dataTimeOut>>" + dataTimeOut);Log.i(TAG, "create readTimeOut>>" + readTimeOut);Log.i(TAG, "create whiteTimeOut>>" + whiteTimeOut);//进行初始化//初始化线程组nioEventLoopGroup = new NioEventLoopGroup();bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class).group(nioEventLoopGroup);bootstrap.option(ChannelOption.TCP_NODELAY, true); //无阻塞bootstrap.option(ChannelOption.SO_KEEPALIVE, true); //长连接bootstrap.option(ChannelOption.SO_TIMEOUT, dataTimeOut); //收发超时bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000); //收发超时bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (lengthFieldOffset > 0) {  //为0时 不处理应答数据pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65530, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip));}pipeline.addLast("decoder", new ByteArrayDecoder())  //接收解码方式.addLast("encoder", new ByteArrayEncoder())  //发送编码方式.addLast(new ChannelHandle(NettyManager.this))//处理数据接收.addLast(new IdleStateHandler(readTimeOut, whiteTimeOut, 0)); //心跳 参数1:读超时时间 参数2:写超时时间  参数3: 将在未执行读取或写入时触发超时回调,0代表不处理;读超时尽量设置大于写超时代表多次写超时时写心跳包,多次写了心跳数据仍然读超时代表当前连接错误,即可断开连接重新连接}});}private void create() {if (StringUtil.isNullOrEmpty(ip) || port == 0 || dataTimeOut == 0 || readTimeOut == 0 || whiteTimeOut == 0) {//TODO 设置回调通知service 重连if (onNettyListener != null) {onNettyListener.onNeedReCreate();}return;}if (channelFuture != null && channelFuture.channel().isActive()) {return;}//开始建立连接并监听返回try {channelFuture = bootstrap.connect(new InetSocketAddress(ip, port));channelFuture.addListener(channelFutureListener = new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) {boolean success = future.isSuccess();Log.i(TAG, "connect success>>" + success);if (success) {connectFailNum = 0;Log.i(TAG, "connect success !");if (onNettyListener != null) {onNettyListener.onConnectSuccess();}} else { //失败connectFailNum++;future.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {reConnect();}}, 5, TimeUnit.SECONDS);if (onNettyListener != null) {onNettyListener.onConnectFail(connectFailNum);}}}}).sync();} catch (Exception e) {Log.i(TAG, "e>>" + e);e.printStackTrace();}}/*** 发送数据** @param sendBytes*/public void sendData(byte[] sendBytes) {if (channelFuture == null) {return;}Log.i(TAG, "sendDataToServer");if (sendBytes != null && sendBytes.length > 0) {if (channelFuture != null && channelFuture.channel().isActive()) {Log.i(TAG, "writeAndFlush");channelFuture.channel().writeAndFlush(sendBytes);}}}public void receiveData(byte[] byteBuf) {Log.i(TAG, "receiveData>>" + byteBuf.toString());if (onNettyListener != null && byteBuf.length > 0) {onNettyListener.onReceiveData(byteBuf);}}public void connect() {Log.i(TAG, "connect ");if (channelFuture != null && channelFuture.channel() != null && channelFuture.channel().isActive()) {channelFuture.channel().close();//已经连接时先关闭当前连接,关闭时回调exceptionCaught进行重新连接} else {create(); //当前未连接,直接连接即可}}public void reConnect() {Log.i(TAG, "reConnect");if (StringUtil.isNullOrEmpty(ip) || port == 0 || dataTimeOut == 0 || readTimeOut == 0 || whiteTimeOut == 0) {//TODO 设置回调通知service 重连if (onNettyListener != null) {onNettyListener.onNeedReCreate();}return;}connect(); //当前未连接,直接连接即可}public void close() {if (channelFuture != null && channelFutureListener != null) {channelFuture.removeListener(channelFutureListener);channelFuture.cancel(true);}}private OnNettyListener onNettyListener;public void setOnNettyListener(OnNettyListener onNettyListener) {this.onNettyListener = onNettyListener;}public interface OnNettyListener {/*** 连接成功*/void onConnectSuccess();/*** 连接失败*/void onConnectFail(int connectFailNum);/*** 接收到数据** @param bytes*/void onReceiveData(byte[] bytes);/*** 参数丢失 需重新创建*/void onNeedReCreate();}
}
package com.baolan.netty;import android.util.Log;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;public class ChannelHandle extends SimpleChannelInboundHandler<ByteBuf> {private static final String TAG = "bl_netty";private NettyManager nettyManager;public ChannelHandle(NettyManager nettyManager) {this.nettyManager = nettyManager;}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);//成功Log.i(TAG,"channelActive");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);//连接失败Log.i(TAG,"channelInactive 连接失败");if (nettyManager != null) {nettyManager.reConnect(); //重新连接}}/*** 心跳检测  当设置时间没有收到事件 会调用* @param ctx* @param evt* @throws Exception*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {super.userEventTriggered(ctx, evt);if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) evt;if (idleStateEvent.state().equals(IdleState.WRITER_IDLE)) { //写超时,此时可以发送心跳数据给服务器Log.i(TAG, "userEventTriggered write idle");if (nettyManager == null){return;}}else if (idleStateEvent.state().equals(IdleState.READER_IDLE)){   //读超时,此时代表没有收到心跳返回可以关闭当前连接进行重连Log.i(TAG, "userEventTriggered read idle");ctx.channel().close();}}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.exceptionCaught(ctx, cause);Log.i(TAG, "exceptionCaught");cause.printStackTrace();ctx.close();if (nettyManager != null) {nettyManager.reConnect(); //重新连接}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {Log.i(TAG, "channelRead0");if (nettyManager == null){return;}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {super.channelRead(ctx, msg);Log.i(TAG, "channelRead >>"+msg.getClass());if (nettyManager == null){return;}if(msg instanceof byte[]){nettyManager.receiveData((byte[]) msg); //收到数据处理}}
}
http://www.lryc.cn/news/337858.html

相关文章:

  • 苹果电脑启动磁盘是什么意思 苹果电脑磁盘清理软件 mac找不到启动磁盘 启动磁盘没有足够的空间来进行分区
  • 【Java SE】多态
  • Yarn vs npm的大同小异Yarn是什么?
  • 1.Godot引擎|场景|节点|GDS|介绍
  • springboot3 redis 实现分布式锁
  • 2024年第十四届MathorCup数学应用挑战赛A题思路分享(妈妈杯)
  • 运动听歌哪款耳机靠谱?精选五款热门开放式耳机
  • Kubernetes学习笔记12
  • Qt Designer 控件箱中的控件介绍及布局比列分配
  • 蓝桥集训之三国游戏
  • MySQL知识整理
  • 代码随想录算法训练营第36天| 435. 无重叠区间、 763.划分字母区间*、56. 合并区间
  • SpringBoot整合Nacos
  • vue3 浅学
  • 三小时使用鸿蒙OS模仿羊了个羊,附源码
  • 如何使用 ArcGIS Pro 制作热力图
  • SpringBoot之集成Redis
  • mybatis-plus与mybatis同时使用别名问题
  • MySQL基础知识——MySQL日志
  • uniapp 地图分幅网格生成 小程序基于map组件
  • python项目练习——22、人脸识别软件
  • Linux中账号登陆报错access denied
  • python语言之round(num, n)小数四舍五入
  • 安全风险攻击面管理如何提升企业网络弹性?
  • 常用的几款性能测试软件
  • 谷歌google浏览器无法更新Chrome至最新版本怎么办?浏览器Chrome无法更新至最新版本
  • 认识异常(1)
  • C++矩阵
  • 解锁智能未来:用Ollama开启你的本地AI之旅
  • CSS实现卡片在鼠标悬停时突出效果