当前位置: 首页 > news >正文

基于 RabbitMQ 死信队列+TTL 实现延迟消息+延迟插件基本使用

在许多业务场景中,我们都需要处理延迟任务。例如:

  • 用户下单后,30分钟内未支付则自动取消订单。
  • 用户注册成功后,5分钟后发送一份引导邮件。
  • 创建一个定时任务,在未来某个指定时间点执行。
    这些场景的核心需求都是:在未来的某个时间点,触发一个特定的动作。RabbitMQ 本身没有直接提供延迟队列的功能,但我们可以巧妙地利用其两个核心特性——**消息存活时间(TTL)死信交换机(Dead-Letter-Exchange)**来组合实现一个强大而可靠的延迟消息系统。

本文将通过一个完整的 Spring Boot + RabbitMQ 示例,深入剖析如何使用 DLX + TTL 实现延迟消息,分析其工作原理、优缺点,并介绍通过插件来解决的更成熟的解决方案。

核心概念

在深入代码之前,我们必须先理解两个关键概念。

1. 消息存活时间 (Time-To-Live, TTL)

TTL 用于设置消息在队列中的最大存活时间,单位为毫秒。当一条消息在一个队列中的存留时间超过了其 TTL 值,它就会“过期”。

TTL 有两种设置方式:

  • 对整个队列设置 TTL:通过在 queue.declare 时添加 x-message-ttl 参数。该队列中的所有消息都将拥有相同的存活时间。
  • 对单条消息设置 TTL:在发送消息时,通过设置消息属性(expiration)来指定。这样可以为每条消息赋予不同的存活时间。

当一条消息过期后,它会变成“死信”(Dead Letter)。

2. 死信交换机 (Dead-Letter-Exchange, DLX)

死信交换机本质上也是一个普通的交换机。当一个队列中的消息满足以下任一条件时,它就会变成“死信”,并被 RabbitMQ 自动重新发布到该队列预先配置好的一个“死信交换机”上。

  1. 消息 TTL 过期(我们本次实现的核心)。
  2. 消息被消费者拒绝(basic.rejectbasic.nack),并且 requeue 参数被设置为 false
  3. 队列达到最大长度(x-max-length)或最大容量(x-max-length-bytes),导致最早的消息被丢弃。

实现原理:DLX + TTL 组合拳

我们的延迟队列实现思路正是利用了上述两个特性:

  1. 创建一个普通的业务队列(我们称之为 normal.queue),不设置任何消费者。
  2. 为这个 normal.queue 配置一个死信交换机(dead.letter.exchange)。
  3. 当生产者发送一条消息时,我们为其设置一个 TTL(例如10秒),并将其发送到与 normal.queue 绑定的业务交换机(normal.exchange)。
  4. 由于 normal.queue 没有消费者,消息会在队列中静静地等待。
  5. 10秒后,消息的 TTL 到期,它变成了“死信”。
  6. RabbitMQ 自动将这条死信消息从 normal.queue 中移除,并将其路由到预设的 dead.letter.exchange
  7. dead.letter.exchange 再根据其路由规则,将消息投递到最终的“死信队列”(dead.letter.queue)。
  8. 我们的消费者只监听这个死信队列。一旦收到消息,就意味着延迟时间已到,可以开始处理业务。

配置Exhange/Queue

import org.springframework.amqp.core.*;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  @Configuration  
public class DelayedConfig {  // 1. 声明普通的业务交换机和队列  public static final String NORMAL_EXCHANGE = "normal.exchange";  public static final String NORMAL_QUEUE = "normal.queue";  public static final String NORMAL_ROUTING_KEY = "normal.key";  // 2. 声明死信交换机和死信队列  public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";  public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";  public static final String DEAD_LETTER_ROUTING_KEY = "dead.key";  @Bean  public DirectExchange normalExchange() {  return new DirectExchange(NORMAL_EXCHANGE);  }  // 这就是出错的队列声明  @Bean  public Queue normalQueue() {  return QueueBuilder.durable(NORMAL_QUEUE)  .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)  .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY)  .build();  }  @Bean  public Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {  return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);  }  // 3. 确保死信交换机和死信队列也作为 Bean 被声明  @Bean  public DirectExchange deadLetterExchange() {  return new DirectExchange(DEAD_LETTER_EXCHANGE);  }  @Bean  public Queue deadLetterQueue() {  return new Queue(DEAD_LETTER_QUEUE);  }  @Bean  public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {  // 死信队列绑定到死信交换机,使用普通队列指定的 dead-letter-routing-keyreturn BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTING_KEY);  }  
}

配置解读

  • normalQueue() 是配置的核心。我们通过 .withArgument() 方法为 normal.queue 设置了两个重要参数:
    • x-dead-letter-exchange:指定了当队列里的消息变成死信后,应该被发往哪个交换机。
    • x-dead-letter-routing-key:指定了死信消息被发送到死信交换机时,使用哪个路由键。这允许我们更灵活地控制死信消息的流向。

生产者

发送带TTL的消息
生产者将消息发送到业务交换机 normal.exchange,并为每条消息动态设置 expiration 属性。

@RestController  
@RequestMapping  
public class DelayController {  private final RabbitTemplate rabbitTemplate;  public DelayController(RabbitTemplate rabbitTemplate) {  this.rabbitTemplate = rabbitTemplate;  }  @RequestMapping("/delay")  public String delay(){  //发送带ttl的消息  System.out.println("发送延迟消息, 当前时间: " + new Date());rabbitTemplate.convertAndSend("normal.exchange", "normal.key",  "delay test with ttl 10s..."+new Date(),message -> {  message.getMessageProperties().setExpiration("10000");  return message;  });  rabbitTemplate.convertAndSend("normal.exchange", "normal.key",  "delay test with ttl 20s..."+new Date(), message -> {  message.getMessageProperties().setExpiration("20000");  return message;  });  return "success";  }  
}

消费者

监听死信队列
消费者只关心最终的业务处理,所以它监听的是 dead.letter.queue

import com.rabbitmq.client.Channel;  
import org.springframework.amqp.core.Message;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  import java.util.Date;  
@Component  
public class DelayConsumer {  @RabbitListener(queues = "dead.letter.queue")  public void ListenerDLXQueue(Message message, Channel channel) throws Exception {  long deliveryTag = message.getMessageProperties().getDeliveryTag();  System.out.printf("%tc 死信队列接收到消息: %s, deliveryTag: %d%n", new Date(), new String(message.getBody(),"UTF-8"),  deliveryTag);  }  
}

结果分析与问题洞察

当我们访问 http://127.0.0.1:8080/delay 后,观察控制台输出如下:

发送延迟消息, 当前时间: 周日 8月 10 17:28:13 CST 2025周日 8月 10 17:28:23 CST 2025 死信队列接收到消息: delay test with ttl 10s...Sun Aug 10 17:28:11 CST 2025, deliveryTag: 1
周日 8月 10 17:28:34 CST 2025 死信队列接收到消息: delay test with ttl 20s...Sun Aug 10 17:28:13 CST 2025, deliveryTag: 2

但是如果将投递顺序调换

@RequestMapping("/delay")  
public String delay(){  //发送带ttl的消息  System.out.println("发送延迟消息, 当前时间: " + new Date());  rabbitTemplate.convertAndSend("normal.exchange", "normal.key",  "delay test with ttl 20s..."+new Date(),message -> {  message.getMessageProperties().setExpiration("20000");  return message;  });  rabbitTemplate.convertAndSend("normal.exchange", "normal.key",  "delay test with ttl 10s..."+new Date(), message -> {  message.getMessageProperties().setExpiration("10000");  return message;  });  return "success";  
}

再次进行请求,你会看到延迟20s的消息和延迟10s的消息是同时被处理的

发送延迟消息, 当前时间: Sun Aug 10 22:27:15 CST 2025
周日 8月 10 22:27:36 CST 2025 死信队列接收到消息: delay test with ttl 20s...Sun Aug 10 22:27:15 CST 2025, deliveryTag: 1
周日 8月 10 22:27:36 CST 2025 死信队列接收到消息: delay test with ttl 10s...Sun Aug 10 22:27:15 CST 2025, deliveryTag: 2

这是为什么呢?
这是 DLX+TTL 方案最核心的一个“陷阱”:RabbitMQ 只会检查队列头部的消息是否过期。如果队头的消息没有过期,那么后面的消息就算已经过期了,也无法被投递到死信交换机。

在我们的例子中:

  1. 20s TTL 的消息先入队,位于队头。
  2. 10s TTL 的消息后入队,位于其后。
  3. RabbitMQ 盯着队头的 20s TTL 消息。20秒后,该消息过期,被投递到死信队列。
  4. 此时,10s TTL 的消息才成为新的队头。RabbitMQ 开始检查它,发现它的 TTL 已经结束了,直接进行投递
    这就导致了延迟的“串行”执行,延迟时间有可能会被延后。

改进方案:使用延迟消息插件

为了解决上述的队列头部阻塞问题,并实现更精确、更灵活的延迟控制,RabbitMQ 官方提供了一个非常强大的插件:rabbitmq-delayed-message-exchange

插件原理

该插件提供了一种新的交换机类型:x-delayed-message。这种交换机在接收到消息后,并不会立即投递到队列,而是会根据消息头中的 x-delay 属性(单位毫秒)来等待相应的时间,然后再进行投递。这个过程是在交换机内部完成的,不依赖于队列,因此不会产生队头阻塞问题

插件代码实践

假设您已经在 RabbitMQ 服务器上启用了 rabbitmq-delayed-message-exchange 插件。

1. 新增插件配置

配置变得异常简单,不再需要死信队列和业务队列的区分。

// 新增一个配置类用于演示插件用法
@Configuration
public class DelayedPluginConfig {public static final String DELAYED_EXCHANGE = "delayed.plugin.exchange";public static final String DELAYED_QUEUE = "delayed.plugin.queue";public static final String DELAYED_ROUTING_KEY = "delayed.plugin.key";@Beanpublic CustomExchange delayedExchange() {// 声明一个 x-delayed-message 类型的交换机// durable: 持久化// autoDelete: 不自动删除return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false,Map.of("x-delayed-type", "direct")); // 指定基础交换机类型}@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE);}@Beanpublic Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}
2. 新增插件生产者

发送消息时,我们不再设置 expiration,而是添加一个 x-delay 的 header。

// 在 DelayController 中新增一个方法
@RestController
public class DelayController {// ... 原有代码 ...@RequestMapping("/delay-plugin")public String delayPlugin() {System.out.println("发送延迟消息 (Plugin), 当前时间: " + new Date());// 发送延迟20秒的消息rabbitTemplate.convertAndSend(DelayedPluginConfig.DELAYED_EXCHANGE, DelayedPluginConfig.DELAYED_ROUTING_KEY,"delay test with plugin 20s..." + new Date(), message -> {message.getMessageProperties().setHeader("x-delay", 20000); // 20秒return message;});// 发送延迟10秒的消息rabbitTemplate.convertAndSend(DelayedPluginConfig.DELAYED_EXCHANGE, DelayedPluginConfig.DELAYED_ROUTING_KEY,"delay test with plugin 10s..." + new Date(), message -> {message.getMessageProperties().setHeader("x-delay", 10000); // 10秒return message;});return "success (plugin)";}
}
3. 新增插件消费者

消费者直接监听最终的业务队列即可。

// 新增一个消费者类
@Component
public class DelayedPluginConsumer {@RabbitListener(queues = DelayedPluginConfig.DELAYED_QUEUE)public void listenDelayedQueue(Message message) throws Exception {System.out.printf("插件延迟队列 %tc 接收到消息: %s%n", new Date(), new String(message.getBody(),"UTF-8"));}
}

现在访问 http://127.0.0.1:8080/delay-plugin,你会发现,即使是后发送的10秒延迟消息,也会比先发送的20秒延迟消息先被消费,完美解决了队头阻塞问题。

结论与选型建议

本文我们详细探讨了两种实现 RabbitMQ 延迟消息的方法:

特性DLX + TTL 方案延迟消息插件方案
实现方式依赖队列TTL和死信交换机依赖特定类型的交换机 (x-delayed-message)
配置复杂度较高,需要配置两套Exchange和Queue较低,一套Exchange和Queue即可
延迟精确性受队头消息影响,不精确精确,消息间延迟互不影响
依赖RabbitMQ原生功能,无需额外插件需要在服务端安装并启用 rabbitmq-delayed-message-exchange 插件
适用场景业务场景简单,队列中消息TTL固定,或能容忍延迟误差对延迟时间精确性要求高的场景

总结建议

  • 首选延迟消息插件:对于绝大部分需要延迟消息的场景,延迟插件提供了更简单、更精确、更符合直觉的解决方案。它是目前实现延迟消息的最佳实践。
  • 了解 DLX+TTL:虽然插件方案更优,但理解 DLX+TTL 的工作原理非常有价值。它不仅能让你实现延迟队列,更能加深你对 RabbitMQ 核心机制的理解,这对于问题排查和设计更复杂的系统大有裨益。
http://www.lryc.cn/news/615752.html

相关文章:

  • 十、Linux Shell脚本:流程控制语句
  • [Julia] LinearAlgebra.jl 自带包
  • LeetCode 刷题【37. 解数独】
  • LabVIEW 机器人避障控制
  • 企业架构之导论(1)
  • C++设计模式单例模式(饿汉、懒汉模式)
  • Linux操作系统从入门到实战(十六)冯诺依曼体系结构,操作系统与系统调用和库函数概念
  • 【软件测试】BUG篇 — 详解
  • AI测试助手如何让Bug无处可藏
  • uni-app 网络请求终极选型:uni.request、axios、uni-network、alova 谁才是你的真命请求库?
  • Eclipse JSP/Servlet:深入解析与最佳实践
  • 繁花深处:花店建设的时代意义与多元应用—仙盟创梦IDE
  • 计算机视觉全景指南:从OpenCV预处理到YOLOv8实战,解锁多模态AI时代(第五章)
  • 【Docker进阶实战】从多容器编排到集群部署
  • [Linux]学习笔记系列 -- [arm][lib]
  • 13. 是否可以在static环境中访问非static变量
  • 如何在 Ubuntu 24.04 LTS Linux 上安装 MySQL 服务器
  • opencv颜色识别项目:识别水果
  • jmeter常规压测【读取csv文件】
  • Ubuntu 22.04 离线环境下完整安装 Anaconda、CUDA 12.1、NVIDIA 驱动及 cuDNN 8.9.3 教程
  • AI绘画:生成唐初秦叔宝全身像提示词
  • 安全运维工具链全解析
  • ELK分布式日志采集系统
  • 【系统分析师】软件需求工程——第11章学习笔记(上)
  • 旅行者1号无线电工作频段
  • 《解锁 C++ 起源与核心:命名空间用法 + 版本演进全知道》
  • 计算机网络:求地址块128.14.35.7/20中的相关信息
  • 《从零构建大语言模型》学习笔记4,注意力机制1
  • Redis如何实现一个分布式锁?
  • Redis主从复制和哨兵模式