当前位置: 首页 > news >正文

【RocketMQ】重试机制及死信消息处理

【RocketMQ】重试机制及死信消息处理

文章目录

  • 【RocketMQ】重试机制及死信消息处理
    • 1. 重试机制
      • 1.1 生产者重试
      • 1.2 消费者重试
        • 1.2.1 死信队列

参考文档: 官方文档

1. 重试机制

1.1 生产者重试

rocketmq生产者发送消息失败默认重试2次(同步发送为2次,异步发送为0次)。

当然也可以自定义重试次数及机制:

// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);

1.2 消费者重试

若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。

  1. 在集群模式下,消费的业务逻辑代码会返回 Action.ReconsumerLater,NULL,或者抛出异常,如果一条消息消费失败,最多会重试 16 次,之后该消息会被丢弃。
  2. 在广播消费模式下,广播消费仍然保证消息至少被消费一次,但不提供重发的选项。

消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息

  • 最大重试次数:消息消费失败后,可被重复投递的最大次数。
consumer.setMaxReconsumeTimes(5);

在实际生产中,一般重试3-5次,如果还没有消费成功,则可以把消息签收,通知人工介入。

  • 重试间隔:消息消费失败后再次被投递给Consumer消费的间隔时间,只在顺序消费中起作用。
consumer.setSuspendCurrentQueueTimeMillis(5000);

示例:

消费者重试示例:

@Test
public void retryConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");//设置最大重试次数consumer.setMaxReconsumeTimes(2);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());//业务报错了 返回null 返回RECONSUME_LATER 都会重试if(true){throw new RuntimeException();}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。

并发消费失败后并不是投递回原Topic,而是投递到一个 特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。 两者参数差别如下:

消费类型重试间隔最大重试次数
顺序消费间隔时间可通过自定义设置,SuspendCurrentQueueTimeMillis最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX
并发消费间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值。(根据延时等级划分,共16次)

1.2.1 死信队列

上面提到的 特殊Topic 称为死信Topic,对应的队列就是死信队列,其中存储的消息就是死信消息。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。应该通知人工介入处理。

对于死信的处理方案有多种,这里演示两种:

  1. 编写消费者监听死信队列
  2. 在消费者达到最大重试次数时立刻处理。

方案一

@Test
public void deadConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("%DLQ%retry-consumer-group", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理");//业务报错了 返回null 返回RECONSUME_LATER 都会重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

该方案有一个缺点,就是如果很多Topic都产生了死信消息,那么我们想要处理这些死信消息就得编写很多个监听各个死信队列的消费者。

方案二:

针对方案一的缺点,方案二能够比较好的解决。

@Test
public void retryConsumer2() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println(new Date());try {handleDb();// 10/0} catch (Exception e) {if (messageExt.getReconsumeTimes() >= 2) {//不要重试了System.out.println("消息体:" + new String(messageExt.getBody()));System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理");} else {//重试System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}private void handleDb() {int i = 10 / 0;
}
http://www.lryc.cn/news/91237.html

相关文章:

  • Mysql DDL执行方式-pt-osc介绍 | 京东云技术团队
  • C++ stack容器介绍
  • 在 Git 中撤消更改的 6 种方法!
  • LiveGBS国标GB/T28181国标平台功能-电子地图移动位置订阅mobileposition地图定位GPS轨迹坐标位置获取redis获取位置
  • 编程(38)----------计算机的部分原理
  • 若依框架快速搭建(二)
  • 为建筑物的供暖系统实施MPC控制器的小型项目(Matlab代码实现)
  • 【概率论】中心极限定理(二)
  • Blender UV展开流程
  • Flutter 笔记 | Flutter 核心原理(二)关键类和启动流程
  • Android:主题切换
  • terminalworks ASP.NET Core PDF 浏览器-Crack
  • Rust每日一练(Leetday0020) 最后单词的长度、螺旋矩阵II、排列序列
  • 短视频矩阵源码如何做应用编程?
  • 【运维知识进阶篇】Ansible实现一套完整LNMP架构
  • Spring Boot 自动配置一篇概览
  • 深入理解设计原则之接口隔离原则(ISP)【软件架构设计】
  • IMX6ULL裸机篇之I2C实验主控代码说明二
  • 【计算机组成原理与体系结构】数据的表示与运算
  • 如何入门编程
  • SQL中CONVERT转化日期函数的使用方法
  • SpringBoot2-核心技术(一)
  • mac host学习
  • Java之~指定String日期时间,5分钟一截取时间
  • 【chatGPT4结对编程】chatGPT4教我做图像分类
  • Different romantic
  • learn C++ NO.7——C/C++内存管理
  • SDUT数据库原理——第十章作业(参考答案)
  • My Note of Diffusion Models
  • 【P37】JMeter 仅一次控制器(Once Only Controller)