当前位置: 首页 > news >正文

netty websockt之断连重试

断连重试有以下两点考虑:

1、连接异常,比如网络抖动导致连接失败;

2、连接过程中断开连接重试;

主要用到两个工具类:

ChannelFutureListener监听ChannelFuture..isSuccess();

ChannelInboundHandlerAdapter重写channelInactive,当连接变为不活跃,则回调该方法。

完整代码如下:

@Component
public class WebSocketClient {private Channel channel;private Bootstrap bootstrap;private URI uri;private MessageHandler messageHandler;private WebSocketClientHandler handler;private volatile AtomicInteger atomicCount = new AtomicInteger(0);public WebSocketClient initClient(String host, MessageHandler messageHandler) throws Exception {this.messageHandler = messageHandler;if (StringUtils.isEmpty(host)) {throw new RuntimeException("未配置host.");}uri = new URI(host);String scheme = uri.getScheme() == null? WssSchemeEnum.WS.getValue() : uri.getScheme();//判断是否ssl连接,如果是则设置为可信final boolean ssl = WssSchemeEnum.WSS.getValue().equalsIgnoreCase(scheme);final SslContext sslCtx;if (ssl) {sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();} else {sslCtx = null;}EventLoopGroup group = new NioEventLoopGroup();try {bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline p = ch.pipeline();p.addFirst(new ChannelInboundHandlerAdapter() {@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.error("【{}】检测到wss断连, 第 {} 次发起重连.", exchange, atomicCount.incrementAndGet());super.channelInactive(ctx);ctx.channel().eventLoop().schedule(WebSocketClient.this::doConnect, 3000, TimeUnit.MILLISECONDS);}});if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc(), uri.getHost(), getUriPort(uri)));}p.addLast(new HttpClientCodec());p.addLast(new HttpObjectAggregator(8192));p.addLast(WebSocketClientCompressionHandler.INSTANCE);handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()), exchange, messageHandler);p.addLast(handler);}});} catch (Exception e) {log.error("wss创建client异常. e:", e);if (bootstrap != null) {bootstrap.config().group().shutdownGracefully();}throw new RuntimeException("初始化wss连接异常. e: " + e);}doConnect();return this;}public void doConnect() {try {ChannelFuture future = bootstrap.connect(uri.getHost(), getUriPort(uri)).sync();handler.handshakeFuture().sync();future.addListener((ChannelFutureListener) cf -> {if (future.isSuccess()) {channel = future.channel();WssManger.addChannel(exchange, channel);log.info("连接成功.");messageHandler.connectSuccessAction(future.channel());atomicCount.set(0);} else {log.error("监听断连, wss第 {} 次发起重连. ", atomicCount.incrementAndGet());future.channel().eventLoop().schedule(WebSocketClient.this::doConnect, 3000, TimeUnit.MILLISECONDS);}});}catch (Exception e) {log.error("连接异常. e:" + e);if (bootstrap != null) {log.info("wss连接异常,第 {} 次发起重连.", atomicCount.incrementAndGet());bootstrap.config().group().schedule(WebSocketClient.this::doConnect, 3000, TimeUnit.MILLISECONDS);}}}/*** 根据URI获取对应的port** @param uri uri* @return port*/private int getUriPort(URI uri) {String scheme = uri.getScheme() == null? WssSchemeEnum.WS.getValue() : uri.getScheme();if (!WssSchemeEnum.allScheme().contains(scheme)) {throw new RuntimeException("Only WS(S) is supported.");}if (uri.getPort() == -1) {if (WssSchemeEnum.WS.getValue().equalsIgnoreCase(scheme)) {return WssSchemeEnum.WS.getPort();} else if (WssSchemeEnum.WSS.getValue().equalsIgnoreCase(scheme)) {return WssSchemeEnum.WSS.getPort();} else {return -1;}} else {return uri.getPort();}}
}

http://www.lryc.cn/news/232993.html

相关文章:

  • 【Gateway】基于ruoyi-cloud-plus项目,gateway局部过滤器和过滤返回以及集成nacos
  • mysql -mmm
  • C++初阶 类和对象(下)
  • 使用Postman进行压力测试
  • AI视频检索丨历史视频标签化,助力重要事件高效溯源
  • 【前段基础入门之】=>CSS3新特性 响应式布局
  • 【Java 进阶篇】JQuery 遍历:发现元素的魔法之旅
  • 合肥数字孪生赋能工业制造,加速推进制造业数字化转型
  • Linux发展史与环境安装
  • 【uniapp】 video视频层级、遮挡其他弹窗或顶部导航 使用nvue覆盖
  • opencv(1):创建和显示窗口, 读取保存图片
  • LeetCode530. Minimum Absolute Difference in BST
  • Flink(五)【DataStream 转换算子(上)】
  • 【vitis】 AIE basic
  • 微信抽奖活动怎么做
  • 装机必备!这5款免费软件,你值得拥有!
  • 华为eNSP综合实验考试
  • OPPO Watch纯手机开启远程ADB调试
  • idea查看UML类图
  • 2736. 最大和查询 : 从一维限制到二维限制,逐步思考剖析本题(进阶一问)
  • 2023数维杯国际数学建模A题B题C题D题思路+模型+代码+完整论文
  • java多个jar包编译生成.class文件
  • 小米手环8pro重新和手机配对解决办法
  • element-china-area-data插件vue3做省市区的下拉选择,用3个独立的el-select实现
  • 盘点十大免费低/无代码开发软件,数字化转型看这里
  • 【word密码】word设置只读方式的四个方法
  • 正整数的阶乘
  • 微软Surface/Surface pro笔记本电脑进入bios界面
  • 暂存2暂存2暂存2
  • 深入理解TensorFlow:计算图的重要性与应用