基于WebFlux的Websocket的实现,高级实现自定义功能拓展
基于WebFlux的Websocket
一、导入XML依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency><!-- 可选:下文示例使用的是Jackson(spring-boot-starter-webflux已自带),如需改用fastjson2再引入下面的依赖 -->
<dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.26</version>
</dependency>
二、定义配置类,注册WebSocket处理器的路径映射
@Configuration
@EnableWebFlux
public class WebSocketConfig implements WebFluxConfigurer {@Beanpublic HandlerMapping handlerMapping() {Map<String, WebSocketHandler> map = new HashMap<>();map.put("/ws/chat", new MyWebSocketChatHandler());map.put("/ws/echo", new MyWebSocketEchoHandler());SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();mapping.setUrlMap(map);mapping.setOrder(-1); // 需要设置较高的优先级,以避免与其他处理程序冲突return mapping;}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
三、编写WebSocket处理器
// 1. Echo的处理器
public class MyWebSocketEchoHandler implements WebSocketHandler {@NotNull@Overridepublic Mono<Void> handle(WebSocketSession session) {return session.send(session.receive().map(msg -> "Echo: " + msg.getPayloadAsText()).map(session::textMessage));}
}
设置自定义的处理器(高级处理)
public class MyWebSocketChatHandler implements WebSocketHandler {private static final Map<String, WebSocketSession> userMap = new ConcurrentHashMap<>();private static final ObjectMapper objectMapper = new ObjectMapper();@NotNull@Overridepublic Mono<Void> handle(WebSocketSession session) {String query = session.getHandshakeInfo().getUri().getQuery();Map<String, String> queryMap = getQueryMap(query);String userId = queryMap.getOrDefault("id", "");userMap.put(userId, session);System.out.println("当前用户:" + userId);System.out.println("当前在线人数:" + userMap.size());return session.receive().flatMap(webSocketMessage -> {String payload = webSocketMessage.getPayloadAsText();Message message;try {message = objectMapper.readValue(payload, Message.class);if (Integer.parseInt(message.getCode()) == CodeEnum.SUCCESS.getCode()) {// 执行成功模式Mono<Void> targetSession = SuccessMode(message);if (targetSession != null) return targetSession;} else if (Integer.parseInt(message.getCode()) == CodeEnum.ERROR.getCode()) {// 执行出错模式return session.send(Mono.just(session.textMessage("消发送出错了")));} else {// 其他code的功能实现return session.send(Mono.just(session.textMessage("消息格式错误")));}} catch (JsonProcessingException e) {e.printStackTrace();// 这里一定要return,否则会导致线程卡死直接断开连接return session.send(Mono.just(session.textMessage(e.getMessage())));}return session.send(Mono.just(session.textMessage("目标用户不在线")));}).then().doFinally(signal -> userMap.remove(userId)); // 用户关闭连接后删除对应连接}@Nullableprivate static Mono<Void> SuccessMode(Message message) {String targetId = message.getTargetId();if (userMap.containsKey(targetId)) {WebSocketSession targetSession = userMap.get(targetId);if (null != targetSession) {WebSocketMessage textMessage = targetSession.textMessage(message.getMessageText());return targetSession.send(Mono.just(textMessage));}}return null;}// 其他的实现private Map<String, String> getQueryMap(String queryStr) {Map<String, String> queryMap = new HashMap<>();if (!StringUtils.isEmpty(queryStr)) {String[] queryParam = 
queryStr.split("&");Arrays.stream(queryParam).forEach(s -> {String[] kv = s.split("=", 2);String value = kv.length == 2 ? kv[1] : "";queryMap.put(kv[0], value);});}return queryMap;}
}
注意要先配置一个实体类映射—(注意客户端的信息一定也是要json格式不然会报错哟)
/**
 * JSON payload exchanged over the chat WebSocket.
 *
 * <p>Each field is bound to the JSON property of the same name.
 */
@Data
public class Message {

    /** Operation code, carried as a string. */
    @JsonProperty("code")
    private String code;

    /** Id of the user the message is addressed to. */
    @JsonProperty("targetId")
    private String targetId;

    /** Text content of the message. */
    @JsonProperty("messageText")
    private String messageText;

    /** Id of the user associated with the message; presumably the sender - verify against the client. */
    @JsonProperty("userId")
    private String userId;
}
枚举类设置code对应的信息
/**
 * Operation codes carried by chat messages.
 */
public enum CodeEnum {
    SUCCESS(1),
    ERROR(2);
    // Other enum values...

    /** Numeric code associated with this constant. */
    private final Integer code;

    CodeEnum(int code) {
        this.code = code;
    }

    /**
     * Returns the numeric code of this constant.
     *
     * @return the code, never {@code null}
     */
    public Integer getCode() {
        return code;
    }
}
最后运行即可。注意访问ws://localhost:8081/ws/chat?id=123(处理器读取的查询参数名是id,而不是userId)
实现私聊的功能,访问ws://localhost:8081/ws/echo
即可实现简单的服务器和客户端的回应。群聊功能可以根据自己的需求进行实现,只需要添加对应的code
以及获取所有session
并发送message
即可。