当前位置: 首页 > news >正文

Java研学-RabbitMQ(八)

一 消费者可靠性

  RabbitMQ 的消费者可靠性通过消费者确认机制、消费失败处理策略及业务幂等性设计形成完整闭环:
消费者处理完成后发送 ACK/NACK 显式确认,成功则删除消息,失败则触发重试(自动或手动模式支持),重试超限后消息进入死信队列或异常通道避免无限循环;
同时,业务层需通过唯一请求ID、分布式锁或状态机等实现幂等性,确保重复消费时数据状态一致,最终实现从消息投递到业务落地的全链路可靠性保障。

二 消费者确认机制

1 概述

  RabbitMQ 的消费者确认机制通过显式回执保障消息可靠处理:消费者完成消息处理后,需向 RabbitMQ 发送 ack(确认删除消息)、nack(处理失败并允许重试)或 reject(处理失败且拒绝重试)三种回执,
其中 nack 可触发消息重新入队或路由至死信队列(需配置),而reject 通常用于明确无效消息的丢弃;结合 Spring AMQP 等框架的自动/手动确认模式,该机制实现了消息处理状态的精准追踪,为后续重试、幂等性等可靠性策略提供了基础支撑。
此时消费者需要在监听并处理消息的同时,给MQ发送回执。

2 application.yaml – consumer

  acknowledge-mode: 新增的消费者配置

模式确认/删除时机消息删除条件异常处理
none投递后立即 ACK → 立即删除无条件删除(无论业务是否成功)无处理(消息丢失无感知)
manual手动调用:
- ACK → 删除
- NACK/REJECT → 不删除(可重试/进死信队列)
ACK 时删除需手动实现成功/失败逻辑(灵活但侵入性强)
auto自动触发:
- 成功 → ACK
- 异常 → NACK
以下情况删除
1. 成功执行 → ACK 后删除
2. 业务异常 → NACK + 不重试 → 进死信队列(若配置)
3. 系统异常 → 默认 NACK + 重试(失败后进死信队列)
通过异常类型区分:
- 业务异常:直接丢弃
- 系统异常:可配置重试或丢弃
spring:rabbitmq:host: 192.168.44.128port: 5672virtual-host: /midhuangusername: dahuangpassword: "dahuang66"# 消费者配置listener:simple:prefetch: 1                     # 预取消息数量(平衡吞吐量与公平性)acknowledge-mode: none          # 自动确认模式(推荐生产使用)logging:level:cn.tj.consumer.listeners: DEBUG # 设置为 DEBUG 以查看详细日志

3 监听系统异常消息 – MqListener

@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "huang.queue")public void listenHuangQueue(String msg) {System.out.println("消费者收到消息:【" + msg + "】");throw new RuntimeException("抛出系统异常"); // 抛出系统异常(非业务异常)}
}

4 发送消息 – PublisherApplicationTests

@Slf4j
@SpringBootTest
class PublisherApplicationTests {// 注入 RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessageQueue() {String queueName = "huang.queue";String msg = "hello, Huang!";rabbitTemplate.convertAndSend(queueName, msg);System.out.println("Sent message: " + msg);}
}

5 为系统异常打断点运行

  消息发送至队列
在这里插入图片描述
断点运行启动,消费者开始监听,此时消息还没有处理完
在这里插入图片描述
此刻会发现消息未被处理完毕,队列已经将信息删除,此时抛出异常,消息丢失
在这里插入图片描述

6 application.yaml – consumer

  acknowledge-mode: 设置为auto

spring:rabbitmq:host: 192.168.44.128port: 5672virtual-host: /midhuangusername: dahuangpassword: "dahuang66"# 消费者配置listener:simple:prefetch: 1                     # 预取消息数量(平衡吞吐量与公平性)acknowledge-mode: auto          # 自动确认模式(推荐生产使用)logging:level:cn.tj.consumer.listeners: DEBUG # 设置为 DEBUG 以查看详细日志

7 再为系统异常打断点运行

  重复上述步骤,发现消息状态发生变化Unacked尚未获得回执
在这里插入图片描述
此时断点放行,会返回Nack,队列重新进行投递消息,直到成功为止
在这里插入图片描述
若服务宕机(关闭consumer服务),消息会被收回,状态恢复,等待下次投递
在这里插入图片描述

8 监听业务异常 – MqListener

@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "huang.queue")public void listenHuangQueue(String msg) {System.out.println("消费者收到消息:【" + msg + "】");throw new MessageConversionException("抛出业务异常"); // 抛出业务异常}
}

9 业务异常结果

  产生业务异常,消息会被直接拒绝,然后移除
在这里插入图片描述
在这里插入图片描述

三 消费失败重试机制

1 介绍

  Spring AMQP 提供了本地重试机制(基于 Spring Retry),可以避免消息因消费者异常而无限次 requeue 到 RabbitMQ 队列,从而减少不必要的消息堆积和系统压力。

2 application.yaml – consumer

  配置重试机制retry:

spring:rabbitmq:host: 192.168.44.128port: 5672virtual-host: /midhuangusername: dahuangpassword: "dahuang66"# 消费者配置listener:simple:prefetch: 1                     # 预取消息数量(平衡吞吐量与公平性)acknowledge-mode: auto          # 自动确认模式(推荐生产使用)retry:enabled: true                # 开启重试机制initial-interval: 1000ms     # 初始重试间隔(1秒)multiplier: 1.0              # 下次重试间隔的倍数(1.0表示固定间隔)max-attempts: 3              # 最大重试次数(含初次消费)stateless: true              # true(无状态,默认);false(有状态,适用于事务)logging:level:cn.tj.consumer: DEBUG # 设置为 DEBUG 以查看详细日志

3 PublisherApplicationTests

  发送消息到队列

@Slf4j
@SpringBootTest
class PublisherApplicationTests {// 注入 RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessageQueue() {String queueName = "huang.queue";String msg = "hello, Huang!";rabbitTemplate.convertAndSend(queueName, msg);System.out.println("Sent message: " + msg);}
}

4 MqListener

  启动消费者

@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "huang.queue")public void listenHuangQueue(String msg) {System.out.println("消费者收到消息:【" + msg + "】");throw new RuntimeException("抛出系统异常"); // 抛出系统异常(非业务异常)}
}

5 测试结果

  此时消费者每收到一次消息,就抛一次异常,重试3次以后(次数耗尽),消息被丢弃
在这里插入图片描述
在这里插入图片描述

四 失败消息处理策略

1

五 业务幂等性

http://www.lryc.cn/news/620591.html

相关文章:

  • 李沐-第六章-LeNet训练中的pycharm jupyter-notebook Animator类的显示问题
  • 【LeetCode 热题 100】295. 数据流的中位数——最大堆和最小堆
  • 基于Django的福建省旅游数据分析与可视化系统【城市可换】
  • AI 编程实践:用 Trae 快速开发 HTML 贪吃蛇游戏
  • 【经验分享】如何在Vscode的Jupyter Notebook中设置默认显示行号
  • vscode的wsl环境,ESP32驱动0.96寸oled屏幕
  • 【面板数据】各省及市省级非物质文化遗产数据合集(2005-2024年)
  • 【JavaEE】多线程 -- 初识线程
  • Java应用快速部署Tomcat指南
  • **超融合架构中的发散创新:探索现代编程语言的挑战与机遇**一、引言随着数字化时代的快速发展,超融合架构已成为IT领域的一种重要趋势
  • ts概念讲解
  • 网络原理-HTTP
  • 一致性哈希Consistent Hashing
  • 【代码随想录day 20】 力扣 669. 修剪二叉搜索树
  • 力扣-64.最小路径和
  • 玩转Docker | 使用Docker部署JSON格式化工具ZJSON
  • iOS Sqlite3
  • 磁盘瓶颈现形记 - iostat让I/O压力无所遁形
  • 「iOS」————设计架构
  • iOS 26 一键登录失效:三大运营商 SDK 无法正常获取手机号
  • iOS性能监控新方法多版本对比与趋势分析实战指南
  • iOS混淆工具有哪些?游戏 App 防护下的混淆与加固全攻略
  • 网络通信---Axios
  • iOS App TestFlight 上架全流程案例,从 0 到 1 完成内测分发
  • Docker 部署:Web SSH、RDP、VNC 多协议全能远程管理工具
  • 零基础数据结构与算法——第七章:算法实践与工程应用-搜索引擎
  • 洗浴中心泡池水过滤系统原理深度解析与工程实践
  • 数智先锋 | 告别运维黑盒!豪鹏科技×Bonree ONE构建全栈智能可观测体系
  • 【网络】TCP/UDP总结复盘
  • Ollama如何分别使用2张H100GPU和4张A100部署GPT-OSS-120B全指南:硬件配置与负载均衡实战