当前位置: 首页 > news >正文

spring boot 实现直播聊天室(二)

spring boot 实现直播聊天室(二)

技术方案:

  • spring boot
  • netty
  • rabbitmq

目录结构

在这里插入图片描述

引入依赖

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.96.Final</version>
</dependency>

SimpleNettyWebsocketServer

netty server 启动类

@Slf4j
public class SimpleNettyWebsocketServer {private SimpleWsHandler simpleWsHandler;public SimpleNettyWebsocketServer(SimpleWsHandler simpleWsHandler) {this.simpleWsHandler = simpleWsHandler;}public void start(int port) throws InterruptedException {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup work = new NioEventLoopGroup(2);try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, work).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//HTTP协议编解码器,用于处理HTTP请求和响应的编码和解码。其主要作用是将HTTP请求和响应消息转换为Netty的ByteBuf对象,并将其传递到下一个处理器进行处理。pipeline.addLast(new HttpServerCodec());//用于HTTP服务端,将来自客户端的HTTP请求和响应消息聚合成一个完整的消息,以便后续的处理。pipeline.addLast(new HttpObjectAggregator(65535));pipeline.addLast(new IdleStateHandler(30,0,0));//处理请求参数pipeline.addLast(new SimpleWsHttpHandler());pipeline.addLast(new WebSocketServerProtocolHandler("/n/ws"));pipeline.addLast(simpleWsHandler);}});Channel channel = bootstrap.bind(port).sync().channel();log.info("server start at port: {}", port);channel.closeFuture().sync();} finally {boss.shutdownGracefully();work.shutdownGracefully();}}
}

NettyUtil: 工具类

public class NettyUtil {public static AttributeKey<String> G_U = AttributeKey.valueOf("GU");/*** 设置上下文参数* @param channel* @param attributeKey* @param data* @param <T>*/public static <T> void setAttr(Channel channel, AttributeKey<T> attributeKey, T data) {Attribute<T> attr = channel.attr(attributeKey);if (attr != null) {attr.set(data);}}/*** 获取上下文参数 * @param channel* @param attributeKey* @return* @param <T>*/public static <T> T getAttr(Channel channel, AttributeKey<T> attributeKey) {return channel.attr(attributeKey).get();}/*** 根据 渠道获取 session* @param channel* @return*/public static NettySimpleSession getSession(Channel channel) {String attr = channel.attr(G_U).get();if (StrUtil.isNotBlank(attr)){String[] split = attr.split(",");String groupId = split[0];String username = split[1];return new NettySimpleSession(channel.id().toString(),groupId,username,channel);}return null;}
}

处理handler

SimpleWsHttpHandler

处理 websocket 协议升级时地址请求参数 ws://127.0.0.1:8881/n/ws?groupId=1&username=tom, 解析groupId 和 username ,并设置这个属性到上下文

/*** @Date: 2023/12/13 9:53* 提取参数*/
@Slf4j
public class SimpleWsHttpHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof  FullHttpRequest request){//ws://localhost:8080/n/ws?groupId=xx&username=tomString decode = URLDecoder.decode(request.uri(), StandardCharsets.UTF_8);log.info("raw request url: {}", decode);Map<String, String> queryMap = getParams(decode);String groupId = MapUtil.getStr(queryMap, "groupId", null);String username = MapUtil.getStr(queryMap, "username", null);if (StrUtil.isNotBlank(groupId) && StrUtil.isNotBlank(username)) {NettyUtil.setAttr(ctx.channel(), NettyUtil.G_U, groupId.concat(",").concat(username));}//去掉参数 ===>  ws://localhost:8080/n/wsrequest.setUri(request.uri().substring(0,request.uri().indexOf("?")));ctx.pipeline().remove(this);ctx.fireChannelRead(request);}else{ctx.fireChannelRead(msg);}}/*** 解析 queryString* @param uri* @return*/public static Map<String, String> getParams(String uri) {Map<String, String> params = new HashMap<>(10);int idx = uri.indexOf("?");if (idx != -1) {String[] paramsArr = uri.substring(idx + 1).split("&");for (String param : paramsArr) {idx = param.indexOf("=");params.put(param.substring(0, idx), param.substring(idx + 1));}}return params;}
}
SimpleWsHandler

处理消息

@Slf4j
@ChannelHandler.Sharable
public class SimpleWsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Autowiredprivate PushService pushService;/*** 在新的 Channel 被添加到 ChannelPipeline 中时被调用。这通常发生在连接建立时,即 Channel 已经被成功绑定并注册到 EventLoop 中。* 在连接建立时被调用一次** @param ctx* @throws Exception*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {NettySimpleSession session = NettyUtil.getSession(ctx.channel());if (session == null) {log.info("handlerAdded channel id: {}", ctx.channel().id());} else {log.info("handlerAdded channel group-username: {}-{}", session.group(), session.identity());}}/*** 连接断开时,Netty 会自动触发 channelInactive 事件,并将该事件交给事件处理器进行处理* 在 channelInactive 事件的处理过程中,会调用 handlerRemoved 方法** @param ctx* @throws Exception*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {NettySimpleSession session = NettyUtil.getSession(ctx.channel());if (session!=null){log.info("handlerRemoved channel group-username: {}-{}", session.group(), session.identity());}offline(ctx.channel());}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {//todo msg 可以是json字符串,这里仅仅只是纯文本NettySimpleSession session = NettyUtil.getSession(ctx.channel());if (session!=null){MessageDto messageDto = new MessageDto();messageDto.setSessionId(session.getId());messageDto.setGroup(session.group());messageDto.setFromUser(session.identity());messageDto.setContent(msg.text());pushService.pushGroupMessage(messageDto);}else {log.info("channelRead0 session is null channel id: {}-{}", ctx.channel().id(),msg.text());}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("SimpleWsHandler 客户端异常断开 {}", cause.getMessage());//todo offlineoffline(ctx.channel());}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent idleStateEvent) {if (idleStateEvent.state().equals(IdleStateEvent.READER_IDLE_STATE_EVENT)) {log.info("SimpleWsIdleHandler channelIdle 5 秒未收到客户端消息,强制关闭: {}", ctx.channel().id());//todo offlineoffline(ctx.channel());}} else if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {String attr = NettyUtil.getAttr(ctx.channel(), NettyUtil.G_U);if (StrUtil.isBlank(attr)) {ctx.writeAndFlush("参数异常");offline(ctx.channel());} else {//todo 可以做用户认证等等//记录用户登陆sessionNettySimpleSession session = NettyUtil.getSession(ctx.channel());Assert.notNull(session, "session 不能为空");SessionRegistry.getInstance().addSession(session);}}super.userEventTriggered(ctx,evt);}/*** 用户下线,处理失效 session* @param channel*/public void offline(Channel channel){NettySimpleSession session = NettyUtil.getSession(channel);if (session!=null){SessionRegistry.getInstance().removeSession(session);}channel.close();}}

PushService

推送服务抽取

public interface PushService {/*** 组推送* @param messageDto*/void pushGroupMessage(MessageDto messageDto);}@Service
public class PushServiceImpl implements PushService {@Autowiredprivate MessageClient messagingClient;@Overridepublic void pushGroupMessage(MessageDto messageDto) {messagingClient.sendMessage(messageDto);}
}

NettySimpleSession

netty session 封装

public class NettySimpleSession extends AbstractWsSession {private Channel channel;public NettySimpleSession(String id, String group, String identity, Channel channel) {super(id, group, identity);this.channel = channel;}@Overridepublic void sendTextMessage(MessageDto messageDto) {String content = messageDto.getFromUser() + " say: " + messageDto.getContent();// 不能直接 write content, channel.writeAndFlush(content);// 要封装成 websocketFrame,不然不能编解码!!!channel.writeAndFlush(new TextWebSocketFrame(content));}
}

启动类

@Slf4j
@SpringBootApplication
public class DemoApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}@Beanpublic SimpleWsHandler simpleWsHandler(){return new SimpleWsHandler();}@PostConstructpublic void init() {new Thread(() -> {log.info(">>>>>>>> start netty ws server....");try {new SimpleNettyWebsocketServer(simpleWsHandler()).start(8881);} catch (InterruptedException e) {log.info(">>>>>>>> SimpleNettyWebsocketServer start error", e);}}).start();}}

其他代码参考 spring boot 实现直播聊天室

测试

websocket 地址 ws://127.0.0.1:8881/n/ws?groupId=1&username=tom

在这里插入图片描述

good luck!

http://www.lryc.cn/news/260081.html

相关文章:

  • alibaba fastjson GET List传参 和 接收解析
  • API自动化测试是什么?我们该如何做API自动化测试呢?
  • PyTorch 的 10 条内部用法
  • Django、Echarts异步请求、动态更新
  • Mac部署Odoo环境-Odoo本地环境部署
  • 【✅面试编程题:如何用队列实现一个栈】
  • Windows本地的RabbitMQ服务怎么在Docker for Windows的容器中使用
  • YOLOv5改进 | 2023卷积篇 | AKConv轻量级架构下的高效检测(既轻量又提点)
  • 微信小程序:模态框(弹窗)的实现
  • uniapp交互反馈api的使用示例
  • XUbuntu22.04之HDMI显示器设置竖屏(一百九十八)
  • 如何用 Cargo 管理 Rust 工程系列 甲
  • Windows下ping IP+端口的方法
  • 【python】os.getcwd()函数详解和示例
  • Linux(二十一)——virtualenv安装成功之后,依然提示未找到命令(-bash: virtualenv: 未找到命令)
  • RNN介绍及Pytorch源码解析
  • Qt 文字描边(基础篇)
  • .360勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复
  • Nginx(四层+七层代理)+Tomcat实现负载均衡、动静分离
  • 【前端】vscode 相关插件
  • 【MySQL】MySQL库的增删查改
  • 基于基于深度学习的表情识别人脸打分系统
  • Linux|操作系统|Error: Could not create the Java Virtual Machine 报错的解决思路
  • K8S学习指南-minikube的安装
  • 恒创科技:有哪些免费的CDN加速服务
  • Kibana搜索数据利器:KQL与Lucene
  • float32、int8、uint8、int32、uint32之间的区别
  • 百度搜索展现服务重构:进步与优化
  • icmp协议、ip数据包 基础
  • es6从url中获取想要的参数