当前位置: 首页 > news >正文

RabbitMQ--Springboot解决消息丢失

🐰 Spring Boot 实现 RabbitMQ 消息可靠性机制

🔧 环境前置配置(application.yml)

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlated   # 开启 confirm 模式(推荐)publisher-returns: true              # 开启 return 回调(路由失败)listener:simple:acknowledge-mode: manual         # 手动 ACK

✅ 1. 持久化机制(队列 + 消息)

📌 功能:消息和队列都设置为持久化

🔧 配置类:Queue + Exchange + Binding

@Configuration
public class RabbitConfig {@Beanpublic Queue persistentQueue() {return QueueBuilder.durable("persistent_queue").build();}
}

✅ 生产者

@Service
public class PersistentProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send() {MessageProperties props = new MessageProperties();props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 持久化Message message = new Message("Hello Persistent".getBytes(), props);rabbitTemplate.send("", "persistent_queue", message);System.out.println("✅ 消息已发送(持久化)");}
}

✅ 消费者

@Component
public class PersistentConsumer {@RabbitListener(queues = "persistent_queue")public void receive(String msg) {System.out.println("✅ 消费者收到:" + msg);}
}

✅ 2. 生产者确认机制(Confirm)

🔧 Confirm 回调配置

@Configuration
public class ConfirmCallbackConfig {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// Confirm 回调(消息是否到达交换机)rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("✅ 交换机已收到消息");} else {System.err.println("❌ 消息未送达交换机:" + cause);}});// Return 回调(交换机→队列失败)rabbitTemplate.setReturnsCallback(returned -> {System.err.println("❌ 路由失败:" + new String(returned.getMessage().getBody()));});}
}

✅ 异步发送(Spring Boot 默认是异步 confirm)

@Service
public class ConfirmProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send() {CorrelationData data = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key", "Hello Confirm", data);System.out.println("消息发送成功,等待 confirm 回调...");}
}

✅ 3. 消费者手动确认(ACK)

✅ 配置类

@Configuration
public class AckConfig {@Beanpublic Queue ackQueue() {return QueueBuilder.durable("ack_queue").build();}
}

✅ 生产者

@Service
public class AckProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send() {rabbitTemplate.convertAndSend("", "ack_queue", "Manual ACK 消息");}
}

✅ 消费者(手动 ACK)

@Component
public class AckConsumer {@RabbitListener(queues = "ack_queue", ackMode = "MANUAL")public void receive(Message message, Channel channel) throws Exception {try {String body = new String(message.getBody());System.out.println("处理消息:" + body);// 业务处理成功后手动 ACKchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 异常处理,拒绝并重新入队channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}
}

✅ 4. 死信队列(DLX)

🔧 队列配置

@Configuration
public class DLXConfig {@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal_queue").withArgument("x-dead-letter-exchange", "dlx-exchange") // DLX绑定.build();}@Beanpublic FanoutExchange dlxExchange() {return new FanoutExchange("dlx-exchange");}@Beanpublic Queue dlxQueue() {return QueueBuilder.durable("dlx_queue").build();}@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange());}
}

✅ 生产者

@Service
public class DLXProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(String msg) {rabbitTemplate.convertAndSend("", "normal_queue", msg);}
}

✅ 消费者

@Component
public class DLXConsumer {@RabbitListener(queues = "normal_queue", ackMode = "MANUAL")public void consume(Message message, Channel channel) throws IOException {String body = new String(message.getBody());if (body.contains("error")) {// 拒绝并不重回队列 → 进入死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}@RabbitListener(queues = "dlx_queue")public void handleDLX(String msg) {System.err.println("🧟 死信处理:" + msg);}
}

✅ 5. 事务模式(不推荐但支持)

@Service
public class TxProducer {@Autowiredprivate ConnectionFactory connectionFactory;public void sendTx() throws Exception {try (Connection connection = connectionFactory.createConnection();Channel channel = connection.createChannel(false)) {channel.txSelect(); // 开启事务try {channel.queueDeclare("tx_queue", true, false, false, null);channel.basicPublish("", "tx_queue", null, "事务消息".getBytes());channel.txCommit();System.out.println("✅ 事务提交成功");} catch (Exception e) {channel.txRollback();System.out.println("❌ 事务回滚:" + e.getMessage());}}}
}

✅ 对比总结

机制Spring Boot 配置可靠性性能推荐
持久化durable + deliveryMode✅ 高中等✅ 推荐
Confirmpublisher-confirm-type✅ 高✅ 推荐
手动 ACKacknowledge-mode=manual✅ 高中等✅ 推荐
死信队列x-dead-letter-exchange✅ 高✅ 推荐
事务模式txSelect + txCommit✅ 非常高❌ 低🚫 不推荐

最后:在生产者的确认机制(Confirm)分两种

1. RabbitTemplate + ConfirmCallback(这种就是上面写的)(推荐)

  • 作用范围
    RabbitTemplate 是 Spring Boot 封装的模板工具类,ConfirmCallback 监听的是通过这个 RabbitTemplate 发送的所有消息
    一旦配置了 publisher-confirm-type: correlated该模板发送到任意交换机的消息都会触发回调。

  • 使用特点

    • 全局生效,适合大部分场景。

    • 不需要手动调用 waitForConfirms(),由 Spring 异步回调通知发送结果。

    • 需要在 application.yml 中开启 publisher-confirm-type,否则不生效。

    • 如果使用多个 RabbitTemplate,需要分别配置 ConfirmCallback。

  • 适用场景

    • 大部分 Spring Boot 项目中使用。

    • 需要简单统一的确认机制,而不是针对单一消息进行控制。

    • 适合与消息持久化、重试机制结合,统一日志记录发送结果。


2. Channel + waitForConfirms(使用的java原生的 ⚠️ 不推荐)

  • 作用范围
    这是 RabbitMQ 原生的 Channel 级别确认。它只对当前 Channel 发送的消息有效,并且可以精确控制每一条消息的确认状态。

  • 使用特点

    • 局部控制,只影响当前 Channel。

    • 不需要 Spring 配置,直接用 channel.confirmSelect() 开启。

    • 可以 单条确认waitForConfirms())或 批量确认waitForConfirmsOrDie())。

    • 可以与异步确认(addConfirmListener)结合,实现细粒度的消息追踪。

  • 适用场景

    • 需要对某一批消息单独控制确认(而不是整个系统都一个回调)。

    • 在一个服务中需要对不同交换机、不同消息类型采取不同的确认策略。

    • 对性能有较高要求,需要批量确认或者异步确认机制。

2.1 代码样例

@Component
public class ConfirmSyncBatchProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate CachingConnectionFactory connectionFactory;public void sendBatchSyncConfirm() {//Spring Boot 不支持原生 channel.waitForConfirms() 的方式//但我们可用 ChannelCallback 手动实现rabbitTemplate.execute(channel -> {channel.confirmSelect(); // 启用 confirm 模式try {for (int i = 0; i < 5; i++) {String msg = "Sync Batch " + i;channel.basicPublish("","confirm-queue",MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}if (!channel.waitForConfirms()) {System.out.println("❌ 批量同步确认失败!");} else {System.out.println("✅ 批量同步确认成功!");}} catch (Exception e) {e.printStackTrace();}return null;});}
}

http://www.lryc.cn/news/600392.html

相关文章:

  • JavaWeb01——基础标签及样式(黑马视频笔记)
  • Android WorkManager 详解:高效管理后台任务
  • InstructBLIP:通过指令微调迈向通用视觉-语言模型
  • Android Data Binding 深度解析与实践指南
  • 像素、视野、光源,都有哪些因素影响测量精度?
  • 数据中心-时序数据库InfluxDB
  • 【影刀RPA_初级课程_我的第一个机器人】
  • jxORM--查询数据
  • 前端模块化开发实战指南
  • 【机器学习深度学习】模型私有化部署与微调训练:赋能特定问题处理能力
  • Oracle 11g RAC数据库实例重启的两种方式
  • JavaScript:现代Web开发的核心动力
  • 基于深度学习的胸部 X 光图像肺炎分类系统(六)
  • 技术赋能与营销创新:开源链动2+1模式AI智能名片S2B2C商城小程序的流量转化路径研究
  • SpringBoot连接Sftp服务器实现文件上传/下载(亲测可用)
  • Linux选择题
  • 《从零开始学 JSSIP:JavaScript 实时通信开发实战》
  • Jmeter的元件使用介绍:(五)定时器详解
  • Baumer工业相机堡盟工业相机如何通过YoloV8深度学习模型实现轮船检测识别(C#代码UI界面版)
  • PostGIS面试题及详细答案120道之 (011-020 )
  • 零基础学习性能测试第三章:jmeter构建性能业务场景
  • 论文阅读-RaftStereo
  • 【硬件-笔试面试题】硬件/电子工程师,笔试面试题-27,(知识点:信号完整性,信号反射,串扰,时延,抖动,衰减)
  • Qt 延时处理方法介绍
  • day 36打卡
  • 去中心化时代的通信革命:briefing与cpolar技术融合带来的安全范式革新
  • 明辨 JS 中 prototype 与 __proto__
  • 秋招Day19 - 分布式 - 限流
  • C++11 右值引用 Lambda 表达式
  • 基于深度学习的食管癌右喉返神经旁淋巴结预测系统研究