当前位置: 首页 > news >正文

基于Rabbitmq和Redis的延迟消息实现

1 基于Rabbitmq延迟消息实现

支付时间设置为30,未支付的消息会积压在mq中,给mq带来巨大压力。我们可以利用Rabbitmq的延迟队列插件实现消息前一分钟尽快处理
在这里插入图片描述
在这里插入图片描述

1.1定义延迟消息实体

由于我们要多次发送延迟消息,因此需要先定义一个记录消息延迟时间的消息体

@Data
public class MultiDelayMessage<T> {/*** 消息体*/private T data;/*** 记录延迟时间的集合*/private List<Long> delayMillis;public MultiDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));}/*** 获取并移除下一个延迟时间* @return 队列中的第一个延迟时间*/public Long removeNextDelay(){return delayMillis.remove(0);}/*** 是否还有下一个延迟时间*/public boolean hasNextDelay(){return !delayMillis.isEmpty();}
}

1.2 定义常量,用于记录交换机、队列、RoutingKey等常量

package com.hmall.trade.constants;public interface MqConstants {String DELAY_EXCHANGE = "trade.delay.topic";String DELAY_ORDER_QUEUE = "trade.order.delay.queue";String DELAY_ORDER_ROUTING_KEY = "order.query";
}

1.3 抽取mq配置到nacos中

spring:rabbitmq:host: ${hm.mq.host:192.168.150.101} # 主机名port: ${hm.mq.port:5672} # 端口virtual-host: ${hm.mq.vhost:/hmall} # 虚拟主机username: ${hm.mq.un:hmall} # 用户名password: ${hm.mq.pw:123} # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

1.4 定义消息处理器

使用延迟消息处理器发送消息
在这里插入图片描述
在这里插入图片描述

1.5 消息监听与延迟消息再次发送

在这里插入图片描述
在这里插入图片描述

2 延迟消息实现

DelayQueue:基于JVM,保存在内存中,会出现消息丢失
在这里插入图片描述

Rabbitmq的延迟任务:基于TTL和死信交换机
在这里插入图片描述

2.1 redis的延迟任务:基于zset的去重和排序功能

在这里插入图片描述
1.为什么任务需要存储在数据库中?
延迟任务是一个通用的服务,任何有延迟需求的任务都可以调用该服务,内存数据库的存储是有限的,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑

2.为什么使用redis中的两种数据类型,list和zset?

  • 原因一: list存储立即执行的任务,zset存储未来的数据
  • 原因二:任务量过大以后,zset的性能会下降

时间复杂度:执行时间(次数) 随着数据规模增长的变化趋势

  • 操作redis中的list命令LPUSH: 时间复杂度: O(1)
  • 操作redis中的zset命令zadd: 时间复杂度: (Mlog(n))

2.2 设计mybatis映射实体类:

/*** 版本号,用乐观锁*/@Versionprivate Integer version;乐观锁支持:
/*** mybatis-plus乐观锁支持* @return*/
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;
}

2.3 创建task类,用于接收添加任务的参数

@Data
public class Task implements Serializable {/*** 任务id*/private Long taskId;/*** 类型*/private Integer taskType;/*** 优先级*/private Integer priority;/*** 执行id*/private long executeTime;/*** task参数*/private byte[] parameters;}

2.4 添加任务

2.4.1 添加任务到数据库中

addTaskToDb(task);修改任务表和日志表

@Autowiredprivate TaskinfoMapper taskinfoMapper;@Autowiredprivate TaskinfoLogsMapper taskinfoLogsMapper;/*** 添加任务到数据库中** @param task* @return*/private boolean addTaskToDb(Task task) {boolean flag = false;try {//保存任务表Taskinfo taskinfo = new Taskinfo();BeanUtils.copyProperties(task, taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);//设置taskIDtask.setTaskId(taskinfo.getTaskId());//保存任务日志数据TaskinfoLogs taskinfoLogs = new TaskinfoLogs();BeanUtils.copyProperties(taskinfo, taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);taskinfoLogsMapper.insert(taskinfoLogs);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}
2.4.2 添加任务到redis

addTaskToCache(task);判断任务执行之间是否在现在还是未来五分钟内

@Autowiredprivate CacheService cacheService;/*** 把任务添加到redis中** @param task*/private void addTaskToCache(Task task) {String key = task.getTaskType() + "_" + task.getPriority();//获取5分钟之后的时间  毫秒值Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);long nextScheduleTime = calendar.getTimeInMillis();//2.1 如果任务的执行时间小于等于当前时间,存入listif (task.getExecuteTime() <= System.currentTimeMillis()) {cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));} else if (task.getExecuteTime() <= nextScheduleTime) {//2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());}}

2.5 删除任务

1、删除数据库任务表,更改日志表任务状态
2、删除list或者zset中的任务

在TaskService中添加方法

/*** 取消任务* @param taskId        任务id* @return              取消结果*/
public boolean cancelTask(long taskId);
/*** 取消任务* @param taskId* @return*/
@Override
public boolean cancelTask(long taskId) {boolean flag = false;//删除任务,更新日志Task task = updateDb(taskId,ScheduleConstants.EXECUTED);//删除redis的数据if(task != null){removeTaskFromCache(task);flag = true;}return false;
}/*** 删除redis中的任务数据* @param task*/
private void removeTaskFromCache(Task task) {String key = task.getTaskType()+"_"+task.getPriority();if(task.getExecuteTime()<=System.currentTimeMillis()){cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));}else {cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));}
}/*** 删除任务,更新任务日志状态* @param taskId* @param status* @return*/
private Task updateDb(long taskId, int status) {Task task = null;try {//删除任务taskinfoMapper.deleteById(taskId);TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);taskinfoLogs.setStatus(status);taskinfoLogsMapper.updateById(taskinfoLogs);task = new Task();BeanUtils.copyProperties(taskinfoLogs,task);task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());}catch (Exception e){log.error("task cancel exception taskid={}",taskId);}return task;}

2.6 消费任务

1、删除list中的数据
2、使用updateDB删除任务表、跟新日志表

在TaskService中添加方法

/*** 按照类型和优先级来拉取任务* @param type* @param priority* @return*/
public Task poll(int type,int priority);

实现

/*** 按照类型和优先级拉取任务* @return*/
@Override
public Task poll(int type,int priority) {Task task = null;try {String key = type+"_"+priority;String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);if(StringUtils.isNotBlank(task_json)){task = JSON.parseObject(task_json, Task.class);//更新数据库信息updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);}}catch (Exception e){e.printStackTrace();log.error("poll task exception");}return task;
}

2.7 未来定时任务更新-reids管道

减少与redis的交互次数
1、在引导类中添加开启任务调度注解:@EnableScheduling
2、在service中添加定时任务 @Scheduled(cron = “0 */1 * * * ?”),每分钟一次

@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");// 获取所有未来数据集合的key值Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*for (String futureKey : futureKeys) { // future_250_250String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];//获取该组key下当前需要消费的任务数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//将这些任务数据添加到消费者队列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");}}
}
public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {@Nullable@Overridepublic Object doInRedis(RedisConnection redisConnection) throws DataAccessException {StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection;String[] strings = values.toArray(new String[values.size()]);stringRedisConnection.rPush(topic_key,strings);stringRedisConnection.zRem(future_key,strings);return null;}});return objects;}

总结

1、使用rebbitmq使用的场景是在支付和订单微服务中,用于实现消息可以延迟30分钟付款的功能。并借用该中间件的插件实现支付的异步下单功能,并可以快速处理前几分钟,防止消息堆积
2、使用redis是基于zset的去重和排序功能,相当于将一定数据的保存在数据库,使用定时任务同步数据库符合五分钟的任务到zset中,然后,在在zest中定时更新可以运行的任务到list集合中,相当于实现了延迟功能和缓存功能。
3、第二种还可以扩展为将rabbitmq中等待时间较长的数据存到redis中,然后定时的去同步redis中的数据到数据库中,防止消息堆积。

http://www.lryc.cn/news/230560.html

相关文章:

  • Masked Relation Learning for DeepFake Detection
  • R语言爬虫程序自动爬取图片并下载
  • 2023年10月国产数据库大事记-墨天轮
  • Linux内核分析(十四)--内存管理之malloc、free 实现原理
  • Hive函数
  • 教资笔记(目录)
  • np.repeat()的注意事项
  • 239. 滑动窗口最大值
  • c++ barrier 使用详解
  • c# 接口
  • 1、NPC 三电平SVPWM simulink仿真
  • JAVA对象列表强转失败,更好的方法
  • 2023最新版本 从零基础入门C++与QT(学习笔记) -5- 动态内存分配(new)
  • asp.net校园招聘管理系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio
  • flink的键值分区状态自动过期ttl配置
  • 解决Mac配置maven环境后,关闭终端后环境失效的问题(适用于所有终端关闭后环境失效的问题)
  • springboot运行jar包,实现复制jar包resources下文件、文件夹(可支持包含子文件夹)到指定的目录
  • Webpack Bundle Analyzer包分析器
  • SQL-----STUDENT
  • OpenCV入门——图像视频的加载与展示一些API
  • Control的Invoke和BeginInvoke
  • 什么是OpenCL?
  • AdaBoost:提升机器学习的力量
  • Pikachu(皮卡丘靶场)初识XSS(常见标签事件及payload总结)
  • 一则DNS被重定向导致无法获取MySQL连接处理
  • Vue3中如何使用this
  • 7.jvm对象内存布局
  • U-boot(一):Uboot命令和tftp
  • 代码随想录算法训练营第五十三天丨 动态规划part14
  • pdf增强插件 Enfocus PitStop Pro 2022 mac中文版功能介绍