当前位置: 首页 > news >正文

.netcore+ef+redis+rabbitmq+dotcap先同步后异步再同步的方法,亲测有效

 背景

场景:有一批设备数据上报到系统,通过rabbitmq进行做消费队列,每15分钟一报,每次报2w条,

如果一条条接收,每接收一条耗时0.5秒,那2w条就要2.7小时,这也太久了......那如何优化呢,要求搜索时长,且mq的数据不要重复消费

原理:

 dotcap原理:这里略过 ,可以自行去了解。这里引出 就是抛弃mq的失败ack机制

IOT上报数据--->IOT平台--->RabbitMQ----->消费MQ----->Redis---->消费Redis的数据

                                                                                                                         ↑

                                                                                                     定时任务去触发(不足100条时)             

采用多任务批处理机制:

1)每次从Redis哈希表中取出100条数据;

2)分页并行处理(每页20条);

3)处理成功后删除缓存。同时设置定时任务(每20秒)处理不足100条的残留数据,确保数据及时性。

方案通过Redis分布式锁防止重复消费,实现了处理效率从小时级到秒级的提升,且保证数据不丢失不重复。

步骤

step1:dotcap:接收mq的消息

[NonAction][CapSubscribe("xxx.device", Group = "xxx.device")]public void SubscribeMQAsync(object obj){var msg = "";var dt = DateTime.Now; try{//将接收到的mq消息直接塞到redis的哈希里RedisHelper.AddH(CommCacheConfig.MQ_ELEC_KEY, Guid.NewGuid().ToStr(), obj);}catch (Exception ex){msg += $"异常,参数:{str}=>{ex.ToStr()}耗时:" + (DateTime.Now - dt).TotalSeconds;throw ex;}}

step2:处理逻辑(核心逻辑)

  public async Task Save(MQData<MQData> data = null, bool isApi = false){var isStill = false;var msg2=string.Empty;try{msg2 += "锁前=> ";var flag = RedisHelper.SetNX(CommCacheConfig.MQ_ELEC_KEY + "_Lock", DateTime.Now.ToStr(), 240);if (flag){msg2 += "锁中=> ";var dic = RedisCacheHelper.HGetAllAsync<MQData<MQData>>(CommCacheConfig.MQ_ELEC_KEY).GetAwaiter().GetResult();if (dic.Count() >= 100) dic = dic.Take(100).ToDictionary(x => x.Key, x => x.Value);isStill=dic.Count() > 0;//如果超过100条代表还有  这里打个表计表示继续if (data != null){if (dic == null) dic = new Dictionary<string, MQData<MQData>>();dic.Add(Guid.NewGuid().ToStr(), data);}if (dic.Count() == 0) return ResParameter.Fail("无数据");var tasks = new List<Task>();var pageSize = 20;var totalPage = dic.Count() / pageSize + dic.Count() % pageSize > 0 ? 1 : 0;if (isApi)msg2 += "isApi=> ";//这个改多任务处理var  dt=DateTime.Now;var pages = new List<int>();for (var page = 1; page <= totalPage; page++){var dic2 = dic.Skip((page - 1) * pageSize).Take(pageSize).ToDictionary(x => x.Key, x => x.Value);tasks.Add(Task.Run(() => Handle(dic2)));}//等待处理完await Task.WhenAll(tasks); msg2 += $"Task.WhenAll{(DateTime.Now-dt).TotalSeconds}秒=> ";}}catch (Exception ex){msg2 += $"ex:{ex.Message}=>";}finally{RedisHelper.Remove(CommCacheConfig.MQ_ELEC_KEY + "_Lock");msg2 += $"finally--end";LogHelper.Info("Elec", msg2);// 如果当前缓存还有 ,就继续处理if (isStill)await Save(null, isApi);}}///保存数据库的操作  并删除掉缓存的keyprivate async Task<int> Handle(Dictionary<string, MQData<MQData>> dic){var fields = new List<string>();using (DataFilter.Disable<IMultiTenant>()){using (var uow = UnitOfWorkManager.Begin(requiresNew: true, isTransactional: true)){var service = IocManager.Instance.GetService<XXXManager>();fields = await service.TaskAsync(dic);await uow.CompleteAsync();}}if (fields.Count > 0)RedisHelper.DelH(CommCacheConfig.MQ_ELEC_KEY, fields.ToArray());return fields.Count;}

Step3:加定时任务:防止不足100条的时候没有处理机制

 /// <summary>/// MQ会推到 消费者那里  然后缓存到redis   ,然后每1分钟把redis(不足100条的)的保存到数据库里/// </summary> [DisallowConcurrentExecution][TaskCustomAttribute("每次100条定时任务","xxx主题", "0/20 * * * * ?")]public class DeviceMqJob : IJob{readonly IHttpClientFactory httpClientFactory;public ElectMqJob(IHttpClientFactory httpClientFactory){this.httpClientFactory = httpClientFactory;}/// <returns></returns>public async Task Execute(IJobExecutionContext context){try{using (var clinet = new HttpClient()){//触发机制 这里走的是http就触发刚刚的逻辑var url = HostLocalUrl + "/api/Save"; var r = await httpClientFactory.HttpSendAsync(HttpMethod.Post, url);}}catch (Exception ex){LogHelper.Error(ex);}}}

http://www.lryc.cn/news/578316.html

相关文章:

  • 植物small RNA靶基因预测软件,psRobot
  • 网络的相关概念
  • Java ArrayList顺序表 + 接口实现 + 底层
  • jQuery UI 安装使用教程
  • Electron 进程间通信(IPC)深度优化指南
  • mysql 双主集群故障修复
  • TensorFlow源码深度阅读指南
  • 清理 Docker 缓存占用
  • 第 1 课:Flask 简介与环境配置(Markdown 教案)
  • 深度解析基于贝叶斯的垃圾邮件分类
  • 如何让宿主机完全看不到Wi-Fi?虚拟机独立联网隐匿上网实战!
  • 基础算法合集-图论
  • 我认知的AI宇宙系列第三期
  • 贪心算法在C++中的应用与实践
  • 论文中用matplotlib画的图,如何保持大小一致。
  • 「Java案例」计算矩形面积
  • 嵌入式原理与应用篇---常见基础知识(10)
  • 湖北理元理律师事务所债务解法:从法律技术到生活重建
  • 大根堆加小根堆查找中位数o(N)时间复杂度
  • 【Springai】项目实战进度和规划
  • DFMEA检查表模板下载
  • PHP安装使用教程
  • js代码02
  • 【C++】简单学——模板初阶
  • PyTorch 中 nn.Linear() 参数详解与实战解析(gpt)
  • 项目:数据库应用系统开发:智能电商管理系统
  • 认识 Spring AI
  • 【C++】简单学——STL简介(了解)
  • tauri v2 开源项目学习(一)
  • 安装bcolz包报错Cython.Compiler.Errors.CompileError: bcolz/carray_ext.pyx的解决方法