当前位置: 首页 > news >正文

Spingboot整合Netty,简单示例

Netty介绍在文章末尾  Netty介绍

项目背景

传统socket通信,有需要自身管理整个状态,业务繁杂等问题。

pom.xml

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.117.Final</version>
</dependency>

代码

封装Netty的服务类

package com.example.server;import com.example.exception.ServiceException;
import com.example.handler.NettyServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyServer {// 用于处理连接请求的线程组private EventLoopGroup bossGroup;// 用于处理I/O操作的线程组private EventLoopGroup workerGroup;// 服务器private ChannelFuture channelFuture;/*** 启动 Netty 服务端** @param port 监听的端口*/public void start(int port) {bossGroup = new NioEventLoopGroup(); // 初始化bossGroupworkerGroup = new NioEventLoopGroup(); // 初始化workerGrouptry {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // 使用NIO传输.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {// 添加编解码器和处理器ch.pipeline().addLast(new NettyServerInitializer());}}).option(ChannelOption.SO_BACKLOG, 128) // 设置TCP参数.childOption(ChannelOption.SO_KEEPALIVE, true);// 绑定端口并启动服务器channelFuture = bootstrap.bind(port).sync();log.info("Netty 服务端启动成功,监听端口: {}", port);} catch (InterruptedException e) {log.error("Netty 服务端启动失败,端口: {}, 异常: {}", port, e.getMessage(), e);throw new ServiceException("Netty 服务端启动异常");} catch (Exception e) {log.error("绑定端口时发生异常: {}", e.getMessage(), e);throw new ServiceException("端口绑定异常");}}/*** 停止 Netty 服务端*/public void stop() {if (channelFuture != null) {channelFuture.channel().close();}if (bossGroup != null) {bossGroup.shutdownGracefully();}if (workerGroup != null) {workerGroup.shutdownGracefully();}System.out.println("Netty 服务端已关闭");}}

封装Netty服务的管理类

用来创建和销毁多个Netty

package com.example.server;import com.example.exception.ServiceException;
import org.springframework.stereotype.Service;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Service
public class NettyServerManager {private final ConcurrentHashMap<Integer, NettyServer> nettyServers = new ConcurrentHashMap<>();/*** 启动 Netty 服务端** @param port 监听的端口*/public void startNettyServer(int port) {if (nettyServers.containsKey(port)) {throw new ServiceException("端口 " + port + " 已被占用");}NettyServer nettyServer = new NettyServer();nettyServer.start(port);nettyServers.put(port, nettyServer);}/*** 停止 Netty 服务端** @param port 监听的端口*/public void stopNettyServer(int port) {NettyServer nettyServer = nettyServers.get(port);if (nettyServer != null) {nettyServer.stop();nettyServers.remove(port);}}/*** 获取所有 Netty 服务端实例*/public Map<Integer, NettyServer> getNettyServers() {return nettyServers;}
}

NettyServer的初始化

package com.example.handler;import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline()// HTTP 编解码器:支持 HTTP 请求和响应.addLast(new HttpServerCodec())// 聚合 HTTP 消息,避免处理分块的 HTTP 请求.addLast(new HttpObjectAggregator(65536))// WebSocket 协议处理器,确保路径 `/websocket` 处理 WebSocket 握手和控制帧.addLast(new WebSocketServerProtocolHandler("/websocket"))// 字符串解码器,将字节流解析为字符串.addLast(new StringDecoder(CharsetUtil.UTF_8))// 字符串编码器,将字符串转换为字节流.addLast(new StringEncoder(CharsetUtil.UTF_8))// 自定义消息处理器,用于处理业务逻辑.addLast(new WebSocketFrameHandler());}
}

handler处理器类

用来处理具体的消息

package com.example.handler;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class WebSocketFrameHandler extends ChannelInboundHandlerAdapter {@Resourceprivate UserService userService;@PostConstructpublic void init() {webSocketFrameHandler = this;}/*** 当 Channel 变为活跃状态(连接建立)时调用*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.warn("{} 已连接", ctx.channel().remoteAddress());}/*** 当 Channel 变为不活跃状态(连接关闭)时调用*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) {log.warn("{} 已断开", ctx.channel().remoteAddress());}/*** 当从 Channel 中读取到数据时调用*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {}/*** 当一次数据读取完成时调用*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// log.info("Channel 数据读取完成");ctx.flush();}/*** 当处理过程中发生异常时调用*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("处理过程中发生异常,连接关闭: {}", cause.getMessage(), cause);ctx.close();}
}

消息传递实体

        这里分享一个我用来消息传递的实体,这个实体需要被序列化传递。因为Websocket传递都是文本的形式


Netty介绍

Netty 是一个异步事件驱动的网络应用框架,主要用于快速开发高性能、高可靠性的网络服务器和客户端。它简化了TCP、UDP等协议的编程,广泛应用于WebSocket等。

特性

异步非阻塞:基于事件驱动模型,支持高并发连接。

高性能:优化了网络通信,资源利用率高。

核心组件

  1. Channel:网络通信的管道,支持多种I/O操作。
  2. EventLoop:事件循环,处理I/O事件。
  3. ChannelHandler:处理I/O事件或拦截操作。
  4. ChannelPipeline:包含多个ChannelHandler,处理入站和出站事件。
  5. ByteBuf:高效的字节容器,优于JDK的ByteBuffer
http://www.lryc.cn/news/524985.html

相关文章:

  • grafana新增email告警
  • Github 2025-01-20 开源项目周报 Top15
  • 【Rabbitmq】Rabbitmq高级特性-发送者可靠性
  • K8S中Service详解(一)
  • Effective C++读书笔记——item23(用非成员,非友元函数取代成员函数)
  • 云原生前端开发:打造现代化高性能的用户体验
  • 循环队列(C语言版)
  • 考研408笔记之数据结构(五)——图
  • 没有公网IP实现seafile本地IP访问和虚拟局域网IP同时访问和上传文件
  • 【Hadoop面试题2025】
  • 2000-2010年各省第三产业就业人数数据
  • 第十一讲 多线程
  • VUE之路由Props、replace、编程式路由导航、重定向
  • windows安装ES
  • 论文速读|Multi-Modal Disordered Representation Learning Network for TBPS.AAAI24
  • 小哆啦解题记:加油站的奇幻冒险
  • 【前端】CSS实战之音乐播放器
  • Games104——渲染中光和材质的数学魔法
  • impala增加字段,hsql查不到数据
  • SpringBoot项目中的异常处理
  • ComfyUI实现老照片修复——AI修复老照片(ComfyUI-ReActor / ReSwapper)尚待完善
  • NLTK命名实体识别(NER)
  • 【游戏设计原理】78 - 持续注意力
  • Android设备:Linux远程lldb调试
  • 多层 RNN原理以及实现
  • [Computer Vision]实验三:图像拼接
  • 【Vim Masterclass 笔记22】S09L40 + L41:同步练习11:Vim 的配置与 vimrc 文件的相关操作(含点评课内容)
  • 5.9 洞察 OpenAI - Translator:日志(Logger)模块的 “时光记录仪”
  • 客户案例:电商平台对帐-账单管理(亚马逊amazon)
  • IP协议特性