当前位置: 首页 > news >正文

【RabbitMQ】06-消费者的可靠性

在这里插入图片描述
在这里插入图片描述

1. 消费者确认机制

没有ack,mq就会一直保留消息。

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 自动ack

2. 失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

3. 失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
在这里插入图片描述
代码:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfiguration {
//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue(name = "error.queue"),
//            exchange = @Exchange(name = "error.direct", type = ExchangeTypes.DIRECT),
//            key = {"error"}
//    ))
//    public void bings(Object msg){
//        System.out.println("异常"+msg.toString());
//    }@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
http://www.lryc.cn/news/481763.html

相关文章:

  • 【K8S系列】如何监控集群CPU使用率并设置告警的分析与详细解决方案
  • 解线性方程组(二)
  • HarmonyOS Next 实战卡片开发 02
  • FastDDS服务发现之PDP的收发
  • 【计网不挂科】计算机网络期末考试——【选择题&填空题&判断题&简述题】试卷(2)
  • 关于有机聚合物铝电容的使用(2)
  • Linux -- 进程初印象
  • 【超级简单】Facebook脸书视频下载一键保存手机
  • 昇思大模型平台打卡体验活动:项目2基于MindSpore通过GPT实现情感分类
  • 【JAVA】会员等级互通匹配数据库表设计
  • 论文阅读:基于语义分割的非结构化田间道路场景识别
  • linux部分问题以及解决方式
  • qt QTreeWidget详解
  • 注意力机制的目的:理解语义;编码器嵌入高纬空间计算;注意力得分“得到S*V”;解码器掩码和交叉注意力层用于训练;最终的编码器和输出实现大模型
  • [java][jdk]JDK各个版本的核心特性
  • 双十一”买买买!法官告诉你注意这些法律问题
  • PyQt5
  • 【Linux】常用命令(2.6万字汇总)
  • Vue3-06_路由
  • 物理验证Calibre LVS | SMIC Process过LVS时VNW和VPW要如何做处理?
  • 量化分析工具日常操作日记-5-通合科技
  • windows和linux验证MD5码方式
  • 构造函数原型对象语法、原型链、原型对象
  • 鸿蒙UI开发——自定义UI绘制帧率
  • 鸿蒙基本组件结构
  • 柔性鞋材振动刀智能视觉裁切机市场报告:未来几年年复合增长率CAGR为5.4%
  • 【计算机网络】基础知识,常识应用知识
  • 【Linux进程篇1】认识冯·诺依曼体系结构(引出进程详解)
  • 使用iviewui组件库的坑
  • 高级sql使用技巧