当前位置: 首页 > news >正文

使用Springboot + netty 打造聊天服务之Nacos集群问题记录

目录

  • 1、前言
    • 1.1、方法一
    • 1.2、方法二
  • 2、方案二实战
    • 2.1、在netty服务里加上ws连接、中断事件
    • 2.2、在netty服务里加上消息服务
  • 4、总结


使用Springboot + netty 打造聊天服务系列文章
第一章 初始搭建工程
第二章 Nacos集群问题记录


1、前言

在使用Springboot + Nacos + Netty(WebSocket) 集群后,发现了一个问题。
在集群环境下, X用户已经连接上了集群中的A服务器,这时Y用户发送给X用户的消息在B服务器,那么此时的消息应该如何处理呢?
在这里插入图片描述

1.1、方法一

通过广播的模式,把消息发送到MQ(且带有netty的channelId),netty集群的服务都订阅这个MQ。
通过对比channelId,不存在channelId的丢弃消息不处理。存在channelId的服务,处理此消息,并通过channel把消息推送给X用户。
在这里插入图片描述

1.2、方法二

发消息时,去寻找对应用户X的channel。
1、从缓存里获取用户X对应的channelId等信息,首先判断是否在缓存里,如果没有即用户X不在线,用户X下次连接netty服务时,再去推送消息;
2、如果缓存里有,判断此channel是否在当前服务中,
首先判断当前服务里是否有用户X对应的channelId,如果有,直接通过channel发送消息给用户X;
3、如果没有,则去组装IP、端口,去调用此服务的消息服务去发送消息。
在这里插入图片描述

2、方案二实战

方案一非常简单,订阅MQ即可实现,网上案例大多基于此。
我们今天重点讲解方案二,在netty服务里,加入消息服务,后续通过匹配用户X的channel去发送消息;

2.1、在netty服务里加上ws连接、中断事件

在ws连接时,本地服务器加入channelId、channel的缓存;
同时把channelId、本机IP、本机端口放入redis缓存(供远程消息服务调用)。
在这里插入图片描述

2.2、在netty服务里加上消息服务

1、通过传递来的channelId,从缓存里找到对应的服务(IP、端口);
2、调用对应的消息服务(IP、端口加上消息服务的地址)
3、消息服务里,铜鼓
在这里插入图片描述
在这里插入图片描述


import cn.hutool.core.bean.BeanUtil;
import com.qhkj.nettychatserver.bean.domain.Message;
import com.qhkj.nettychatserver.bean.request.MessageRequest;
import com.qhkj.nettychatserver.config.NettyConfig;
import com.qhkj.nettychatserver.config.http.HttpResult;
import com.qhkj.nettychatserver.config.http.HttpResultGenerator;
import com.qhkj.nettychatserver.config.http.HttpStatusEnum;
import com.qhkj.nettychatserver.constant.Common;
import com.qhkj.nettychatserver.constant.NettyCommon;
import com.qhkj.nettychatserver.netty.NettyHandler;
import com.qhkj.nettychatserver.service.MessageService;
import com.qhkj.nettychatserver.util.RedisUtil;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import com.qhkj.nettychatserver.bean.request.NettyMesaage;
import com.qhkj.nettychatserver.service.NettyService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import java.util.Date;
import java.util.HashMap;@Slf4j
@Service("chat")
public class ChatNettyServiceImpl implements NettyService {@Resourceprivate MessageService messageService;@Resourceprivate NettyConfig nettyConfig;@Resourceprivate RedisUtil redisUtil;@Resourceprivate RestTemplate restTemplate;// 确定channel之后,发送消息private void nettyHandler(NettyMesaage message, Channel channel) {log.info("message-> channelId:{}  , nettyName: {}", channel.id(), nettyConfig.getNettyServerName());Date now = new Date();Message dbmsg = Message.builder().messageId(NettyCommon.getIdWorker().nextId()).createTime(now).modifyTime(now).build();BeanUtil.copyProperties(message, dbmsg, Common.options);boolean flag = messageService.insertOne(dbmsg);if (flag) {channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(HttpResultGenerator.success(nettyConfig.getNettyServerName()))));} else {channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(HttpResultGenerator.fail(HttpStatusEnum.INTERNAM_SERVER_ERROR.getCode(), nettyConfig.getNettyServerName() + ""))));}}@Overridepublic HttpResult nettyHandler(MessageRequest request) {NettyMesaage nettyMesaage = new MessageRequest();BeanUtil.copyProperties(request, nettyMesaage);String serverInfo = (String) redisUtil.get(request.getChannelId());if(StringUtils.isEmpty(serverInfo)) {log.info("用户不在线!");return HttpResultGenerator.success("用户不在线!");}Channel channel = NettyHandler.channelMap.get(request.getChannelId());// 本机与用户有连接if(null != channel) {this.nettyHandler(nettyMesaage, channel);} else {String url = "http://" + serverInfo + "/msg/send";HashMap jsonObject =  restTemplate.postForObject(url, request, HashMap.class);if( !jsonObject.get("code").equals(200) ) {log.info("消息发送失败!");return HttpResultGenerator.fail(HttpStatusEnum.SERVER_BUSY.getCode(),"消息发送失败");}}return HttpResultGenerator.success("消息发送成功!");}
}

4、总结

文章写完之后,发现第二种方法问题特别多,需要在用户上下线(ws连接、掉线、netty服务销毁等)时,使用缓存记录用户与服务器的关系。在消息发送给接收方时,从缓存里取出接收方服务器信息,通过接收方服务器通知接收方有新消息。

最后给出2张消息服务架构简图:
1、集群版
在这里插入图片描述

2、单机版
分析单机版,客户端和netty之间的压力是相当小,万人同时在线,人均每秒2条消息,所需带宽也仅仅接近2MB,对应的内存消耗也是非常之小,几乎也是毫无压力。理论上来说,netty单机,10MB带宽、100MB内存就可以支撑5万用户(当然还得维护在线用户channel池、写数据到消息队列等等消耗,支撑2万在线用户肯定是没问题)。

而与之配套的数据服务系统(获取数据、解析消息并存库等),就可以做成集群,分配更大内存、更多的机器,去支撑快速增长的用户。
在这里插入图片描述

http://www.lryc.cn/news/414586.html

相关文章:

  • 全网唯一!R语言顶刊配色包TheBestColors
  • 链表题型思路错误总结
  • 算法学习day28
  • C语言基础题:迷宫寻路(C语言版)
  • 力扣-1两数之和2两数相加-2024/8/3
  • 简站WordPress主题 专业的WordPress建站服务商
  • Final Shell for Mac 虚拟机连接工具【简单易操作,轻松上手】【开发所需连接工具】
  • Oracle JDK:版本、支持与许可
  • 大模型学习笔记 - LLM 之RLHF人类对齐的简单总结
  • 【从零开始一步步学习VSOA开发】 概述
  • 小程序背景图片无法通过 WXSS 获取
  • CC++内存魔术:掌控无形资源
  • 算法--初阶
  • 通过Java实现插入排序(直接插入,希尔)与选择排序(直接选择,堆排)
  • 大型分布式B2B2C多用户商城7.0企业版源码分享【java语言、方便二次开发】
  • C++的结构体、联合体、枚举类型(一)
  • 搭建高可用OpenStack(Queen版)集群(一)之架构环境准备
  • 通过Stack Overflow线程栈溢出的问题实例,详解C++程序线程栈溢出的诸多细节
  • LeetCode刷题笔记 | 3 | 无重复字符的最长子串 | 双指针 | 滑动窗口 | 2025兴业银行秋招笔试题 | 哈希集合
  • 验证cuda和pytorch都按照成功了
  • iOS开发如何自己捕获Crash
  • 雪花算法(Snowflake Algorithm)
  • 〖任务1〗ROS2 jazzy Linux Mint 22 安装教程
  • 图像增强:使用周围像素填充掩码区域
  • 给虚拟机Ubuntu扩展硬盘且不丢数据
  • Oracle(41)如何使用PL/SQL批量处理数据?
  • JavaEE 第2节 线程安全知识铺垫1
  • LeetCode Hot100 零钱兑换
  • 微信小程序接口实现语音转文字
  • [Spark Streaming] 读取 Kafka 消息, 插入到 MySQL