当前位置: 首页 > news >正文

基于WebSocket实现的后台服务

基于WebSocket实现的后台服务,用于接收客户端的心跳消息,并根据心跳消息来维护客户端连接。

具体实现中,服务启动后会创建一个HttpListener对象,用于监听客户端的WebSocket连接请求。当客户端连接成功后,服务会为每个连接创建一个Task实例,用于接收客户端发送的心跳消息,并根据心跳消息更新心跳时间戳。服务还会定期向客户端发送心跳消息,以保持连接的活跃状态。

如果服务在一定时间内没有收到客户端发送的心跳消息,就会认为客户端已经掉线,服务会关闭连接并从连接列表中移除该客户端。

此服务适用于需要实现长连接的场景,例如实时消息推送、在线游戏等。需要注意的是,此服务只能用于WebSocket通信,客户端必须实现WebSocket协议。

using Microsoft.Extensions.Hosting;
using MSEBP.Kernel.Common.Logging;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;namespace Authorization.WebApi
{/// <summary>/// 此代码只能用于 websocket通信,客户端必须websocket实现,暂时无用。/// </summary>public class WebSocketBackgroundService : IHostedService, IDisposable{private const int _heartBeatInterval = 30000; // 心跳间隔(毫秒)private const int _heartBeatTimeout = 60000; // 心跳超时时间(毫秒)private const int _clientIdLength = 10;private readonly CancellationTokenSource _cts = new CancellationTokenSource();private readonly ConcurrentDictionary<string, WebSocket> _clients = new ConcurrentDictionary<string, WebSocket>();private readonly ILogger _logger;/// <summary>/// /// </summary>/// <param name="logger"></param>public WebSocketBackgroundService(ILogger logger){_logger = logger;}/// <summary>/// /// </summary>/// <param name="cancellationToken"></param>/// <returns></returns>public async Task StartAsync(CancellationToken cancellationToken){IPAddress localIp = Dns.GetHostEntry(Dns.GetHostName()).AddressList.FirstOrDefault(ip => ip.AddressFamily == AddressFamily.InterNetwork);if (localIp == null){throw new Exception("Cannot find local IP address.");}IPEndPoint localEndPoint = new IPEndPoint(localIp, 8181);HttpListener listener = new HttpListener();//listener.Prefixes.Add($"http://{localEndPoint}/");listener.Start();_ = Task.Run(async () =>{try{while (!_cts.IsCancellationRequested){HttpListenerContext context = await listener.GetContextAsync();if (context.Request.IsWebSocketRequest){WebSocket webSocket = await AcceptWebSocketAsync(context);_ = Task.Run(async () =>{await ReceiveHeartbeatAsync(webSocket);}, _cts.Token);}else{context.Response.StatusCode = 400;context.Response.Close();}}}catch (Exception ex){_logger.Error(ex, "WebSocket server error.");}}, _cts.Token);}private async Task<WebSocket> AcceptWebSocketAsync(HttpListenerContext context){HttpListenerWebSocketContext wsContext = await context.AcceptWebSocketAsync(null);WebSocket webSocket = wsContext.WebSocket;return webSocket;}private async Task ReceiveHeartbeatAsync(WebSocket webSocket){byte[] buffer = new byte[1024];CancellationToken token = _cts.Token;DateTime lastHeartbeatTime = DateTime.UtcNow;try{while (webSocket.State == WebSocketState.Open && !token.IsCancellationRequested){WebSocketReceiveResult result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);if (result.CloseStatus.HasValue){await CloseWebSocketAsync(webSocket, result.CloseStatus.Value, result.CloseStatusDescription);break;}else if (result.MessageType == WebSocketMessageType.Text){string message = Encoding.UTF8.GetString(buffer, 0, result.Count).Trim();if (message.StartsWith("heartbeat")){lastHeartbeatTime = DateTime.UtcNow;string clientId = message.Substring(0, Math.Min(message.Length, _clientIdLength));_clients.TryAdd(clientId, webSocket);}else if (string.IsNullOrEmpty(message)){await CloseWebSocketAsync(webSocket, WebSocketCloseStatus.NormalClosure, "Closed by client");break;}else{// 处理业务逻辑}}// 检测心跳超时if ((DateTime.UtcNow - lastHeartbeatTime).TotalMilliseconds > _heartBeatTimeout) { await CloseWebSocketAsync(webSocket, WebSocketCloseStatus.NormalClosure, "Heartbeat timeout");break;}}}catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely){// WebSocket 连接被意外关闭,忽略异常}catch (Exception ex){_logger.Error(ex, "WebSocket error.");}finally{// 移除客户端连接foreach (var item in _clients){if (item.Value == webSocket){_clients.TryRemove(item.Key, out _);break;}}await CloseWebSocketAsync(webSocket, WebSocketCloseStatus.NormalClosure, "Closed by server");}}private async Task CloseWebSocketAsync(WebSocket webSocket, WebSocketCloseStatus closeStatus, string closeStatusDescription){try{await webSocket.CloseAsync(closeStatus, closeStatusDescription, CancellationToken.None);}catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely){// WebSocket 连接已经关闭,忽略异常}catch (Exception ex){_logger.Error(ex, "Failed to close WebSocket.");}}public async Task StopAsync(CancellationToken cancellationToken){_cts.Cancel();await Task.CompletedTask;}public void Dispose(){_cts.Dispose();}}
}

http://www.lryc.cn/news/98837.html

相关文章:

  • Go语言中的结构体详解
  • pytest自动化测试指定执行测试用例
  • 英伟达 H100 vs. 苹果M2,大模型训练,哪款性价比更高?
  • var、let和const的区别
  • (css)AI智能问答页面布局
  • 【Pytorch学习】pytorch中的isinstance() 函数
  • (树) 剑指 Offer 07. 重建二叉树 ——【Leetcode每日一题】
  • Gitlab 合并分支与请求合并
  • 【Matter】基于Ubuntu 22.04 编译chip-tool工具
  • 将 MongoDB 的 List<Document> 转换为对象列表
  • 【Linux下6818开发板(ARM)】SecureCRT串口和交叉编译工具(巨细版!)
  • 应届生如何快速找Java开发工程师,先学会这17个基础问题
  • 数学建模学习(5):数学建模各类题型及解题方案
  • 【学习笔记】视频检测方法调研
  • idea terminal npm指令无效
  • 低代码开发平台源码
  • 【UE5 多人联机教程】04-加入游戏
  • 自然语言处理从入门到应用——LangChain:模型(Models)-[大型语言模型(LLMs):缓存LLM的调用结果]
  • Python 算法基础篇之图的遍历算法:深度优先搜索和广度优先搜索
  • 文本缩略 文本超出显示省略号 控制超出省略的行数
  • 云原生架构
  • Java 生成随机数据
  • 基于OpenCV的红绿灯识别
  • JavaScript快速入门:ComPDFKit PDF SDK 快速构建 Web端 PDF阅读器
  • Flutter 网络请求
  • 吃透《西瓜书》第三章 线性模型:多元线性回归
  • 数据结构【排序】
  • 探索APP开发的新趋势:人工智能和大数据的力量
  • 超越传统:深入比较Bootstrap、Foundation、Bulma、Tailwind CSS和Semantic UI的顶级CSS框架!
  • 基于深度学习淡水鱼体重智能识别模型研究