当前位置: 首页 > news >正文

RabbitMQ消费者确认和重复机制

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

        ack:成功处理消息,RabbitMQ从队列中删除该消息

        nack:消息处理失败,RabbitMQ需要再次投递消息

        reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

   none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

   manual手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

         auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

如果是业务异常,会自动返回nack

如果是消息处理或校验异常,自动返回reject;

通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring: rabbitmq:listener: simple: acknowledge-mode: none # 不做处理

修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理的异常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {log.info("spring 消费者接收到消息:【" + msg + "】");if (true) {throw new MessageConversionException("故意的");}log.info("消息处理完成");
}

测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。

重复机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。

极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力,为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次,本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject。

http://www.lryc.cn/news/442919.html

相关文章:

  • 【Verilog学习日常】—牛客网刷题—Verilog企业真题—VL77
  • thinkPHP 8.0.4 安装
  • 在k8s中,客户端访问服务的链路流程,ingress--->service--->deployment--->pod--->container
  • 寄存器二分频电路
  • Kafka3.8.0+Centos7.9的安装参考
  • Redis——持久化策略
  • 并查集LRU cache
  • SpringCloud的学习(三),Resilience4j
  • 【计算机网络篇】计算机网络概述
  • UDS诊断-面试题2
  • ovirt error: Network not found: no network with matching name ‘vdsm-ovirtmgmt‘
  • 2024百度的组织架构和产品分布
  • Java中List、ArrayList与顺序表
  • 缓存技巧 · Spring Cache Caffeine 高性能缓存库
  • RabbitMq中交换机(Exchange)、队列(Queue)和路由键(Routing Key)
  • 解码 OpenAI 的 o1 系列大型语言模型
  • 大小端字节序 和 内存高低地址顺序
  • Spring扩展点系列-MergedBeanDefinitionPostProcessor
  • Centos 7.9 使用 crontab 实现开机启动
  • 基于微信的设备故障报修管理系统设计与实现+ssm论文源码调试讲解
  • yolo自动化项目实例解析(二)ui页面整理 1.78
  • PyQt / PySide + Pywin32 + ctypes 自定义标题栏窗口 + 完全还原 Windows 原生窗口边框特效项目
  • 面试时遇见的项目问题
  • 在线骑行网站设计与实现
  • 大批量查询方案简记(Mybatis流式查询)
  • python - 子类为什么调用父类的方法
  • 【JavaScript】数据结构之字典 哈希表
  • Adobe出现This unlicensed Photoshop app has been disabled
  • elementui 单元格添加样式的两种方法
  • 如何有效管理技术债务:IT项目中的长期隐患