当前位置: 首页 > news >正文

java-netty客户端断线重启

背景

经常会遇到netty客户端,因为网络等多种原因而断线,需要自动重连

核心

就是对连接服务端成功后,对ChannelFuture进行监听,核心代码如下

            f = b.connect("127.0.0.1", 10004).sync(); // (5)f.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {if(!channelFuture.isSuccess()){System.out.println("重试");channelFuture.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {doReconnect();}},3,TimeUnit.SECONDS);}else{}}});

具体代码

nettyClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import java.util.Random;
import java.util.concurrent.TimeUnit;public class nettyClient {private static ChannelFuture f;static EventLoopGroup workerGroup;static Bootstrap b;static ChannelFutureListener channelFutureListener=null;static NettyClientHandlerInner nettyClientHandlerInner = new NettyClientHandlerInner();public static void main(String[] args) throws Exception {new Thread(new Runnable() {@Overridepublic void run() {while (true) {try {Thread.sleep(2000);nettyClientHandlerInner.sendMSG("writeWumingStatus@@" + new Random().nextInt(20000));Thread.sleep(2000);nettyClientHandlerInner.sendMSG("writePazhanfoStatus@@" + new Random().nextInt(20000));nettyClientHandlerInner.sendMSG("pkRecord@@" + new Random().nextInt(20000));} catch (InterruptedException e) {e.printStackTrace();}}}}).start();init();connectToServer(nettyClientHandlerInner);}public static void init() {workerGroup = new NioEventLoopGroup();b = new Bootstrap(); // (1)b.group(workerGroup); // (2)b.channel(NioSocketChannel.class); // (3)b.option(ChannelOption.SO_KEEPALIVE, true); // (4)b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(nettyClientHandlerInner);}});channelFutureListener=new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {if(!channelFuture.isSuccess()){channelFuture.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {doReconnect();}},3,TimeUnit.SECONDS);}else{System.out.println("重连成功");}}};}public static void connectToServer(NettyClientHandlerInner nettyClientHandler) {try {// Start the client.f = b.connect("127.0.0.1", 10004).sync(); // (5)f.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {if(!channelFuture.isSuccess()){System.out.println("重试");channelFuture.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {doReconnect();}},3,TimeUnit.SECONDS);}else{}}});// Wait until the connection is closed.f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}public static void doReconnect(){ChannelFuture future=b.connect("127.0.0.1", 10004);future.addListener(channelFutureListener);}
}

NettyClientHandlerInner

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;import java.io.IOException;
import java.util.concurrent.TimeUnit;@ChannelHandler.Sharable
class NettyClientHandlerInner extends ChannelInboundHandlerAdapter {ChannelHandlerContext ctxOut;//通道就绪事件(就是在bootstrap启动助手配置中addlast了handler之后就会触发此事件)//但我觉得也可能是当有客户端连接上后才为一次通道就绪public void channelActive(ChannelHandlerContext ctx) throws IOException, InterruptedException {System.out.println("客户端消息,通道激活,可以发送消息了");ctxOut=ctx;}//数据读取事件public void channelRead(ChannelHandlerContext ctx, Object msg) {//传来的消息包装成字节缓冲区String byteBuf = (String) msg;
//        ByteBuf byteBuf = (ByteBuf) msg;//Netty提供了字节缓冲区的toString方法,并且可以设置参数为编码格式:CharsetUtil.UTF_8System.out.println("客户端读取服务返回的数据:" + byteBuf);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)// Close the connection when an exception is raised.cause.printStackTrace();System.out.println(cause.getMessage());ctx.close();}public void  sendMSG(String msg){ctxOut.writeAndFlush(Unpooled.copiedBuffer(msg+"\r\n", CharsetUtil.UTF_8));}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);System.out.println("与服务器断开");ctx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {nettyClient.doReconnect();}}, 3, TimeUnit.SECONDS);ctx.close();}
}

总结

要实现重连,有三个地方需要注意

  1. 对连接成功的ChannelFuture进行监听,调用doReconnect
  2. 实现如上的doReconnect
  3. 在NettyClientHandlerInner中重写channelInactive,再次调用doReconnect
    @Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);System.out.println("与服务器断开");ctx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {nettyClient.doReconnect();}}, 3, TimeUnit.SECONDS);ctx.close();}
http://www.lryc.cn/news/449388.html

相关文章:

  • MySQL的基础用法一
  • Linux:进程地址空间
  • 数据结构:树、森林
  • AI Agent应用出路到底在哪?
  • 一文了解构建工具——Maven与Gradle的区别
  • electron介绍
  • Redis-持久化
  • 封装轮播图 (因为基于微博小程序,语法可能有些出入,如需使用需改标签)
  • 【Ubuntu】minicom安装、配置、使用以及退出
  • MYSQL的监控
  • CTF ciscn_2019_web_northern_china_day1_web2
  • linux中vim编辑器的应用实例
  • 智慧城市交通管理中的云端多车调度与控制
  • 分治(归并排序)
  • 小学生为什么要学英语
  • 企业云存储如何收费?企业云存储收费标准
  • 一步步教你LangGraph Studio:可视化调试基于LangGraph构建的AI智能体
  • 用SpringBoot打造先进的学科竞赛管理系统
  • Linux入门攻坚——34、nsswitch、pam、rsyslog和loganalyzer前端展示工具
  • 如何在Excel中快速找出前 N 名,后 N 名
  • 创意实现!在uni-app小程序商品详情页轮播中嵌入视频播放功能
  • WAF,全称Web Application Firewall,好用WAF推荐
  • docker中搭建nacos并将springboot项目的配置文件转移到nacos中
  • 概率论原理
  • MYSQL的安装和升级
  • 深入解析 RISC-V 递归函数的栈使用:以阶乘函数为例
  • 【保研纪念】计算机保研经验贴——南大cs、复旦cs、中南cs
  • TopOn对话游戏魔客:2024移动游戏广告应如何突破?
  • Chainlit集成LlamaIndex实现知识库高级检索(BM25全文检索器)
  • Dubbo入门案例