当前位置: 首页 > news >正文

使用WebSocket、SockJS、STOMP实现消息实时通讯功能

客户端

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
<html>
<head><title>websocket client</title><script src="http://cdn.bootcss.com/sockjs-client/1.1.1/sockjs.min.js"></script>  <script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>  <script src="http://cdn.bootcss.com/jquery/3.1.1/jquery.min.js"></script>  <script type="text/javascript">var stompClient = null;   //加载完浏览器后  调用connect(),打开双通道  $(function(){     //打开双通道  connect()  })    //强制关闭浏览器  调用websocket.close(),进行正常关闭  window.onunload = function() {  disconnect()  }  function connect(){  // 当前登录的用户idvar userId="4a85f897fc48360be1f8e6abeec40c16";  //连接SockJS的endpoint名称为"webSocket"  var socket = new SockJS('http://xxx.xxx.xxx.xxx:8080/webSocket');//使用STMOP子协议的WebSocket客户端  stompClient = Stomp.over(socket);// socket 链接传递 Authorization:token 信息var headers={"Authorization": "token"}//   var testMsg = JSON.stringify({'message':'Hello WebSocket!','userId':userId});//连接WebSocket服务端     stompClient.connect(headers,function(frame){      console.log('Connected:' + frame);  //通过stompClient.subscribe订阅/topic/getResponse 目标(destination)发送的消息  stompClient.subscribe('/user/'+userId+'/single/ip',function(response){  console.log("点对点消息");console.log(response);var message=JSON.parse(response.body);                                   console.log(message);   showResponse(message);				});  stompClient.subscribe('/topic',function(response){  console.log("订阅消息");console.log(response);var message=JSON.parse(response.body);                                   console.log(message);                    }); // 客户端给服务端发送消息stompClient.send("/app/testSendMsg",{},testMsg);			});  }  //关闭双通道  function disconnect(){  if(stompClient != null) {  stompClient.disconnect();  }  console.log("Disconnected");  }  function showResponse(message){  var response = $("#response");  var msg = JSON.parse(message.responseMessage);  response.append("<p>只有userID为"+msg.userId+"的人才能收到</p>");  }  </script>
</head>
<body><pre id="response"></pre>
</body>
</html>

服务端

pom

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency> 

初始化config

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;/*** webSocket 初始化类*/
@Configuration
// @EnableWebSocketMessageBroker 注解用于开启使用 STOMP 协议来传输基于代理(MessageBroker)的消息,这时候控制器(controller)
// 开始支持@MessageMapping,就像是使用 @requestMapping 一样。
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Lazy@Autowiredprivate WebsocketUserInterceptor websocketUserInterceptor;@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {//注册一个名为 /oauth/ws 的 Stomp 节点(endpoint),并指定使用 SockJS 协议。registry.addEndpoint("/webSocket").setAllowedOrigins("*").withSockJS();}@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {//点对点应配置一个/user消息代理,广播式应配置一个/topic消息代理registry.enableSimpleBroker("/topic","/user");//客户端向服务端发起请求时,需要以/app为前缀。registry.setApplicationDestinationPrefixes("/app");//点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/registry.setUserDestinationPrefix("/user");}/*** 采用自定义拦截器,获取connect时候传递的参数* @param registration*/@Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {registration.interceptors(websocketUserInterceptor);}
}

拦截器

@Slf4j
@Component
public class WebsocketUserInterceptor implements ChannelInterceptor {@Autowiredprivate WebSocketServer webSocketServ;@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);if (StompCommand.CONNECT.equals(accessor.getCommand())) {log.info("【webSocket】 --- 连接success");Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);if (raw instanceof Map) {Object tokenObj = ((Map) raw).get(JwtTokenUtil.AUTH_HEADER_KEY);if (tokenObj instanceof LinkedList) {String token = ((LinkedList) tokenObj).get(0).toString();//设置当前访问器的认证用户String userId = this.getUserIdFromToken(token);accessor.setUser(new WebsocketUserVO(userId));webSocketServ.pushOnlineUser(userId);log.info("【webSocket】 --- userId:{} 上线了,在线数量:{}",userId,webSocketServ.getOnlineUserSize());}}}return message;}/*** 发送消息调用后立即调用 一般用于监听上下线*/@Overridepublic void postSend(Message<?> message, MessageChannel channel, boolean sent) {try {StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);//消息头访问器if (accessor.getCommand() == null){ return;}// 避免非stomp消息类型,例如心跳检测switch(accessor.getCommand()){case DISCONNECT://点击断开连接,这里会执行两次,第二次执行的时候,message.getHeaders.size()=5,第一次是6。直接关闭浏览器,只会执行一次,size是5。log.info("【webSocket】 --- 断开连接");MessageHeaders headers = message.getHeaders();if(ObjectUtil.isNotNull(headers)) {Object header = headers.get(SimpMessageHeaderAccessor.USER_HEADER);log.info("【webSocket】 --- header:{}", header.toString());if (ObjectUtil.isNotNull(header)) {WebsocketUserVO vo = (WebsocketUserVO) header;webSocketServ.removeOnlineUser(vo.getName());log.info("【webSocket】 断开连接 --- userId:{} 下线了,在线数量:{}", vo.getName(), webSocketServ.getOnlineUserSize());}break;}case MESSAGE:break;case SEND:break;}} catch (Exception e) {log.info("【webSocket】 断开连接 - 异常:{}", e.getMessage());}}// 解析token获取userIdprivate String getUserIdFromToken(String token){token = token.replace(JwtTokenUtil.TOKEN_PREFIX,"");LoginUser user = JSON.parseObject(JWT.decode(token).getSubject(),LoginUser.class);String userId = user.getUserId();log.info("【webSocket】解析token获取userId:{}",userId);return userId;}
}

WebSocketServer

@Component
public class WebSocketServer {public static String WEB_SOCKET_KEY="webSocket:register:";@Autowiredprivate StringRedisTemplate stringRedisTemplate;// 用户上线public void pushOnlineUser(String id) {stringRedisTemplate.opsForValue().set(WEB_SOCKET_KEY+id,id);}// 用户下线public void removeOnlineUser(String id) {stringRedisTemplate.delete(WEB_SOCKET_KEY+id);}// 获取在线用户数量public Integer getOnlineUserSize() {return this.getOnlineUserList().size();}// 获取在线用户集合public List<String> getOnlineUserList() {Set<String> keys = stringRedisTemplate.keys(WEB_SOCKET_KEY + "*");return stringRedisTemplate.opsForValue().multiGet(keys);}
}

实体bean

public class WebsocketUserVO implements Principal {private  String name;public WebsocketUserVO(String name) {this.name = name;}@Overridepublic String getName() {return name;}
}
/*** 服务器向浏览器发送消息用这个类*/
public class Server2ClientMessage {private String responseMessage;public Server2ClientMessage(String responseMessage) {this.responseMessage = responseMessage;}public String getResponseMessage() {return responseMessage;}public void setResponseMessage(String responseMessage) {this.responseMessage = responseMessage;}
}
/*** 浏览器向服务器发送消息用这个类*/
public class Client2ServerMessage {private String userId;private String message;public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}
}

测试controller

@Controller
public class SockertController {@Autowiredprivate SimpMessagingTemplate simpMessagingTemplate;@MessageMapping("/testSendMsg")public BaseResponse testSendMsg(Client2ServerMessage socketBean) {System.out.println("收到客户端消息:"+ JSON.toJSON(socketBean));System.out.println("开始发送服务器端消息");Server2ClientMessage message = new Server2ClientMessage("{\"userId\":\""+socketBean.getUserId()+"\"}");// 点对点消息simpMessagingTemplate.convertAndSendToUser(socketBean.getUserId(),"/single/ip",message);// 广播消息simpMessagingTemplate.convertAndSend("/topic",message);System.out.println("结束发送服务器端消息");return BaseResponse.success("操作成功");}
}

测试效果

在这里插入图片描述

nginx配置协议升级

proxy_http_version 1.1
proxy_set_header Upgrade $http_upgrade
proxy_set_header Connection 'upgrade'
http://www.lryc.cn/news/33168.html

相关文章:

  • C++回顾(十一)—— 动态类型识别和抽象类
  • 雷电模拟器安卓7以上+Charles抓包APP最新教程
  • vsvode 配置sftp,连接远程linux全过程
  • C++类转换为蓝图、打印日志、蓝图关卡、删除C++文件
  • elasticsearch高级篇:核心概念和实现原理
  • 部署安装Nginx服务实例
  • 云原生架构设计原则及典型技术
  • 【Linux】-- 工具介绍 vim_gcc/g++_gdb
  • JAVA SE: IO流
  • 打破原来软件开发模式的无代码开发平台
  • 06-redux中的hook
  • watch监听不到数组对象的变化
  • 言语理解与表达之语句表达
  • 2023年全国最新食品安全管理员精选真题及答案14
  • 【MySQL】约束
  • C语言学习(三)
  • TOUGH系列软件建模及在地下水、CO2地质封存、水文地球化学、地热等多相多组分系统多过程耦合
  • k8s学习之路 | k8s 工作负载 ReplicaSet
  • python实现半色调技术图像转换
  • c++面试技巧-基础篇
  • 三八妇女节即将到来,跨境电商如何玩转节日营销?
  • 【Java学习笔记】10.条件语句 - if...else及switch case 语句
  • 解析STM32启动过程
  • 微信小程序开发自学笔记 —— 八、小程序基础库的更新迭代
  • Mysql迁移Postgresql
  • 关于信息安全认证CISP、PTE对比分析
  • 游戏场景编辑器和骨骼动画相关软件
  • vue3常用的API
  • Qt中使用
  • controller-runtime搭建operator开发环境