当前位置: 首页 > news >正文

C#+redis实现消息队列的发布订阅功能

代码

参考c#+redis stream实现消息队列以及ack机制文章的思路,实现
SubscribeAttribute.cs

using System;namespace DotnetQueue.Attributes
{/// <summary>/// 订阅特性/// </summary>[AttributeUsage(AttributeTargets.Method, Inherited = false)]public class SubscribeAttribute : Attribute{/// <summary>/// 订阅的名称/// </summary>public string Name { get; }/// <summary>/// 构造函数注入/// </summary>/// <param name="name"></param>public SubscribeAttribute(string name){Name = name;}/// <summary>/// 构造函数注入/// </summary>/// <param name="name"></param>public SubscribeAttribute(string name, string groupName, string consumerName){Name = name;GroupName = groupName;ConsumerName = consumerName;}/// <summary>/// 群组名称/// </summary>public string GroupName { get; private set; } = "group01";/// <summary>/// 消费者名称/// </summary>public string ConsumerName { get; private set; } = "consumer01";}
}

新建SubscribeMethod.cs

using System;
using System.Reflection;namespace DotnetQueue
{/// <summary>/// 订阅的方法/// </summary>public class SubscribeMethod{/// <summary>/// 类的类型/// </summary>public Type ClassType { get; set; }/// <summary>/// 方法/// </summary>public MethodInfo MethodInfo { get; set; }}
}

新建DotnetQueueCore.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using FreeRedis;
using DotnetQueue.Attributes;namespace DotnetQueue
{/// <summary>/// 消息队列核心/// </summary>public class DotnetQueueCore{private readonly CancellationToken cancellationToken;private readonly IRedisClient redisClient;/// <summary>/// 构造函数注入/// </summary>/// <param name="connectionString">redis链接字符串</param>/// <param name="cancellationToken">取消标识</param>public DotnetQueueCore(string connectionString, CancellationToken cancellationToken){this.cancellationToken = cancellationToken;this.redisClient = new RedisClient(connectionString);}/// <summary>/// 发布消息/// </summary>/// <param name="name"></param>/// <param name="msg"></param>/// <returns></returns>public async Task<bool> Publish(string name, string msg){var msgId = await redisClient.XAddAsync(name, "param", msg);return !string.IsNullOrEmpty(msgId);}/// <summary>/// 订阅监听/// </summary>/// <param name="subscribeTypes"></param>public async Task SubscribeListeners(List<Type> subscribeTypes){var allMethods = GetAllMethods(subscribeTypes);List<Task> tasks = new List<Task>();foreach (var subMethod in allMethods){if (subMethod.ClassType == null){await Task.Delay(TimeSpan.FromSeconds(1));continue;}tasks.Add(Task.Run(async () =>{var method = subMethod.MethodInfo;var parameterInfos = method.GetParameters();var attribute = method.GetCustomAttribute<SubscribeAttribute>();var name = attribute.Name;var groupName = attribute.GroupName;var consumerName = attribute.ConsumerName;var ids = new Dictionary<string, string>();ids.Add(name, ">");while (!cancellationToken.IsCancellationRequested){try{//如果数据存在则不需要执行了,第一次需要执行var exists = await redisClient.ExistsAsync(name);if (!exists){await Task.Delay(TimeSpan.FromSeconds(1));continue;}var info = await redisClient.XInfoGroupsAsync(name);if (info == null || info.Length < 1){//创建群组await redisClient.XGroupCreateAsync(name, groupName, id: "0-0", MkStream: true);}var messages =await redisClient.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids); if (messages == null||messages.Length<=0){await Task.Delay(TimeSpan.FromSeconds(1));continue;}foreach (var message in messages){foreach (var entry in message.entries){var obj = Activator.CreateInstance(method.DeclaringType);if (parameterInfos.Length <= 0){method.Invoke(obj, null);}else{var methodParams = GetStreamEntryValues(entry);method.Invoke(obj, methodParams.ToArray());}//确认消息,如果不加会一直读取第一条await redisClient.XAckAsync(name,groupName, entry.id);}}//await redisClient.XReadAsync(0, name, "0-0");await Task.Delay(TimeSpan.FromSeconds(1));}catch (Exception ex){await Task.Delay(TimeSpan.FromSeconds(1));}}}, cancellationToken));}await Task.WhenAll(tasks);}/// <summary>/// 获取全部的标记的方法/// </summary>/// <param name="types"></param>/// <returns></returns>private List<SubscribeMethod> GetAllMethods(List<Type> types){var result = new List<SubscribeMethod>();foreach (var typeInfo in types){var methods = typeInfo.GetMethods(BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance).Where(m => Attribute.IsDefined(m, typeof(SubscribeAttribute))).ToList();foreach (var method in methods){result.Add(new SubscribeMethod(){MethodInfo = method,ClassType = typeInfo});}}return result;}/// <summary>/// 获取redis stream中的参数/// </summary>/// <param name="msg"></param>/// <returns></returns>private List<object> GetStreamEntryValues(StreamsEntry msg){if (msg == null || msg.fieldValues == null || msg.fieldValues.Length <= 0){return null;}List<object> res = new List<object>();var length = msg.fieldValues.Length;for (int i = 0; i < length; i++){if (msg.fieldValues[i].ToString() == "param"){if ((i + 1) < length){res.Add(msg.fieldValues[i + 1]);}}}return res;}}
}

测试

SubscribeClassTest.cs

using DotnetQueue.Attributes;namespace DotnetQueue.Test;public class SubscribeClassTest
{/// <summary>/// 测试订阅/// </summary>/// <param name="res"></param>/// <returns></returns>[Subscribe("test.wjl.aaa")]public bool SubTest(string res){Console.WriteLine(res);return !string.IsNullOrEmpty(res);}
}

测试方法

private DotnetQueueCorequeueCore = null;
private CancellationTokenSource cancellationToken = null;/// <summary>
/// 测试方法执行之前
/// </summary>
[TestInitialize]
public void Initialize()
{var connectionString = "127.0.0.1:6379,password=,connectTimeout=3000,connectRetry=1,syncTimeout=10000,DefaultDatabase=0";cancellationToken = new CancellationTokenSource();queueCore = new DotnetQueueCore(connectionString, cancellationToken.Token);
}/// <summary>
/// 执行完成之后
/// </summary>
[TestCleanup]
public void Cleanup()
{cancellationToken.Cancel();
}/// <summary>
/// 发布消息测试
/// </summary>
[TestMethod]
public async Task PublishTestMethod()
{var res = await queueCore.Publish("test.wjl.aaa", "{\"age\":1}");Assert.IsTrue(res);res = await queueCore.Publish("test.wjl.aaa", "{\"age\":2}");Assert.IsTrue(res);
}/// <summary>
/// 接受消息测试
/// </summary>
[TestMethod]
public async Task SubscribeListenersTestMethod()
{var types = new List<Type>();types.Add(typeof(SubscribeClassTest));await queueCore.SubscribeListeners(types);Task.Delay(TimeSpan.FromSeconds(30));Assert.IsTrue(true);
}

参考

https://www.cnblogs.com/yanpeng19940119/p/11603865.html
https://github.com/wmowm/InitQ
https://www.cnblogs.com/tibos/p/14944832.html

http://www.lryc.cn/news/536789.html

相关文章:

  • Docker容器基本操作
  • 从无序到有序:上北智信通过深度数据分析改善会议室资源配置
  • 总结:使用JDK原生HttpsURLConnection,封装HttpsUtil工具类,加载自定义证书验证,忽略ssl证书验证
  • 重新定义人机关系边界,Soul以AI社交构建多元社交元宇宙
  • HTTP 参数污染(HPP)详解
  • 阿里云轻量服务器docker部署nginx
  • (萌新入门)如何从起步阶段开始学习STM32 —— 我应该学习HAL库还是寄存器库?
  • Windchill开发-电子仓相关对象信息查询SQL
  • MySQL 数据库定时任务及进阶学习
  • DeepSeek教unity------MessagePack-01
  • 知识拓展:Python序列化模块 marshal 模块详解
  • leetcode 2684. 矩阵中移动的最大次数
  • 机械学习基础-6.更多分类-数据建模与机械智能课程自留
  • 自动化测试实战
  • qt QPlainTextEdit总结
  • AWS SES 邮件服务退信/投诉处理与最佳实践指南
  • 理解WebGPU 中的 GPUAdapter :连接浏览器与 GPU 的桥梁
  • rpx和px混用方案
  • 光伏设计软件分类:无人机、Unity3D引擎齐上阵
  • 太速科技-616-基于6U VPX XCVU9P+XCZU7EV的双FMC信号处理板卡
  • 国产鲲鹏920+欧拉+达梦
  • LeetCode--146. LRU 缓存【Golang中的list】
  • 查看notebook的jupyter token
  • vue+springboot+webtrc+websocket实现双人音视频通话会议
  • 什么是高亮环形光源
  • 2025年3月一区SCI-混沌进化优化算法Chaotic evolution optimization-附Matlab免费代码
  • 51单片机俄罗斯方块开机动画
  • RK3588开发板部署DeepSeek-R1-Distill-Qwen-1.5B的步骤及问题
  • 网络安全 | 安全信息与事件管理(SIEM)系统的选型与实施
  • DeepSeek接口联调(postman版)