当前位置: 首页 > news >正文

Spring Boot 集成 RabbitMQ:普通队列、延迟队列与死信队列全解析

Spring Boot 集成 RabbitMQ:普通队列、延迟队列与死信队列全解析

  • 1. 背景介绍
  • 2. RabbitMQ 及队列类型详解
  • 3. 项目依赖配置(pom.xml)
  • 4. Spring Boot RabbitMQ 配置详解(application.yml)
  • 5. 核心队列代码示例及详解
  • 6. 消息生产者实现
  • 7. 消费者设计及异常处理策略
  • 8. 死信队列消费者与告警设计
  • 9. 消息确认机制详解
    • 常见异常示例
    • 异常原因分析
    • 解决方案
  • 10. 延迟队列实现原理与RabbitMQ插件说明
  • 11. 系统容错与性能优化建议
  • 12. 常见问题及排查方法
  • 13. 总结与最佳实践


1. 背景介绍

现代分布式系统中,异步消息队列作为解耦、削峰和异步处理的重要组件,被广泛采用。RabbitMQ 是一款基于 AMQP 协议的成熟消息队列中间件,功能丰富,性能稳定。

在这里插入图片描述

本篇文章通过一个典型的业务场景讲解如何在 Spring Boot 应用中集成 RabbitMQ,实现:

  • 普通队列:用于正常业务消息处理

  • 延迟队列:实现消息的延迟投递和重试机制

  • 死信队列:捕获处理失败的消息,方便后续监控、报警或补偿处理


2. RabbitMQ 及队列类型详解

队列类型作用使用场景
普通队列存放业务正常消息,消费者消费并处理订单处理、用户通知、日志收集等实时消息处理
延迟队列支持消息延迟一定时间后再消费,用于重试或定时任务消息失败自动重试,定时提醒,延时任务执行
死信队列存放消费失败或过期的消息,避免消息丢失消息处理异常、消息TTL到期、消息被拒绝等情况捕获

关键概念

  • 交换机 (Exchange):接收生产者发送的消息,根据类型和路由规则转发到相应队列

  • 队列 (Queue):消息的存储容器,消费者从队列获取消息进行消费

  • 绑定 (Binding):交换机和队列的关联关系,确定消息流向

  • 路由键 (Routing Key):用于交换机匹配队列的关键字

  • 死信 (Dead Letter):消息在队列中无法正常消费,被丢弃或重新路由的消息


3. 项目依赖配置(pom.xml)

   <!-- Spring Boot AMQP Starter,集成 RabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

4. Spring Boot RabbitMQ 配置详解(application.yml)

spring:rabbitmq:host: localhost          # RabbitMQ服务器地址port: 5672               # 端口号username: guest          # 用户名password: guest          # 密码virtual-host: /          # 虚拟主机publisher-confirm-type: correlated  # 消息确认机制publisher-returns: true  # 开启消息返回# 自定义队列和交换机配置参数config:normal:queue: normal.task.queueexchange: normal.task.exchangerouting-key: normal.task.routingKeydelay:queue: delay.retry.queueexchange: delay.retry.exchangerouting-key: delay.retry.routingKeydead-letter:queue: dead.letter.queueexchange: dead.letter.exchangerouting-key: dead.letter.routingKeyttl: 604800000 # 死信队列消息存活时间,7天(单位:毫秒)

5. 核心队列代码示例及详解

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMQConfig {/*** 普通队列* 配置死信交换机和死信路由键,实现消息失败后自动进入死信队列*/@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal.task.queue").withArgument("x-dead-letter-exchange", "dead.letter.exchange").withArgument("x-dead-letter-routing-key", "dead.letter.routingKey").build();}/*** 普通直连交换机*/@Beanpublic DirectExchange normalExchange() {return new DirectExchange("normal.task.exchange");}/*** 普通队列绑定到普通交换机,路由键 normal.task.routingKey*/@Beanpublic Binding normalBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal.task.routingKey");}/*** 延迟交换机,基于 rabbitmq_delayed_message_exchange 插件* 通过 x-delayed-message 类型支持延迟投递消息*/@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");  // 指定交换机类型为 directreturn new CustomExchange("delay.retry.exchange", "x-delayed-message", true, false, args);}/*** 延迟队列,存放延迟消息*/@Beanpublic Queue delayQueue() {return QueueBuilder.durable("delay.retry.queue").build();}/*** 延迟队列绑定延迟交换机,路由键 delay.retry.routingKey*/@Beanpublic Binding delayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay.retry.routingKey").noargs();}/*** 死信队列,用于存放失败或过期消息* 配置消息 TTL,过期消息自动删除*/@Beanpublic Queue deadLetterQueue() {return QueueBuilder.durable("dead.letter.queue").withArgument("x-message-ttl", 604800000)  // 7天消息过期时间.build();}/*** 死信交换机*/@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange("dead.letter.exchange");}/*** 死信队列绑定死信交换机,路由键 dead.letter.routingKey*/@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter.routingKey");}
}

6. 消息生产者实现

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
@Slf4j
public class TaskProducer {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 发送普通任务消息* @param message 业务消息内容*/public void sendTask(String message) {rabbitTemplate.convertAndSend("normal.task.exchange", "normal.task.routingKey", message);log.info("[生产者] 发送普通消息: {}", message);}/*** 发送延迟任务消息,默认延迟30分钟* @param message 消息内容*/public void sendDelayedTask(String message) {sendDelayedTask(message, 30 * 60 * 1000L);}/*** 发送指定延迟时间的延迟任务消息* @param message 消息内容* @param delayMillis 延迟时间,单位毫秒*/public void sendDelayedTask(String message, long delayMillis) {MessageProperties props = new MessageProperties();props.setContentType(MessageProperties.CONTENT_TYPE_JSON);// 设置延迟时间(单位毫秒)props.setHeader("x-delay", delayMillis);Message amqpMessage = new Message(message.getBytes(StandardCharsets.UTF_8), props);rabbitTemplate.send("delay.retry.exchange", "delay.retry.routingKey", amqpMessage);log.info("[生产者] 发送延迟消息,延迟 {} 秒后投递: {}", delayMillis / 1000, message);}
}

7. 消费者设计及异常处理策略

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.IOException;@Component
@Slf4j
public class TaskConsumer {@Resourceprivate TaskProducer taskProducer;/*** 监听普通队列* 消费失败时将消息发送到延迟队列进行重试*/@RabbitListener(queues = "normal.task.queue", ackMode = "MANUAL")public void handleNormalQueue(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {try {log.info("[普通队列] 处理消息: {}", message);// TODO: 业务逻辑处理// 消息成功处理,确认消息channel.basicAck(tag, false);} catch (Exception e) {log.error("[普通队列] 消息处理失败,发送延迟队列重试: {}", message, e);try {// 发送延迟队列,延迟30分钟重试taskProducer.sendDelayedTask(message);// 确认消息,防止消息重复消费channel.basicAck(tag, false);} catch (Exception ex) {log.error("[普通队列] 延迟队列发送失败,消息进入死信队列: {}", message, ex);// 拒绝消息,消息进入死信队列channel.basicReject(tag, false);}}}/*** 监听延迟队列* 消费失败时消息进入死信队列,避免死循环*/@RabbitListener(queues = "delay.retry.queue", ackMode = "MANUAL")public void handleDelayedQueue(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {try {log.info("[延迟队列] 处理消息: {}", message);// TODO: 业务逻辑处理// 确认消息channel.basicAck(tag, false);} catch (Exception e) {log.error("[延迟队列] 处理失败,消息进入死信队列: {}", message, e);// 拒绝消息,进入死信队列channel.basicReject(tag, false);}}
}

8. 死信队列消费者与告警设计

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class DeadLetterConsumer {/*** 监听死信队列,处理无法消费或过期消息* 业务可实现报警、日志存储或人工干预*/@RabbitListener(queues = "dead.letter.queue", ackMode = "MANUAL")public void handleDeadLetter(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {try {log.warn("[死信队列] 接收到死信消息: {}", message);// TODO: 告警处理、持久化存储等// 确认消息,避免重复消费channel.basicAck(tag, false);} catch (Exception e) {log.error("[死信队列] 处理死信消息异常,丢弃消息: {}", message, e);channel.basicReject(tag, false);}}
}

9. 消息确认机制详解

  • 自动确认(AUTO):消息一旦被投递给消费者,RabbitMQ 直接认为消息已被消费成功,存在消息丢失风险。

  • 手动确认(MANUAL):消费者在消息成功处理后,显式调用 basicAck 确认,失败时调用 basicRejectbasicNack,保证消息不丢失。

  • 本文示例采用手动确认,结合死信机制,实现更高可靠性。

在上述示例中,我们采用了 手动确认 模式以保证消息的可靠消费。手动确认使得消费者在处理完成后,主动通知 RabbitMQ 消息已被正确消费,避免消息丢失或重复消费。

常见异常示例

当调用 channel.basicAck()channel.basicReject() 传入了错误的 delivery tag 时,可能出现如下异常:


Shutdown Signal: channel error; protocol method: #method\<channel.close>(
reply-code=406, reply-text=PRECONDITION\_FAILED - unknown delivery tag 1, class-id=60, method-id=80)

异常原因分析

  • RabbitMQ 的 delivery tag 是每个 Channel 上递增的消息编号,用于标识该消息。
  • 如果调用确认时传入了不正确或已确认过的 delivery tag,就会出现“unknown delivery tag”的异常。
  • 多线程或异步场景中,共享 Channel 可能导致 delivery tag 不匹配。

解决方案

  1. 配置 Spring Boot 启用手动确认

    application.yml 中明确配置:

    spring:rabbitmq:listener:type: directdirect:acknowledge-mode: manual
    

    该配置保证了监听容器采用手动确认模式。

  2. @RabbitListener 注解中显式声明手动确认 ackMode = "MANUAL"

    @RabbitListener(queues = "your_queue_name", ackMode = "MANUAL")
    public void consumeMessage(String msg, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {try {// 业务处理channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicReject(deliveryTag, false);}
    }
    
  3. 确保每条消息只调用一次确认方法

    • 严格保证 basicAckbasicReject 只针对当前消息调用一次,避免重复确认。

    • 尽量避免多个线程共用同一 Channel。


10. 延迟队列实现原理与RabbitMQ插件说明

  • RabbitMQ 官方不支持原生延迟队列,但提供了rabbitmq_delayed_message_exchange插件。
  • 延迟消息通过消息头 x-delay 设置延迟毫秒数,消息在交换机中等待指定时间后投递到绑定的队列。
  • 安装插件命令(RabbitMQ 服务端执行):
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 重新启动 RabbitMQ 服务后生效。

11. 系统容错与性能优化建议

  • 消息幂等性:确保消费者处理幂等,避免因消息重试造成数据重复。

  • 消息过期:合理配置死信队列消息TTL,避免死信队列堆积。

  • 重试机制:延迟队列可灵活配置延迟时间,多次重试后进入死信队列。

  • 连接池配置:合理设置 RabbitMQ 连接池大小,避免资源浪费。

  • 监控告警:对死信队列消息量、积压情况、消费者消费速率做监控,及时发现异常。

  • 异常日志:完整记录异常日志,方便问题排查。

  • 消息大小限制:避免发送过大消息,影响吞吐性能。


12. 常见问题及排查方法

问题可能原因排查建议
消费者未收到消息交换机和队列绑定不正确,路由键错误检查绑定关系和路由键配置
延迟消息无效未启用 rabbitmq_delayed_message_exchange 插件服务器启用插件,并重启 RabbitMQ
消息未进入死信队列死信交换机或死信路由键配置错误确认死信队列配置参数和绑定是否正确
消费者抛异常导致消息重试业务代码异常未捕获优化业务逻辑,捕获异常,避免无限重试
队列积压严重消费者消费慢或宕机增加消费者实例,检查消费者性能,避免阻塞

13. 总结与最佳实践

  • 利用 普通队列 + 延迟队列 + 死信队列,构建灵活的消息处理流程,提升系统可靠性和可维护性。

  • 结合手动消息确认和异常捕获,实现消息不丢失、失败消息自动重试。

  • 延迟队列利用 RabbitMQ 插件实现定时任务和重试逻辑,避免复杂业务逻辑实现。

  • 死信队列作为异常消息的存放点,配合告警和人工介入,保障业务稳定。

  • 结合监控体系和合理配置,实现高可用、可扩展的消息系统。

http://www.lryc.cn/news/594661.html

相关文章:

  • 我的网页聊天室设计
  • Python100个库分享第38个—lxml(爬虫篇)
  • sky-take-out项目中Redis的使用
  • 【Linux】Prometheus 监控 Kafka 集群
  • 基于大数据的旅游推荐系统 Python+Django+Hive+Vue.js
  • 关于 URL 中 “+“ 号变成空格的问题
  • 机器学习对词法分析、句法分析、浅层语义分析的积极影响
  • 人工智能真的能编程吗?研究勾勒出自主软件工程的障碍
  • [Python] -项目实战10- 用 Python 自动化批量重命名文件
  • 识别并计算滑块距离
  • 远程登录服务器黑屏如何处理?
  • 日历类生辰八字九九三伏入梅出梅算法
  • 某日在某个月份中不存在导致软件出现异常的问题排查(判断闰年以及月份中的天数,附完整源码)
  • 编译支持cuda硬件加速的ffmpeg
  • cuda编程笔记(9)--使用 Shared Memory 实现 tiled GEMM
  • Linux进程核心机制:状态、优先级与上下文切换详解
  • 亚马逊自养号测评实战指南:从环境搭建到安全提排名
  • 微信小程序服务端快速对接指南(java版)
  • 添加状态信息
  • Docker实践:使用Docker部署blog轻量级博客系统
  • Python Matplotlib中的fontdict参数说明
  • 前后端分离项目进阶1---后端
  • 易语言+懒人精灵/按键中控群控教程(手机、主板机、模拟器通用)
  • 子网划分核心原理 (网络原理1)
  • Windows Server2022下使用SQL Server2019开发版搭建高可用集群
  • 如何用纯 HTML 文件实现 Vue.js 应用,并通过 CDN 引入 Element UI
  • 【js(3)】执行上下文/作用域链/垃圾回收与内存泄漏/闭包
  • Vue组件之间通信
  • C语言运算符优先级“潜规则”
  • 数据库的介绍和安装