.NET Core + EF + Redis + RabbitMQ + CAP: a sync-then-async-then-sync approach, tested and working
Background
Scenario: a fleet of devices reports data into the system, consumed through a RabbitMQ queue. Reports arrive every 15 minutes, 20,000 records at a time.
Received one record at a time at roughly 0.5 s each, a batch takes 20,000 × 0.5 s = 10,000 s, about 2.8 hours, far too long for a 15-minute reporting cycle. So how do we optimize? Two requirements: cut the processing time, and never let an MQ message be consumed twice.
How it works:
How CAP itself works is skipped here; it is easy to look up. The point to call out: this design abandons MQ's failure-ack mechanism. The subscriber only stashes the raw message, and success is settled later, when the Redis cache is drained.
IoT devices report ---> IoT platform ---> RabbitMQ ---> MQ consumer ---> Redis ---> consume data from Redis
                                                                                              ↑
                                                        scheduled job triggers (when fewer than 100 entries remain)
The multi-task batch mechanism:
1) each round, take up to 100 entries from the Redis hash;
2) split them into pages of 20 and process the pages in parallel;
3) delete the cached entries once they have been processed successfully.
A scheduled job (every 20 seconds) picks up leftovers of fewer than 100 entries, so data never sits stale. A Redis distributed lock prevents duplicate consumption, as sketched below. The result: processing drops from hours to seconds, with no data lost and none processed twice.
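For the distributed lock, here is a minimal sketch of the same idea on top of StackExchange.Redis (RedisHelper.SetNX in Step 2 is the author's own wrapper; the class, key, and method names here are illustrative). The owner token is the important part: it stops a worker whose lock has already expired from deleting a lock that a peer has since acquired.

using System;
using StackExchange.Redis;

public static class MqLockSketch
{
    // SET key token NX EX ttl: succeeds only if the key does not exist yet
    public static bool TryLock(IDatabase db, string key, string token, TimeSpan ttl)
        => db.LockTake(key, token, ttl);

    // Deletes the key only while its value still equals our token
    public static void Unlock(IDatabase db, string key, string token)
        => db.LockRelease(key, token);
}

// Usage: take the lock with a fresh token, work, then release
// var token = Guid.NewGuid().ToString();
// if (MqLockSketch.TryLock(db, "mq_elec_lock", token, TimeSpan.FromSeconds(240)))
//     try { /* drain the hash */ } finally { MqLockSketch.Unlock(db, "mq_elec_lock", token); }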
Steps
Step 1: the CAP subscriber receives the MQ message
[NonAction]
[CapSubscribe("xxx.device", Group = "xxx.device")]
public void SubscribeMQAsync(object obj)
{
    var msg = "";
    var dt = DateTime.Now;
    try
    {
        // Stash the raw MQ message straight into a Redis hash, keyed by a fresh GUID field
        RedisHelper.AddH(CommCacheConfig.MQ_ELEC_KEY, Guid.NewGuid().ToStr(), obj);
    }
    catch (Exception ex)
    {
        msg += $"Exception, payload: {obj.ToStr()} => {ex.ToStr()}, elapsed: {(DateTime.Now - dt).TotalSeconds}s";
        LogHelper.Error(msg);
        throw; // rethrow without resetting the stack trace
    }
}
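RedisHelper.AddH is the author's own wrapper. As a rough sketch of what it could be doing on top of StackExchange.Redis (the connection string and JSON serialization are assumptions), it boils down to a single HSET:

using System.Text.Json;
using StackExchange.Redis;

public static class RedisHelperSketch
{
    // One multiplexer per process; "localhost:6379" is a placeholder
    static readonly ConnectionMultiplexer Conn = ConnectionMultiplexer.Connect("localhost:6379");

    // HSET key field json: one hash per MQ topic, one field per message
    public static void AddH(string key, string field, object value)
        => Conn.GetDatabase().HashSet(key, field, JsonSerializer.Serialize(value));
}

Using a fresh GUID as the field means a redelivered message lands as a second entry instead of overwriting the first; if stronger dedup were needed, a stable message id as the field would collapse duplicates.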
Step 2: the processing logic (the core)
public async Task Save(MQData<MQData> data = null, bool isApi = false)
{
    var isStill = false;
    var locked = false;
    var msg2 = string.Empty;
    try
    {
        msg2 += "before lock=> ";
        locked = RedisHelper.SetNX(CommCacheConfig.MQ_ELEC_KEY + "_Lock", DateTime.Now.ToStr(), 240);
        if (locked)
        {
            msg2 += "in lock=> ";
            var dic = await RedisCacheHelper.HGetAllAsync<MQData<MQData>>(CommCacheConfig.MQ_ELEC_KEY)
                      ?? new Dictionary<string, MQData<MQData>>();
            // More than 100 entries means another round is needed after this one
            isStill = dic.Count() > 100;
            if (dic.Count() >= 100)
                dic = dic.Take(100).ToDictionary(x => x.Key, x => x.Value);
            if (data != null)
                dic.Add(Guid.NewGuid().ToStr(), data);
            if (dic.Count() == 0) { msg2 += "no data=> "; return; }
            if (isApi) msg2 += "isApi=> ";
            // Fan the batch out across tasks, one page of 20 per task
            var tasks = new List<Task>();
            var pageSize = 20;
            var totalPage = dic.Count() / pageSize + (dic.Count() % pageSize > 0 ? 1 : 0);
            var dt = DateTime.Now;
            for (var page = 1; page <= totalPage; page++)
            {
                var dic2 = dic.Skip((page - 1) * pageSize).Take(pageSize).ToDictionary(x => x.Key, x => x.Value);
                tasks.Add(Task.Run(() => Handle(dic2)));
            }
            // Wait until every page has been handled
            await Task.WhenAll(tasks);
            msg2 += $"Task.WhenAll {(DateTime.Now - dt).TotalSeconds}s=> ";
        }
    }
    catch (Exception ex)
    {
        msg2 += $"ex:{ex.Message}=>";
    }
    finally
    {
        // Release only a lock we actually hold, so a caller that lost the
        // SetNX race cannot delete the winner's lock
        if (locked) RedisHelper.Remove(CommCacheConfig.MQ_ELEC_KEY + "_Lock");
        msg2 += "finally--end";
        LogHelper.Info("Elec", msg2);
        // If the cache still holds entries, keep draining
        if (isStill) await Save(null, isApi);
    }
}

/// Saves to the database, then deletes the processed fields from the cache
private async Task<int> Handle(Dictionary<string, MQData<MQData>> dic)
{
    var fields = new List<string>();
    using (DataFilter.Disable<IMultiTenant>())
    {
        using (var uow = UnitOfWorkManager.Begin(requiresNew: true, isTransactional: true))
        {
            var service = IocManager.Instance.GetService<XXXManager>();
            fields = await service.TaskAsync(dic);
            await uow.CompleteAsync();
        }
    }
    if (fields.Count > 0)
        RedisHelper.DelH(CommCacheConfig.MQ_ELEC_KEY, fields.ToArray());
    return fields.Count;
}
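One detail of Save worth isolating is the page count. Mixing integer division with a conditional is easy to get wrong under C# operator precedence (without the parentheses, the whole sum becomes the condition and the result is always 1), and the classic ceiling-division idiom avoids the conditional entirely:

// Pages needed for `count` items at `pageSize` items per page (ceiling division)
static int TotalPages(int count, int pageSize) => (count + pageSize - 1) / pageSize;

// TotalPages(100, 20) == 5
// TotalPages(101, 20) == 6
// TotalPages(0, 20)   == 0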
Step 3: add a scheduled job, so batches of fewer than 100 records still get processed
/// <summary>
/// MQ pushes to the consumer, which caches into Redis; every 20 seconds this job
/// flushes whatever is in Redis (including batches of fewer than 100) to the database.
/// </summary>
[DisallowConcurrentExecution]
[TaskCustomAttribute("Scheduled job, 100 entries per round", "xxx topic", "0/20 * * * * ?")]
public class DeviceMqJob : IJob
{
    readonly IHttpClientFactory httpClientFactory;

    public DeviceMqJob(IHttpClientFactory httpClientFactory)
    {
        this.httpClientFactory = httpClientFactory;
    }

    public async Task Execute(IJobExecutionContext context)
    {
        try
        {
            // Trigger the Save logic above through its HTTP endpoint
            var url = HostLocalUrl + "/api/Save";
            var r = await httpClientFactory.HttpSendAsync(HttpMethod.Post, url);
        }
        catch (Exception ex)
        {
            LogHelper.Error(ex);
        }
    }
}
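TaskCustomAttribute is the author's own scheduling glue. For reference, wiring the same job into a plain Quartz.NET + ASP.NET Core host (only the class name and the 20-second cron from the attribute come from the article; the rest is standard Quartz setup) might look like:

// Program.cs, assuming the Quartz and Quartz.Extensions.Hosting packages
builder.Services.AddQuartz(q =>
{
    var key = new JobKey("DeviceMqJob");
    q.AddJob<DeviceMqJob>(j => j.WithIdentity(key));         // DI supplies IHttpClientFactory
    q.AddTrigger(t => t.ForJob(key)
                       .WithCronSchedule("0/20 * * * * ?")); // every 20 seconds
});
builder.Services.AddQuartzHostedService();

Note that [DisallowConcurrentExecution] and the Redis lock inside Save guard against overlap at two different levels: Quartz serializes job instances within one node, while the lock serializes Save across all nodes.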