客户端
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
<html>
<head>
<title>websocket client</title>
<script src="http://cdn.bootcss.com/sockjs-client/1.1.1/sockjs.min.js"></script>
<script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>
<script src="http://cdn.bootcss.com/jquery/3.1.1/jquery.min.js"></script>
<script type="text/javascript">
// Demo STOMP-over-SockJS client: connects on page load, subscribes to a
// per-user destination and to the broadcast topic, then sends one test message.
var stompClient = null;

// Connect as soon as the DOM is ready.
$(function () {
    connect();
});

// Close the STOMP session when the page is unloaded.
window.onunload = function () {
    disconnect();
};

/**
 * Opens the SockJS transport, performs the STOMP CONNECT and, once connected,
 * registers both subscriptions and sends the test message.
 */
function connect() {
    var userId = "4a85f897fc48360be1f8e6abeec40c16";
    var socket = new SockJS('http://xxx.xxx.xxx.xxx:8080/webSocket');
    stompClient = Stomp.over(socket);
    // Sent as a native STOMP header on the CONNECT frame.
    var headers = {"Authorization": "token"};
    var testMsg = JSON.stringify({'message': 'Hello WebSocket!', 'userId': userId});

    stompClient.connect(headers, function (frame) {
        console.log('Connected:' + frame);

        // Point-to-point messages addressed to this user.
        stompClient.subscribe('/user/' + userId + '/single/ip', function (response) {
            console.log("点对点消息");
            console.log(response);
            var message = JSON.parse(response.body);
            console.log(message);
            showResponse(message);
        });

        // Broadcast messages.
        stompClient.subscribe('/topic', function (response) {
            console.log("订阅消息");
            console.log(response);
            var message = JSON.parse(response.body);
            console.log(message);
        });

        stompClient.send("/app/testSendMsg", {}, testMsg);
    }, function (error) {
        // Surface connection failures instead of failing silently.
        console.error('Connection error:', error);
    });
}

/** Disconnects the STOMP client if one was created. */
function disconnect() {
    if (stompClient != null) {
        stompClient.disconnect();
    }
    console.log("Disconnected");
}

/**
 * Renders a point-to-point message into the #response element.
 * message.responseMessage is itself a JSON string and is parsed here.
 */
function showResponse(message) {
    var response = $("#response");
    var msg = JSON.parse(message.responseMessage);
    // Build the node with .text() so the received userId is never parsed as HTML.
    response.append($("<p>").text("只有userID为" + msg.userId + "的人才能收到"));
}
</script>
</head>
<body>
<pre id="response"></pre>
</body>
</html>
服务端
pom
<!-- Spring Boot WebSocket starter. No version is declared here; presumably it is
     managed by the Spring Boot parent/BOM of the project (confirm). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
初始化config
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Lazy@Autowiredprivate WebsocketUserInterceptor websocketUserInterceptor;@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/webSocket").setAllowedOrigins("*").withSockJS();}@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.enableSimpleBroker("/topic","/user");registry.setApplicationDestinationPrefixes("/app");registry.setUserDestinationPrefix("/user");}@Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {registration.interceptors(websocketUserInterceptor);}
}
拦截器
@Slf4j
@Component
public class WebsocketUserInterceptor implements ChannelInterceptor {@Autowiredprivate WebSocketServer webSocketServ;@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);if (StompCommand.CONNECT.equals(accessor.getCommand())) {log.info("【webSocket】 --- 连接success");Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);if (raw instanceof Map) {Object tokenObj = ((Map) raw).get(JwtTokenUtil.AUTH_HEADER_KEY);if (tokenObj instanceof LinkedList) {String token = ((LinkedList) tokenObj).get(0).toString();String userId = this.getUserIdFromToken(token);accessor.setUser(new WebsocketUserVO(userId));webSocketServ.pushOnlineUser(userId);log.info("【webSocket】 --- userId:{} 上线了,在线数量:{}",userId,webSocketServ.getOnlineUserSize());}}}return message;}@Overridepublic void postSend(Message<?> message, MessageChannel channel, boolean sent) {try {StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);if (accessor.getCommand() == null){ return;}switch(accessor.getCommand()){case DISCONNECT:log.info("【webSocket】 --- 断开连接");MessageHeaders headers = message.getHeaders();if(ObjectUtil.isNotNull(headers)) {Object header = headers.get(SimpMessageHeaderAccessor.USER_HEADER);log.info("【webSocket】 --- header:{}", header.toString());if (ObjectUtil.isNotNull(header)) {WebsocketUserVO vo = (WebsocketUserVO) header;webSocketServ.removeOnlineUser(vo.getName());log.info("【webSocket】 断开连接 --- userId:{} 下线了,在线数量:{}", vo.getName(), webSocketServ.getOnlineUserSize());}break;}case MESSAGE:break;case SEND:break;}} catch (Exception e) {log.info("【webSocket】 断开连接 - 异常:{}", e.getMessage());}}private String getUserIdFromToken(String token){token = token.replace(JwtTokenUtil.TOKEN_PREFIX,"");LoginUser user = JSON.parseObject(JWT.decode(token).getSubject(),LoginUser.class);String userId = 
user.getUserId();log.info("【webSocket】解析token获取userId:{}",userId);return userId;}
}
WebSocketServer
/**
 * Redis-backed registry of online WebSocket users. Each online user is stored
 * as one string key {@code WEB_SOCKET_KEY + id} whose value is the id.
 *
 * <p>NOTE(review): entries carry no expiry and are keyed by user id only, so a
 * missed DISCONNECT leaves a stale entry and one disconnect removes a user who
 * may still have other sessions open; confirm this is acceptable.
 */
@Component
public class WebSocketServer {

    /** Key prefix of every online-user entry. */
    public static final String WEB_SOCKET_KEY = "webSocket:register:";

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Marks the user as online.
     *
     * @param id user id
     */
    public void pushOnlineUser(String id) {
        stringRedisTemplate.opsForValue().set(WEB_SOCKET_KEY + id, id);
    }

    /**
     * Marks the user as offline.
     *
     * @param id user id
     */
    public void removeOnlineUser(String id) {
        stringRedisTemplate.delete(WEB_SOCKET_KEY + id);
    }

    /**
     * @return number of entries returned by {@link #getOnlineUserList()}
     */
    public Integer getOnlineUserSize() {
        return this.getOnlineUserList().size();
    }

    /**
     * Lists the ids of all users currently registered as online.
     *
     * @return the stored ids; an empty list when nobody is online, never {@code null}
     */
    public List<String> getOnlineUserList() {
        // NOTE(review): KEYS scans the whole keyspace; consider SCAN or a Redis
        // Set if the instance holds many keys.
        Set<String> keys = stringRedisTemplate.keys(WEB_SOCKET_KEY + "*");
        if (keys == null || keys.isEmpty()) {
            // Fully qualified so this block needs no additional import.
            return new java.util.ArrayList<>();
        }
        List<String> ids = stringRedisTemplate.opsForValue().multiGet(keys);
        return ids != null ? ids : new java.util.ArrayList<>();
    }
}
实体bean
/**
 * Minimal {@link Principal} implementation whose name is the user id bound to a
 * WebSocket session.
 */
public class WebsocketUserVO implements Principal {

    /** User id exposed as the principal name; fixed at construction. */
    private final String name;

    /**
     * @param name user id to expose as the principal name
     */
    public WebsocketUserVO(String name) {
        this.name = name;
    }

    /**
     * @return the user id given at construction
     */
    @Override
    public String getName() {
        return name;
    }
}
/**
 * Payload pushed from the server to a client. It wraps a single string, which
 * callers in this file fill with a JSON document.
 */
public class Server2ClientMessage {

    /** Message body delivered to the client. */
    private String responseMessage;

    /**
     * @param responseMessage initial message body
     */
    public Server2ClientMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }

    /**
     * @return the current message body
     */
    public String getResponseMessage() {
        return this.responseMessage;
    }

    /**
     * @param responseMessage new message body
     */
    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}
/**
 * Payload sent from a client to the server: the sender-supplied user id plus a
 * free-form message text.
 */
public class Client2ServerMessage {

    /** User id supplied by the client. */
    private String userId;

    /** Message text supplied by the client. */
    private String message;

    /**
     * @return the user id
     */
    public String getUserId() {
        return this.userId;
    }

    /**
     * @param userId the user id
     */
    public void setUserId(String userId) {
        this.userId = userId;
    }

    /**
     * @return the message text
     */
    public String getMessage() {
        return this.message;
    }

    /**
     * @param message the message text
     */
    public void setMessage(String message) {
        this.message = message;
    }
}
测试controller
/**
 * Test controller: receives a client message on {@code /app/testSendMsg} and
 * echoes the sender's user id back, both point-to-point and on the broadcast
 * topic.
 */
@Controller
public class SockertController {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    /**
     * Handles a test message: sends a {@link Server2ClientMessage} to the user
     * named in the payload at {@code /single/ip} and to every subscriber of
     * {@code /topic}.
     *
     * @param socketBean message received from the client
     * @return a success response
     */
    @MessageMapping("/testSendMsg")
    public BaseResponse testSendMsg(Client2ServerMessage socketBean) {
        System.out.println("收到客户端消息:" + JSON.toJSON(socketBean));
        System.out.println("开始发送服务器端消息");

        // NOTE(review): the target user id comes from the client payload, so any
        // client can address any user; prefer the authenticated session principal.
        // Serialize through the JSON library instead of concatenating, so quotes
        // or backslashes in the client-supplied id cannot break or inject JSON.
        // Fully qualified so this block needs no additional import.
        String payload = JSON.toJSONString(
                java.util.Collections.singletonMap("userId", socketBean.getUserId()));
        Server2ClientMessage message = new Server2ClientMessage(payload);

        simpMessagingTemplate.convertAndSendToUser(socketBean.getUserId(), "/single/ip", message);
        simpMessagingTemplate.convertAndSend("/topic", message);

        System.out.println("结束发送服务器端消息");
        return BaseResponse.success("操作成功");
    }
}
测试效果

nginx配置协议升级
# WebSocket reverse-proxy settings; place inside the location block that proxies
# the WebSocket endpoint. Every nginx directive must end with ';'.
# The Upgrade mechanism requires HTTP/1.1 towards the upstream.
proxy_http_version 1.1;
# Forward the client's Upgrade header and request the connection upgrade.
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';