当前位置: 首页 > news >正文

RabbitMq 消息可靠性问题(一) --- publisher发送时丢失

前言

消息从生产者发送到exchange, 再到 queue, 再到消费者。这个过程中有哪些有消息丢失的可能性呢?

  • 发送时丢失:
    • 生产者发送的消息未送达 exchange
    • 消息到达 exchange 后未到达 queue
  • MQ 宕机,queue将消息丢失
  • consumer 接收到消息后未消费就宕机
    在这里插入图片描述
    消息可靠性问题及其对应的解决方案:
场景publisher发送时丢失MQ消息丢失consumer消费问题
解决方案生产者确认机制消息持久化消费者消息确认&&失败重试机制

下面我们先说一下publisher 发送时丢失的问题应该如何处理

生产者确认机制的理论说明

RabbitMQ 提供了 publisher confirm 机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后, 会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publish-confirm, 发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publish-return, 发送回执
    • 消息投递到交换机,但是没有路由到队列,返回ACK, 及路由失败原因

注意: 确认机制发送消息时, 需要给每个消息设置一个全局唯一 id, 以区分不同消息,避免ack 冲突

在这里插入图片描述

代码实现

下面基于SpringAMQP 实现的生产者确认机制

  1. 在 publisher 服务的 application,yml 中添加以下配置:
spring:rabbitmq:publisher-confirm-type: correlated # 开启异步回调publisher-returns: truetemplate:mandatory: true

配置说明:

  • publish-confirm-type: 开启 publisher-confirm, 这里支持两种类型:
    • simple: 同步等待 confirm 结果, 直到超时
    • correlated: 异步回调, 定义ConfirmCallback, MQ 返回结果时会回调这个ConfirmCallback
  • publish-returns: 开启 publish-return 功能,同样是基于 callback 机制,不过是定义 ReturnCallbcak
  • template.mandatory: 定义消息路由失败时的策略。true, 则调用ReturnCallback, false: 则直接丢弃消息

ConfirmCallBack是基于每条消息设置的,所以需要一个全局唯一id 进行区分。
ReturenCallbcak 则是基于每个RabbitTemplate操作实例,是一种全局性的回调。

  1. 由于每个 RabbitTemplate 只能配置一个 ReturnCallback, 因此需要在项目启动过程中配置:
    (这里可以实现ApplicationContextAware,它可以在SpringIOC 容器初始化的时候,进行一些全局性回调的操作)
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取 RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置 ReturnCallbackrabbitTemplate.setReturnCallback((message, replayCode, replayText,exchange, routingKey) -> {// 记录日志log.error("消息发送到队列失败, 响应码:{}, 失败原因:{},交换机:{}, 路由key:{},消息:{},",replayCode, replayText, exchange, routingKey, message.toString());// 如果有需要的话,重发消息});}}
}	
  1. 为每条发送的消息,指定消息 ID, 并编写对应的 ConfirmCallback
public void testSendMessage2SimpleQueue() throws InterruptedException {// 1. 准备消息String message = "hello, spring amqp!";// 2. 准备CorrelationData// 2.1 消息idCorrelationData correlationData = newCorrelationData(UUID.randomUUID().toString());// 2.2 准备 ConfirmCallbackcorrelationData.getFuture().addCallback(confirm -> {// 判断结果if(confirm.isAck()){// ACKlog.debug("消息成功投递到交换机!消息ID:{}",correlationData.getId());}else {// NACKlog.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());// 重发消息}}, throwable -> {// 记录日志log.error("消息发送失败", throwable);// 重发消息});// 3.发送消息rabbitTemplate.convertAndSend("amq.topic", "asimple.test", message, correlationData);
}

总结

SpringAMQP 中处理消息确认的几种情况:

  • publisher-confirm:
    • 消息发送到 exchange, 返回 ack
    • 消息发送失败,没有到达交换机,返回 nack
    • 消息发送过程中出现异常,没有收到回执
  • 消息成功发送到 exchange, 但没有路由到 queue, 调用 ReturnCallback
http://www.lryc.cn/news/58888.html

相关文章:

  • Java初识泛型
  • 寸照换底色技巧大全,超详细图文教程
  • 这篇文章价值很大:股票历史分时成交数据怎么简单获取?【干货】
  • muduo源码剖析--Buffer
  • AI人工智能简介和其定义
  • python数据清洗
  • Python3 os.makedirs() 方法、Python3 os.read() 方法
  • 【Linux安装数据库】Ubuntu安装mysql并连接navicat
  • GaussDB工作级开发者认证—第一章GaussDB数据库介绍
  • 阿里张勇:所有行业都值得用大模型重新做一遍!
  • ES6(字符串的扩展与新增方法)
  • rk3568点亮LCD(lvds)
  • 全终端办公电子邮件集成方案
  • 再不转型为ChatGPT程序员,有遭受降维打击的危险
  • maven使用教程
  • Emlog底部显示当前在线人数
  • 【java踩坑搞起】MybatisPlus封装的mapper不支持 join,那咋办
  • 【创造者】——什么是数学
  • ROS系列——错误syntax error near unexpected token `$‘do\r‘‘
  • 当星辰天合 SDS 遇见 Elastic
  • 使用vue实现分页
  • 白银实时行情操作中的一些错误及其解决办法(下)
  • Linux系统之tomcat的安装方法
  • 段式回文。
  • 易点易动设备管理系统高效管理海量备品备件
  • CMMI 3.0 究竟包含了哪些实践域?
  • 算法训练Day31: 455.分发饼干 376. 摆动序列 53. 最大子序和
  • ASP.NET(AJAX+JSON)实现对象调用
  • 一次弄懂gzip模块启用和配置指令
  • 猿辅导学员入选国家队,竞赛老师成为“最强辅助”