当前位置: 首页 > news >正文

RabbitMQ延迟消息——DelayExchange插件

什么是死信以及死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信

        1. 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false

        2. 消息是一个过期消息,超时无人消费

        3. 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息

  2. 收集那些因队列满了而被拒绝的消息

  3. 收集因TTL(有效期)到期的消息

为什么这里会介绍死信交换机呢,举个例子,我们在购买车票的时候会有一个支付时间,8分钟没有支付就会销毁订单,返回车票。mq不可能时刻监控客户有没有支付,可以使用延迟消息,延迟8分钟,八分钟后再去发送消息到mq,在查看支付情况。

DelayExchange插件

官网下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins

[{"CreatedAt": "2024-06-19T09:22:59+08:00","Driver": "local","Labels": null,"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data","Name": "mq-plugins","Options": null,"Scope": "local"}
]

 插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange 

 

 具体使用

声明交换机,基于@Bean:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称.delayed() // 设置delay的属性为true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("sdgstu.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}

基于注解:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "stusdg.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}", msg);
}

发送消息:

@Test
void testPublisherDelayMessage() {// 1.创建消息String message = "hello, delayed message";// 2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}});
}

http://www.lryc.cn/news/438059.html

相关文章:

  • 【系统规划与管理师】【案例分析】【考点】【答案篇】第5章 IT服务部署实施
  • 华为云服务器的数据库部署及管理
  • C#【必备技能篇】替换一个字节(byte)中连续几位(bit)的内容
  • roboguide将tp程序转化为LS文本格式的方法
  • 基于SpringBoot+Vue+MySQL的流浪猫狗宠物救助救援网站管理系统
  • I/O 多路复用:`select`、`poll`、`epoll` 和 `kqueue` 的区别与示例
  • 大数据之Flink(三)
  • 【HCIA-Datacom】IPv4地址介绍
  • maven父子工程多模块如何管理统一的版本号?
  • JavaScript --函数的作用域(全局和局部)
  • 贪吃蛇项目实现(C语言)——附源码
  • 【C++】42道面试经典问题总结
  • php 实现JWT
  • vue table id一样的列合并
  • xshell密钥方式连接阿里云Linux
  • Wni11 下 WSL 安装 CentOS
  • ROADM(可重构光分插复用器)-介绍
  • HarmonyOS开发之路由跳转
  • 怎么使用ai 免费生成ppt?这4个工具可以帮忙
  • Android主副屏显示-Android13
  • 什么是 SMB 服务器以及它如何工作?
  • 【python计算机视觉编程——10.OpenCV】
  • 医学数据分析实训 项目二 数据预处理预备知识(数据标准化处理,数据离差标准化处理,数据二值化处理,独热编码处理,数据PCA降维处理)
  • MySQL查询执行(四):查一行也很慢
  • 【Obsidian】当笔记接入AI,Copilot插件推荐
  • Spring Cloud集成Gateaway
  • 如何准备技术面试?
  • Kafka原理剖析之「Topic创建」
  • Java 高级学习路线概要~
  • 浏览器插件快速开启/关闭IDM接管下载