当前位置: 首页 > news >正文

【SpringBoot3】双向实时通讯 websocket

文章目录

    • 一、Websocket使用步骤
    • 二、示例1:继承抽象类 `AbstractWebSocketHandler`
      • 后端代码
      • 前端代码
    • 三、示例2:使用注解`@ServerEndpoint`
      • 后端代码
      • 前端代码
    • 四、前端代码封装

一、Websocket使用步骤

在Spring Boot中使用WebSocket是一个常见的需求,因为WebSocket提供了一种在客户端和服务器之间进行全双工通信的方式。Spring Boot通过Spring的WebSocket支持,使得在应用程序中集成WebSocket变得非常简单。

以下是在Spring Boot中使用WebSocket的基本步骤:

1、添加依赖:
首先,你需要在你的pom.xml(如果你使用Maven)或build.gradle(如果你使用Gradle)中添加WebSocket的依赖。对于Spring Boot项目,你通常只需要添加spring-boot-starter-websocket依赖。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2、配置WebSocket:
接下来,你需要创建一个配置类实现接口WebSocketConfigurer来启用WebSocket,并配置相关的端点和消息代理。使用@EnableWebSocket注解来启用WebSocket功能。

3、创建控制器:
一旦WebSocket被配置,你就可以创建一个控制器来处理WebSocket消息。
创建一个class类,集成抽象类AbstractWebSocketHandler,或者使用注解@ServerEndpoint来声明Websocket Endpoint

4、客户端连接:
最后,你需要在客户端连接到WebSocket服务器。这可以通过使用WebSocket API来实现。

二、示例1:继承抽象类 AbstractWebSocketHandler

后端代码

1、创建Websocket处理类Ws1Handler,继承抽象类AbstractWebSocketHandler

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import org.thymeleaf.util.StringUtils;import static com.hj.springboot3.ws.demo1.Ws1Pool.broadcast;/*** websocket事件处理* <p>* 链接:/ws/demo1?userId=xxxx*/
public class Ws1Handler extends AbstractWebSocketHandler {private static final Logger logger = LoggerFactory.getLogger(Ws1Handler.class);@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {Ws1Pool.add(session);}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {Ws1Pool.remove(session);}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {if (StringUtils.equals("ping", message.getPayload())) {// 心跳消息session.sendMessage(new TextMessage("pong"));return;}logger.info("receive Msg :" + message.getPayload());TextMessage msg = new TextMessage(message.getPayload());// 回发信息 给 js前端session.sendMessage(msg);}}

消息对象VO

@Data
@NoArgsConstructor
@AllArgsConstructor
public class WsMsgVo {private String text;private Long userId;
}

将请求参数字符串转换成map 工具类

public class ParamUtil {/** 将请求参数字符串转换成map */public static Map<String, String> parser(String queryString) {Map<String, String> map = new HashMap<String, String>();if (StringUtils.isNotBlank(queryString)) {String[] params = queryString.split("&");for (String p : params) {String[] strs = p.split("=");if (strs.length == 2) {map.put(strs[0], strs[1]);}}}return map;}
}

2、创建websocket链接池

存储所有在线用户链接,并实现发送消息和广播消息的功能;使用异步线给前端发送消息

/*** websocket链接池*/
public class Ws1Pool {private static final Logger logger = LoggerFactory.getLogger(Ws1Pool.class);private static final Map<String, WebSocketSession> pool = new ConcurrentHashMap<>();private static final Map<Long, List<String>> userMap = new ConcurrentHashMap<>();private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);public static void add(WebSocketSession inbound) {pool.put(inbound.getId(), inbound);Map<String, String> map = ParamUtil.parser(inbound.getUri().getQuery());Long userId = Long.valueOf(map.get("userId"));logger.info("add userId:{}", userId);List<String> lstInBound = userMap.computeIfAbsent(userId, k -> new ArrayList<>());lstInBound.add(inbound.getId());logger.info("add connetion {},total size {}", inbound.getId(), pool.size());}public static void remove(WebSocketSession socket) {String sessionId = socket.getId();List<String> lstInBound = null;Map<String, String> map = ParamUtil.parser(socket.getUri().getQuery());Long userId = Long.valueOf(map.get("userId"));logger.info("remove userId:{}", userId);if (StringUtils.isNotBlank(sessionId)) {lstInBound = userMap.get(userId);if (lstInBound != null) {lstInBound.remove(sessionId);if (lstInBound.isEmpty()) {userMap.remove(userId);}}}pool.remove(sessionId);logger.info("remove connetion {},total size {}", sessionId, pool.size());}/** 推送信息 */public static void broadcast(WsMsgVo vo) {Long userId = vo.getUserId();List<String> lstInBoundId;if (userId == null || userId == 0L) {// 发送给所有人lstInBoundId = userMap.values().stream().flatMap(List::stream).collect(Collectors.toList());} else {lstInBoundId = userMap.get(userId);}if (lstInBoundId == null || lstInBoundId.isEmpty()) {return;}threadPool.execute(() -> {try {for (String id : lstInBoundId) {// 发送给指定用户WebSocketSession connection = pool.get(id);if (connection != null) {synchronized (connection) {TextMessage msg = new TextMessage(vo.getText());connection.sendMessage(msg);}}}} catch (Exception e) {logger.error("broadcast error: userId:{}", userId, e);}});}}

3、创建Websocket配置类,并注册地址/ws/demo1,将地址和处理类绑定

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;/*** Websocket配置*/
@Configuration
@EnableWebSocket
public class Ws1Config implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(ws1Handler(), "/ws/demo1").setAllowedOrigins("*");}@Beanpublic Ws1Handler ws1Handler() {return new Ws1Handler();}}

前端代码

在这段代码中,我们首先创建了一个新的WebSocket对象,并传入了WebSocket服务的URL。然后,我们为这个WebSocket连接添加了四个事件监听器:

  1. onopen:当WebSocket连接打开时触发。
  2. onmessage:当从服务器接收到消息时触发。
  3. onerror:当WebSocket发生错误时触发。
  4. onclose:当WebSocket连接关闭时触发。

你可以根据自己的需求,在这些事件监听器中添加相应的逻辑。例如,在onopen事件监听器中发送一个消息给服务器,或者在onmessage事件监听器中处理从服务器接收到的消息。

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org"xmlns:layout="http://www.ultraq.net.nz/thymeleaf/layout">
<head>
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>websocket demo1</title>
</head>
<body>
<div><div id="content"><div>信息面板</div></div><input type="text" id="input" placeholder="请输入内容" /><button id="send" onclick="doSend()">发送</button></div>
<script type="text/javascript">// 创建一个新的WebSocket并连接到指定的URLvar socket = new WebSocket('ws://localhost:8080/ws/demo1?userId=001');// 当WebSocket连接打开时触发socket.onopen = function(event) {console.log('Connection opened');// 可以选择在这里发送一些数据到服务器socket.send('Hello, server!');};// 当从服务器接收到数据时触发socket.onmessage = function(event) {if (event.data == null || event.data == '' || "pong" == event.data) {//心跳消息console.log("Info: 心跳消息");} else {console.log('Message from server ', event.data);var div_msg = document.createElement('div');div_msg.textContent = event.data;document.getElementById('content').appendChild(div_msg)}};// 当WebSocket连接关闭时触发socket.onclose = function(event) {console.log('Connection closed');};// 当WebSocket连接发生错误时触发socket.onerror = function(error) {console.error('WebSocket Error:', error);};function doSend(){var input_dom = document.getElementById('input')var value = input_dom.value;input_dom.value=''input_dom.focus()socket.send(value);}</script></body>
</html>

三、示例2:使用注解@ServerEndpoint

后端代码

1、创建Websocket处理类Ws3Handler,并使用注解@ServerEndpoint声明

import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.thymeleaf.util.StringUtils;import java.io.IOException;/*** websocket事件处理* <p>* 链接:/ws/demo3?userId=xxxx*/
@Component
@ServerEndpoint("/ws/demo3")
public class Ws3Handler {private static final Logger logger = LoggerFactory.getLogger(Ws3Handler.class);@OnOpenpublic void onOpen(Session session) {Ws3Pool.add(session);}@OnClosepublic void OnClose(Session session) {Ws3Pool.remove(session);}@OnMessagepublic void onMessage(Session session, String message) throws IOException {if (StringUtils.equals("ping", message)) {// 心跳消息session.getBasicRemote().sendText("pong");return;}logger.info("receive Msg :" + message);
//        session.getBasicRemote().sendText(message);Ws3Pool.broadcast(new WsMsgVo(message, 0L));}/*** 错误时调用*/@OnErrorpublic void onError(Session session, Throwable throwable) {throwable.printStackTrace();}}

2、创建websocket链接池

存储所有在线用户链接,并实现发送消息和广播消息的功能;使用异步线给前端发送消息

/*** websocket链接池*/
public class Ws3Pool {private static final Logger logger = LoggerFactory.getLogger(Ws3Pool.class);private static Map<String, Session> pool = new ConcurrentHashMap<>();private static final Map<Long, List<String>> userMap = new ConcurrentHashMap<>();private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);public static void add(Session session) {pool.put(session.getId(), session);Map<String, String> map = ParamUtil.parser(session.getQueryString());Long userId = Long.valueOf(map.get("userId"));logger.info("add userId:{}", userId);List<String> lstInBound = userMap.computeIfAbsent(userId, k -> new ArrayList<>());lstInBound.add(session.getId());logger.info("add connetion {},total size {}", session.getId(), pool.size());}public static void remove(Session session) {String sessionId = session.getId();List<String> lstInBound = null;Map<String, String> map = ParamUtil.parser(session.getQueryString());Long userId = Long.valueOf(map.get("userId"));logger.info("remove userId:{}", userId);if (StringUtils.isNotBlank(sessionId)) {lstInBound = userMap.get(userId);if (lstInBound != null) {lstInBound.remove(sessionId);if (lstInBound.isEmpty()) {userMap.remove(userId);}}}pool.remove(sessionId);logger.info("remove connetion {},total size {}", sessionId, pool.size());}/** 推送信息 */public static void broadcast(WsMsgVo vo) {Long userId = vo.getUserId();List<String> lstInBoundId;if (userId == null || userId == 0L) {// 发送给所有人lstInBoundId = userMap.values().stream().flatMap(List::stream).collect(Collectors.toList());} else {lstInBoundId = userMap.get(userId);}if (lstInBoundId == null || lstInBoundId.isEmpty()) {return;}threadPool.execute(() -> {try {for (String id : lstInBoundId) {// 发送给指定用户Session session = pool.get(id);if (session != null) {synchronized (session) {session.getBasicRemote().sendText(vo.getText());}}}} catch (Exception e) {logger.error("broadcast error: userId:{}", userId, e);}});}}

3、创建Websocket配置类,并注册 ServerEndpointExporter

ServerEndpointExporter 是 Spring Boot 中的一个重要组件,用于导出 WebSocket 服务器端点配置。在 Spring 应用程序中,特别是当你使用 Spring Boot 时,ServerEndpointExporter 能够自动注册使用 @ServerEndpoint 注解声明的 WebSocket 端点。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
@EnableWebSocket
public class Ws3Config {/*** ServerEndpointExporter 作用* 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}

在这个配置类中,我们定义了一个 serverEndpointExporter 方法,它返回一个 ServerEndpointExporter 的实例。这样,Spring 容器就会管理这个 bean,并自动注册所有使用 @ServerEndpoint 注解声明的 WebSocket 端点。

前端代码

(可以和上面的例子一致)

四、前端代码封装

为了更方便的使用websocket,以及确保在后端服务器重启后,前端websocket能够自动重连,我们可以增加心跳机制

1、创建 ws.handler.js

var WsHander = {};
WsHander.socket = null;
WsHander.connect = (function (host) {WsHander.host = host;if ("WebSocket" in window) {WsHander.socket = new WebSocket(host);} else if ("MozWebSocket" in window) {WsHander.socket = new MozWebSocket(host);} else {console.log("Error: WebSocket is not supported by this browser.");return;}WsHander.socket.onopen = function () {console.log("Info: websocket已启动.");// 心跳检测重置heartCheck.reset().start(WsHander.socket);};WsHander.socket.onclose = function () {console.log("Info: websocket已关闭.");WsHander.reconnect();};WsHander.socket.onmessage = function (message) {heartCheck.reset().start(WsHander.socket);if (message.data == null || message.data === '' || "pong" === message.data) {//心跳消息console.log("Info: 心跳消息");} else {// 收到 Websocket消息,执行业务操作if (doOnWsMessage){doOnWsMessage(message.data);}}};
});
WsHander.reconnect = function (){WsHander.connect(WsHander.host);
}
WsHander.initialize = function (userId, uri) {WsHander.currUserId = userId;if (WsHander.currUserId) {if (window.location.protocol === "http:") {WsHander.connect("ws://" + window.location.host + uri+"?userId=" + WsHander.currUserId);} else {WsHander.connect("wss://" + window.location.host + uri+"?userId=" + WsHander.currUserId);}}
};
WsHander.sendMessage = (function (message) {if (message !== "") {WsHander.socket.send(message);}
});//心跳检测
var heartCheck = {timeout: 5000,// 5秒timeoutObj: null,reset: function () {clearTimeout(this.timeoutObj);return this;},start: function (ws) {var self = this;this.timeoutObj = setTimeout(function () {// 这里发送一个心跳,后端收到后,返回一个心跳消息,// onmessage拿到返回的心跳就说明连接正常ws.send("ping");}, this.timeout);}
}

2、在html页面中使用

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org"xmlns:layout="http://www.ultraq.net.nz/thymeleaf/layout">
<head>
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>websocket demo1</title>
</head>
<body>
<div><div id="content"><div>信息面板</div></div><input type="text" id="input" placeholder="请输入内容" /><button id="send" onclick="doSend()">发送</button></div>
<script src="../static/ws.handler.js" ></script>
<script type="text/javascript">WsHander.initialize("001", "/ws/demo3")// 当从服务器接收到数据时触发function doOnWsMessage(messageData){console.log('Message from server ', messageData);var div_msg = document.createElement('div');div_msg.textContent = messageData;document.getElementById('content').appendChild(div_msg)}function doSend(){var input_dom = document.getElementById('input')var value = input_dom.value;input_dom.value=''input_dom.focus()WsHander.sendMessage(value)}</script></body>
</html>
http://www.lryc.cn/news/427051.html

相关文章:

  • 搭建内网开发环境(一)|基于docker快速部署开发环境
  • MATLAB R2023b配置Fortran编译器
  • 2024新型数字政府综合解决方案(七)
  • 搭建高可用k8s集群
  • 完美解决html2canvas + jsPDF导出pdf分页内容截断问题
  • 14 地址映射
  • Java Resilience4j-RateLimiter学习
  • Nginx--地址重写Rewrite
  • webflux源码解析(1)-主流程
  • ipad作为扩展屏的最简单方式
  • 【卡码网Python基础课 17.判断集合成员】
  • 生物研究新范式!AI语言模型在生物研究中的应用
  • python语言day08 属性装饰器和property函数 异常关键字 约束
  • day01JS-数据类型-01
  • MATLAB 手动实现一种高度覆盖值提取建筑物点云的方法(74)
  • git的下载与安装(Windows)
  • 腾讯云AI代码助手 —— 编程新体验,智能编码新纪元
  • 使用 ESP32 和 TFT 屏幕显示实时天气信息 —— 基于 OpenWeatherMap API
  • 高阶数据结构——B树
  • Vue2中watch与Vue3中watch对比和踩坑
  • 在Java程序中执行Linux命令
  • 微信小程序在不同移动设备上的差异导致原因
  • 快速体验fastllm安装部署并支持AMD ROCm推理加速
  • 报错:java: javacTask: 源发行版 8 需要目标发行版 1.8
  • 【数据结构篇】~单链表(附源码)
  • 旋转图像(LeetCode)
  • 入门 - vue中v-model的实现原理和完整用法详解
  • 【区块链+金融服务】港融区域股权服务平台 | FISCO BCOS应用案例
  • Nginx反向代理和前后端分离项目打包部署
  • Spring 中ApplicationContext