Java研学-RabbitMQ(八)
一 消费者可靠性
RabbitMQ 的消费者可靠性通过消费者确认机制、消费失败处理策略及业务幂等性设计形成完整闭环:
消费者处理完成后发送 ACK/NACK 显式确认,成功则删除消息,失败则触发重试(自动或手动模式支持),重试超限后消息进入死信队列或异常通道避免无限循环;
同时,业务层需通过唯一请求ID、分布式锁或状态机等实现幂等性,确保重复消费时数据状态一致,最终实现从消息投递到业务落地的全链路可靠性保障。
二 消费者确认机制
1 概述
RabbitMQ 的消费者确认机制通过显式回执保障消息可靠处理:消费者完成消息处理后,需向 RabbitMQ 发送 ack(确认删除消息)、nack(处理失败并允许重试)或 reject(处理失败且拒绝重试)
三种回执,
其中 nack 可触发消息重新入队或路由至死信队列(需配置)
,而reject 通常用于明确无效消息的丢弃
;结合 Spring AMQP 等框架的自动/手动确认模式,该机制实现了消息处理状态的精准追踪,为后续重试、幂等性等可靠性策略提供了基础支撑。
此时消费者需要在监听并处理消息的同时,给MQ发送回执。
2 application.yaml – consumer
acknowledge-mode:
新增的消费者配置
模式 | 确认/删除时机 | 消息删除条件 | 异常处理 |
---|---|---|---|
none | 投递后立即 ACK → 立即删除 | 无条件删除(无论业务是否成功) | 无处理(消息丢失无感知) |
manual | 手动调用: - ACK → 删除- NACK/REJECT → 不删除(可重试/进死信队列) | 仅 ACK 时删除 | 需手动实现成功/失败逻辑(灵活但侵入性强) |
auto | 自动触发: - 成功 → ACK - 异常 → NACK | 以下情况删除: 1. 成功执行 → ACK 后删除 2. 业务异常 → NACK + 不重试 → 进死信队列(若配置) 3. 系统异常 → 默认 NACK + 重试(失败后进死信队列) | 通过异常类型区分: - 业务异常:直接丢弃 - 系统异常:可配置重试或丢弃 |
spring:rabbitmq:host: 192.168.44.128port: 5672virtual-host: /midhuangusername: dahuangpassword: "dahuang66"# 消费者配置listener:simple:prefetch: 1 # 预取消息数量(平衡吞吐量与公平性)acknowledge-mode: none # 自动确认模式(推荐生产使用)logging:level:cn.tj.consumer.listeners: DEBUG # 设置为 DEBUG 以查看详细日志
3 监听系统异常消息 – MqListener
@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "huang.queue")public void listenHuangQueue(String msg) {System.out.println("消费者收到消息:【" + msg + "】");throw new RuntimeException("抛出系统异常"); // 抛出系统异常(非业务异常)}
}
4 发送消息 – PublisherApplicationTests
@Slf4j
@SpringBootTest
class PublisherApplicationTests {// 注入 RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessageQueue() {String queueName = "huang.queue";String msg = "hello, Huang!";rabbitTemplate.convertAndSend(queueName, msg);System.out.println("Sent message: " + msg);}
}
5 为系统异常打断点运行
消息发送至队列
断点运行启动,消费者开始监听,此时消息还没有处理完
此刻会发现消息未被处理完毕,队列已经将信息删除,此时抛出异常,消息丢失
6 application.yaml – consumer
acknowledge-mode: 设置为auto
spring:rabbitmq:host: 192.168.44.128port: 5672virtual-host: /midhuangusername: dahuangpassword: "dahuang66"# 消费者配置listener:simple:prefetch: 1 # 预取消息数量(平衡吞吐量与公平性)acknowledge-mode: auto # 自动确认模式(推荐生产使用)logging:level:cn.tj.consumer.listeners: DEBUG # 设置为 DEBUG 以查看详细日志
7 再为系统异常打断点运行
重复上述步骤
,发现消息状态发生变化Unacked尚未获得回执
此时断点放行,会返回Nack,队列重新进行投递消息,直到成功为止
若服务宕机(关闭consumer服务),消息会被收回,状态恢复,等待下次投递
8 监听业务异常 – MqListener
@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "huang.queue")public void listenHuangQueue(String msg) {System.out.println("消费者收到消息:【" + msg + "】");throw new MessageConversionException("抛出业务异常"); // 抛出业务异常}
}
9 业务异常结果
产生业务异常,消息会被直接拒绝,然后移除
三 消费失败重试机制
1 介绍
Spring AMQP 提供了本地重试机制(基于 Spring Retry),可以避免消息因消费者异常而无限次 requeue 到 RabbitMQ 队列,从而减少不必要的消息堆积和系统压力。
2 application.yaml – consumer
配置重试机制retry:
spring:rabbitmq:host: 192.168.44.128port: 5672virtual-host: /midhuangusername: dahuangpassword: "dahuang66"# 消费者配置listener:simple:prefetch: 1 # 预取消息数量(平衡吞吐量与公平性)acknowledge-mode: auto # 自动确认模式(推荐生产使用)retry:enabled: true # 开启重试机制initial-interval: 1000ms # 初始重试间隔(1秒)multiplier: 1.0 # 下次重试间隔的倍数(1.0表示固定间隔)max-attempts: 3 # 最大重试次数(含初次消费)stateless: true # true(无状态,默认);false(有状态,适用于事务)logging:level:cn.tj.consumer: DEBUG # 设置为 DEBUG 以查看详细日志
3 PublisherApplicationTests
发送消息到队列
@Slf4j
@SpringBootTest
class PublisherApplicationTests {// 注入 RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessageQueue() {String queueName = "huang.queue";String msg = "hello, Huang!";rabbitTemplate.convertAndSend(queueName, msg);System.out.println("Sent message: " + msg);}
}
4 MqListener
启动消费者
@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "huang.queue")public void listenHuangQueue(String msg) {System.out.println("消费者收到消息:【" + msg + "】");throw new RuntimeException("抛出系统异常"); // 抛出系统异常(非业务异常)}
}
5 测试结果
此时消费者每收到一次消息,就抛一次异常,重试3次以后(次数耗尽),消息被丢弃