当前位置: 首页 > news >正文

解决websocket不定时出现1005错误

后台抛出异常如下:

Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: WebSocket close status code does NOT comply with RFC-6455: 1005
Caused by: java.lang.IllegalArgumentException: WebSocket close status code does NOT comply with RFC-6455: 1005

分析原因是:

spring cloud gateway 转发websocket请求无法监听到 close 事件 没有收到预期的状态码

解决方案:

在gateway进行请求拦截

代码如下:

@Slf4j
@Component
public class CustomWebsocketRoutingFilter implements GlobalFilter, Ordered {//Sec-Websocket protocolpublic static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";//Sec-Websocket headerpublic static final String SEC_WEBSOCKET_HEADER = "sec-websocket";//http header schemapublic static final String HEADER_UPGRADE_WebSocket = "websocket";public static final String HEADER_UPGRADE_HTTP = "http";public static final String HEADER_UPGRADE_HTTPS = "https";private final WebSocketClient webSocketClient;private final WebSocketService webSocketService;private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;// 不直接使用 headersFilters 用该变量代替private volatile List<HttpHeadersFilter> headersFilters;public CustomWebsocketRoutingFilter(WebSocketClient webSocketClient, WebSocketService webSocketService, ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider) {this.webSocketClient = webSocketClient;this.webSocketService = webSocketService;this.headersFiltersProvider = headersFiltersProvider;}/* for testing *///http请求转为ws请求static String convertHttpToWs(String scheme) {scheme = scheme.toLowerCase();return "http".equals(scheme) ? "ws" : "https".equals(scheme) ? "wss" : scheme;}@Overridepublic int getOrder() {// Before NettyRoutingFilter since this routes certain http requests//修改了这里 之前是-1 降低优先级return Ordered.LOWEST_PRECEDENCE - 2;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {changeSchemeIfIsWebSocketUpgrade(exchange);URI requestUrl = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);String scheme = requestUrl.getScheme();if (ServerWebExchangeUtils.isAlreadyRouted(exchange) || (!"ws".equals(scheme) && !"wss".equals(scheme))) {return chain.filter(exchange);}ServerWebExchangeUtils.setAlreadyRouted(exchange);HttpHeaders headers = exchange.getRequest().getHeaders();HttpHeaders filtered = HttpHeadersFilter.filterRequest(getHeadersFilters(), exchange);List<String> protocols = getProtocols(headers);return this.webSocketService.handleRequest(exchange, new ProxyWebSocketHandler(requestUrl, this.webSocketClient, filtered, protocols));}/* for testing *///获取请求头里的协议信息List<String> getProtocols(HttpHeaders headers) {List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);if (protocols != null) {ArrayList<String> updatedProtocols = new ArrayList<>();for (int i = 0; i < protocols.size(); i++) {String protocol = protocols.get(i);updatedProtocols.addAll(Arrays.asList(StringUtils.tokenizeToStringArray(protocol, ",")));}protocols = updatedProtocols;}return protocols;}/* for testing */List<HttpHeadersFilter> getHeadersFilters() {if (this.headersFilters == null) {this.headersFilters = this.headersFiltersProvider.getIfAvailable(ArrayList::new);// remove host header unless specifically asked not tothis.headersFilters.add((headers, exchange) -> {HttpHeaders filtered = new HttpHeaders();filtered.addAll(headers);filtered.remove(HttpHeaders.HOST);boolean preserveHost = exchange.getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false);if (preserveHost) {String host = exchange.getRequest().getHeaders().getFirst(HttpHeaders.HOST);filtered.add(HttpHeaders.HOST, host);}return filtered;});this.headersFilters.add((headers, exchange) -> {HttpHeaders filtered = new HttpHeaders();for (Map.Entry<String, List<String>> entry : headers.entrySet()) {if (!entry.getKey().toLowerCase().startsWith(SEC_WEBSOCKET_HEADER)) {filtered.addAll(entry.getKey(), entry.getValue());}}return filtered;});}return this.headersFilters;}static void changeSchemeIfIsWebSocketUpgrade(ServerWebExchange exchange) {// 检查版本是否适合URI requestUrl = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);String scheme = requestUrl.getScheme().toLowerCase();String upgrade = exchange.getRequest().getHeaders().getUpgrade();// change the scheme if the socket client send a "http" or "https"if (HEADER_UPGRADE_WebSocket.equalsIgnoreCase(upgrade) && (HEADER_UPGRADE_HTTP.equals(scheme) || HEADER_UPGRADE_HTTPS.equals(scheme))) {String wsScheme = convertHttpToWs(scheme);boolean encoded = ServerWebExchangeUtils.containsEncodedParts(requestUrl);URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build(encoded).toUri();exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);if (log.isTraceEnabled()) {log.trace("changeSchemeTo:[" + wsRequestUrl + "]");}}}//自定义websocket处理方式private static class ProxyWebSocketHandler implements WebSocketHandler {private final WebSocketClient client;private final URI url;private final HttpHeaders headers;private final List<String> subProtocols;ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers, List<String> protocols) {this.client = client;this.url = url;this.headers = headers;if (protocols != null) {this.subProtocols = protocols;} else {this.subProtocols = Collections.emptyList();}}@Overridepublic List<String> getSubProtocols() {return this.subProtocols;}@Overridepublic Mono<Void> handle(WebSocketSession session) {return this.client.execute(this.url, this.headers, new WebSocketHandler() {private CloseStatus adaptCloseStatus(CloseStatus closeStatus) {int code = closeStatus.getCode();if (code > 2999 && code < 5000) {return closeStatus;}switch (code) {case 1000://正常关闭return closeStatus;case 1001://服务器挂了或者页面跳转return closeStatus;case 1002://协议错误return closeStatus;case 1003://收到了不能处理的数据类型return closeStatus;case 1004:// 预留关闭状态码return CloseStatus.PROTOCOL_ERROR;case 1005:// 预留关闭状态码 期望收到状态码但是没有收到return CloseStatus.PROTOCOL_ERROR;case 1006:// 预留关闭状态码 连接异常关闭return CloseStatus.PROTOCOL_ERROR;case 1007://收到的数据与实际的消息类型不匹配return closeStatus;case 1008://收到不符合规则的消息return closeStatus;case 1009://收到太大的不能处理的消息return closeStatus;case 1010://client希望server提供多个扩展,server没有返回相应的扩展信息return closeStatus;case 1011://server遇到不能完成的请求return closeStatus;case 1012:// Not in RFC6455// return CloseStatus.SERVICE_RESTARTED;return CloseStatus.PROTOCOL_ERROR;case 1013:// Not in RFC6455// return CloseStatus.SERVICE_OVERLOAD;return CloseStatus.PROTOCOL_ERROR;case 1015:// 不能进行TLS握手 如:server证书不能验证return CloseStatus.PROTOCOL_ERROR;default:return CloseStatus.PROTOCOL_ERROR;}}/*** send      发送传出消息* receive   处理入站消息流* doOnNext  对每条消息做什么* zip       加入流* then      返回接收完成时完成的Mono<Void>*/@Overridepublic Mono<Void> handle(WebSocketSession proxySession) {Mono<Void> serverClose = proxySession.closeStatus().filter(__ -> session.isOpen()).map(this::adaptCloseStatus).flatMap(session::close);Mono<Void> proxyClose = session.closeStatus().filter(__ -> proxySession.isOpen()).map(this::adaptCloseStatus).flatMap(proxySession::close);// Use retain() for Reactor NettyMono<Void> proxySessionSend = proxySession.send(session.receive().doOnNext(WebSocketMessage::retain));Mono<Void> serverSessionSend = session.send(proxySession.receive().doOnNext(WebSocketMessage::retain));// Ensure closeStatus from one propagates to the otherMono.when(serverClose, proxyClose).subscribe();// Complete when both sessions are donereturn Mono.zip(proxySessionSend, serverSessionSend).then();}@Overridepublic List<String> getSubProtocols() {return CustomWebsocketRoutingFilter.ProxyWebSocketHandler.this.subProtocols;}});}}}

http://www.lryc.cn/news/160330.html

相关文章:

  • 文章内容生成随机图像,并将这些图像上链
  • l8-d9 UDP通信实现
  • MongoDB复杂聚合查询与java中MongoTemplate的api对应
  • WireShark抓包工具的安装
  • 审计智能合约的成本是多少?如何审计智能合约?
  • 9.7 校招 内推 面经
  • 【网络编程】IO多路复用
  • MySQL与postgreSQL数据库的区别
  • 单片机电子元器件-按键
  • Nacos docker实现nacos高可用集群项目
  • 基于Dubbo实现服务的远程调用
  • Redis事务的理解
  • PostgreSQL安装异常,服务无法启动导致创建服务器超时
  • 汽车电子系统网络安全解决方案
  • 切片机制和MR工作机制
  • 【postgresql 基础入门】基础架构和命名空间层次,查看数据库对象再也不迷路
  • 是的,决定放弃算法去机器学习了
  • Python 03(循环语句)
  • 安科瑞铁塔基站能耗监控解决方案
  • 操作系统-线程复用
  • 通达信自定义副图行业指标K线指标 HYZS_QD
  • MDK-Keil AC6 Compiler屏蔽特定警告
  • 计算机网络的故事——了解Web及网络基础
  • [系统安全] 五十三.DataCon竞赛 (2)2022年DataCon涉网分析之恶意样本IOC自动化提取详解
  • 自动驾驶——估计预瞄轨迹YawRate
  • PMP证书考下来要多少费用?
  • C动态分配
  • C语言——程序环境和预处理(再也不用担心会忘记预处理的知识)
  • Docker部署EMQX
  • Spring Cloud(Finchley版本)系列教程(二) 客户端负载均衡Ribbon