当前位置: 首页 > news >正文

RabbitMQ延迟消息的实现

RabbitMQ延迟队列的实现

  • 延迟消息是什么
    • 延迟消息的实现
    • 死信交换机
      • 代码实现
    • 延迟消息插件


延迟消息是什么

延迟消息是将消息发送到MQ中,消费者不会立即收到消息,而是过一段时间之后才会收到消息,进行处理。在一些业务中,可以用到延迟消息,比如我们在成功下单一个商品后,需要立即付款,为了避免商品库存一直被占有,我们会给商品设置一个支付时间,如果在这段时间没有支付成功,就会恢复库存,删除订单,对于订单支付的超时删除我们是通过延迟消息来实现的,让消费者在支付超时之后查询用户是否支付,如果支付成功直接返回,如果支付失败就恢复库存删除订单。

延迟消息的实现

延迟消息由以下两种方式实现,第一种是通过绑定死信交换机实现,第二种通过延迟消息插件实现,推荐使用第二种,更加简单

死信交换机

满足以下三种情况之一的叫做死信:
1、在设置了过期时间的消息,放入队列中,超过了过期时间没有被处理的消息
2、消息消费失败(返回nack或者reject)并且不能重复入队
3、队列消息堆积满了,最早的消息叫做死信
我们可以给队列绑定参数指定交换机,那么死信会被投递到指定交换机。
消息队列实现原理:我们可以设置一组没有消费者的交换机和队列,设置另一组处理绑定死信的交换机、队列和消费者,来处理延迟消息。

代码实现

定义死信交换机等和消费延迟消息交换机等:


@Configuration
public class DelayConfiguration {/*** 定义死信交换机、队列以及绑定*/@Beanpublic DirectExchange exchange() {return new DirectExchange("dead.direct");}@Beanpublic Queue queue() {Queue queue = new Queue("dead.queue");queue.addArgument("x-dead-letter-exchange", "delay.direct");return queue;}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(exchange()).with("dead");}/*** 定义处理延迟消息的交换机、队列和绑定*/@Beanpublic DirectExchange exchange1() {return new DirectExchange("delay.direct");}@Beanpublic Queue queue1() {return new Queue("delay.queue");}@Beanpublic Binding binding1() {return BindingBuilder.bind(queue1()).to(exchange1()).with("dead");}
}

定义延迟消息监听器:

    @RabbitListener(queues = "delay.queue")public void listen(String msg){log.info(LocalDateTime.now()+": "+msg);}

测试:

@Testvoid sendDeadMsg() {rabbitTemplate.convertAndSend("dead.direct", "dead", "我是死信", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {
//                设置过期消息时间message.getMessageProperties().setExpiration("10000");return message;}});}

结果:
在这里插入图片描述
在这里插入图片描述
消费者在十秒钟后成功消费延迟消息

延迟消息插件

我们在之前通过死信交换机来实现延迟队列,但是死信交换机是专门用来存放无法处理的消息,并且使用死信交换机实现过于复杂,我们需要手动定义两个交换机和队列,因而RabbitMQ提供了延迟消息插件来让我们更简单的实现延迟消息。
原理:给消息设置延迟时间,当将消息放入MQ时,MQ的交换机不会立即将消息放入队列,而是会在交换机中暂存延迟时间过后将消息路由到队列中,可以让队列处理延迟消息。
安装插件:插件安装可以借鉴这篇博客
代码实现
消费者:

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "delay.queue",durable = "true"),// 开启延迟交换机exchange = @Exchange(name = "delay.direct",delayed = "true"),key = "dead"))public void listen(String msg){log.info(msg);}
  @Testvoid sendDeadMsg() {rabbitTemplate.convertAndSend("delay.direct", "delay", "我是死信", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {
//                设置延迟消息时间message.getMessageProperties().setDelay(10000);return message;}});}
http://www.lryc.cn/news/498697.html

相关文章:

  • SAP在中国:助力企业跨越成长的新篇章
  • 数据结构代码归纳
  • 数仓技术hive与oracle对比(一)
  • 筑起厂区安全--叉车安全防护装置全解析
  • 深入浅出云计算 ---笔记
  • ARINC 标准全解析:航空电子领域多系列标准的核心内容、应用与重要意义
  • SNMP 协议介绍
  • Python中的数据结构深入解析:从列表到字典的优化技巧
  • 如何利用Java爬虫获得商品类目
  • 力扣面试题 32 - 检查平衡性 C语言解法
  • 【机器学习】机器学习的基本分类-监督学习-决策树-ID3 算法
  • Implicit style-content separation using lora
  • ROS[aruco_ros+easy_handeye]手眼标定(眼在手外+UR10e+realsense-d435i)
  • 第九篇:k8s 通过helm发布应用
  • dataTable
  • json+Tomact项目报错怎么办?
  • Flume——sink连接Hive的参数配置(属性参数)
  • Netty面试内容整理-Netty 的应用场景
  • 波特图方法
  • 服务器数据恢复—硬盘掉线导致热备盘同步失败的RAID5阵列数据恢复案例
  • 在Ubuntu中运行和管理AppImage
  • 如何查看电脑的屏幕刷新率?
  • 浏览器数据存储方法深度剖析:LocalStorage、IndexedDB、Cookies、OPFS 与 WASM - SQLite
  • 面向金融场景的大模型 RAG 检索增强解决方案
  • 经典蓝牙(BT/EDR)蓝牙配对与连接
  • Flask: flask框架是如何实现非阻塞并发的
  • JAVA |日常开发中连接Oracle数据库详解
  • 头歌 进程管理之二(wait、exec、system的使用)
  • 详解日志格式配置:XML 与 Spring Boot 配置文件格式
  • JDK21新特性