当前位置: 首页 > news >正文

基于WebFlux的Websocket的实现,高级实现自定义功能拓展

基于WebFlux的Websocket

一、导入XML依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency><!-- 或者引入jackson -->
<dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.26</version>
</dependency>

二、定义配置类,设置WebSocket拦截器

@Configuration
@EnableWebFlux
public class WebSocketConfig implements WebFluxConfigurer {@Beanpublic HandlerMapping handlerMapping() {Map<String, WebSocketHandler> map = new HashMap<>();map.put("/ws/chat", new MyWebSocketChatHandler());map.put("/ws/echo", new MyWebSocketEchoHandler());SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();mapping.setUrlMap(map);mapping.setOrder(-1); // 需要设置较高的优先级,以避免与其他处理程序冲突return mapping;}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}

三、设置处理器,WebSocket的处理器

// 1. Echo的处理器
public class MyWebSocketEchoHandler implements WebSocketHandler {@NotNull@Overridepublic Mono<Void> handle(WebSocketSession session) {return session.send(session.receive().map(msg -> "Echo: " + msg.getPayloadAsText()).map(session::textMessage));}
}

设置自定义的处理器(高级处理)

public class MyWebSocketChatHandler implements WebSocketHandler {private static final Map<String, WebSocketSession> userMap = new ConcurrentHashMap<>();private static final ObjectMapper objectMapper = new ObjectMapper();@NotNull@Overridepublic Mono<Void> handle(WebSocketSession session) {String query = session.getHandshakeInfo().getUri().getQuery();Map<String, String> queryMap = getQueryMap(query);String userId = queryMap.getOrDefault("id", "");userMap.put(userId, session);System.out.println("当前用户:" + userId);System.out.println("当前在线人数:" + userMap.size());return session.receive().flatMap(webSocketMessage -> {String payload = webSocketMessage.getPayloadAsText();Message message;try {message = objectMapper.readValue(payload, Message.class);if (Integer.parseInt(message.getCode()) == CodeEnum.SUCCESS.getCode()) {// 执行成功模式Mono<Void> targetSession = SuccessMode(message);if (targetSession != null) return targetSession;} else if (Integer.parseInt(message.getCode()) == CodeEnum.ERROR.getCode()) {// 执行出错模式return session.send(Mono.just(session.textMessage("消发送出错了")));} else {// 其他code的功能实现return session.send(Mono.just(session.textMessage("消息格式错误")));}} catch (JsonProcessingException e) {e.printStackTrace();// 这里一定要return,否则会导致线程卡死直接断开连接return session.send(Mono.just(session.textMessage(e.getMessage())));}return session.send(Mono.just(session.textMessage("目标用户不在线")));}).then().doFinally(signal -> userMap.remove(userId)); // 用户关闭连接后删除对应连接}@Nullableprivate static Mono<Void> SuccessMode(Message message) {String targetId = message.getTargetId();if (userMap.containsKey(targetId)) {WebSocketSession targetSession = userMap.get(targetId);if (null != targetSession) {WebSocketMessage textMessage = targetSession.textMessage(message.getMessageText());return targetSession.send(Mono.just(textMessage));}}return null;}// 其他的实现private Map<String, String> getQueryMap(String queryStr) {Map<String, String> queryMap = new HashMap<>();if (!StringUtils.isEmpty(queryStr)) {String[] queryParam = queryStr.split("&");Arrays.stream(queryParam).forEach(s -> {String[] kv = s.split("=", 2);String value = kv.length == 2 ? kv[1] : "";queryMap.put(kv[0], value);});}return queryMap;}
}

注意要先配置一个实体类映射—(注意客户端的信息一定也是要json格式不然会报错哟)

@Data
public class Message {@JsonProperty("code")private String code;@JsonProperty("targetId")private String targetId;@JsonProperty("messageText")private String messageText;@JsonProperty("userId")private String userId;
}

枚举类设置code对应的信息

public enum CodeEnum {SUCCESS(1),ERROR(2);// 其他枚举值...private final Integer code;CodeEnum(int code) {this.code = code;}public Integer getCode() {return code;}
}

最后运行即可。注意访问ws://localhost:8081/ws/chat?userId=123实现私聊的功能,访问ws://localhost:8081/ws/echo即可实现简单的服务器和客户端的回应。群聊功能可以根据自己的需求进行实现,只需要添加对应的code以及获取所有session并发送message即可。

http://www.lryc.cn/news/279399.html

相关文章:

  • 使用 LLVM clang C/C++ 编译器编译 OpenSSL 3.X库
  • 【信息安全】hydra爆破工具的使用方法
  • uniapp中uview组件库丰富的CountTo 数字滚动使用方法
  • inflate流程分析
  • 数据挖掘实战-基于机器学习的电商文本分类模型
  • 第8章-第4节-Java中字节流的缓冲流
  • NULL是什么?
  • FreeRTOS 基础知识
  • 【野火i.MX6NULL开发板】挂载 NFS 网络文件系统
  • 在JavaScript中,Object.assign()方法或展开语法(...)来合并对象,Object.freeze()方法来冻结对象,防止对象被修改
  • 池化、线性、激活函数层
  • ES-极客学习第二部分ES 入门
  • Nodejs软件安装​
  • Photoshop 2024 (PS2024) v25 直装版 支持win/mac版
  • ChatGPT绘画生成软件MidTool:智能艺术的新纪元
  • linux安装MySQL5.7(安装、开机自启、定时备份)
  • openGauss学习笔记-195 openGauss 数据库运维-常见故障定位案例-分析查询语句运行状态
  • Oracle篇—实例中和name相关参数的区别和作用
  • python + selenium 初步实现数据驱动
  • 数字孪生+可视化技术 构建智慧新能源汽车充电站监管平台
  • 微信小程序开发学习笔记《11》导航传参
  • BikeDNA(七)外在分析:OSM 与参考数据的比较1
  • KY43 全排列
  • UltraScale 和 UltraScale+ 生成已加密文件和已经过身份验证的文件
  • 2023年全国职业院校技能大赛软件测试赛题—单元测试卷②
  • 极兔单号查快递,极兔快递单号查询,筛选出途经指定城市的单号
  • [redis] redis高可用之持久化
  • 云原生 微服务 restapi devops相关的一些概念说明(持续更新中)
  • 初学unity学习七天,经验收获总结
  • hcip实验2