当前位置: 首页 > news >正文

【RocketMQ】消费失败重试与死信消息

🎯 导读:本文档详细介绍了RocketMQ中的重试机制与死信消息处理方法。对于生产者而言,文档提供了如何配置重试次数的具体示例;而对于消费者,它解释了默认情况下消息消费失败后的重试策略,并展示了如何通过代码自定义重试次数。当消息经过多次重试仍无法成功消费时,RocketMQ会将其标记为死信消息,并存入特定的死信队列中。文档还提供了处理死信队列的两种策略:一种是编写专门的消费者来处理这些消息,另一种是在达到一定重试次数后签收消息并通知人工干预。此外,还包括了关于死信消息生产和消费的基本示例代码。

文章目录

  • RocketMQ 重试机制
    • 生产者重试
    • 消费者重试
  • RocketMQ 死信消息
    • 消息生产者
    • 消息消费者
    • 死信消费者
    • 控制台显示

RocketMQ 重试机制

生产者重试

// 失败的情况重发3次(同步)
producer.setRetryTimesWhenSendFailed(3);
// 失败的情况重发3次(异步)
producer.setRetryTimesWhenSendAsyncFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);

【示例代码】

@Test
public void retryProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();// 如果发送失败要重试几次(同步),不设置默认值是2producer.setRetryTimesWhenSendFailed(3);// 如果发送失败要重试几次(异步)
//        producer.setRetryTimesWhenSendAsyncFailed(3);String key = UUID.randomUUID().toString();System.out.println(key);Message message = new Message("retryTopic", "vip1", key, "我是vip666的文章".getBytes());producer.send(message);System.out.println("发送成功");producer.shutdown();
}

消费者重试

如果消息消费失败,默认会重试16次,重试的时间间隔:10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

能否自定义重试次数?

可以,重试的次数一般设置为5

// 消费失败,重试几次
consumer.setMaxReconsumeTimes(5);

如果重试了16次(并发模式是16次,顺序模式下重试次数是 int 类型最大值) 都是失败的,怎么处理?

认为该消息是死信消息,将消息放在一个死信主题中去,名称:%DLQ%消费者组名,最后再实现一个消费者去消费死信消息,一般是发邮件发短信通知人工处理、做一些记录

在这里插入图片描述

死信队列只有一个队列

在这里插入图片描述

当消息处理失败的时候 该如何正确的处理?

方案一:处理死信队列,如果每个死信队列都写一个消费者,很麻烦

/*** 方案一* 死信队列消费者* @throws Exception*/
@Test
public void retryDeadConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("%DLQ%retry-consumer-group", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);System.out.println(new Date());System.out.println(new String(messageExt.getBody()));System.out.println("记录到特别的位置 文件 mysql 通知人工处理");// 业务报错了 返回null 返回 RECONSUME_LATER 都会重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

方案二:在实际生产过程中,一般重试3-5次,如果还没有消费成功,则可以把消息签收了,通知人工等处理

/*** 方案二* 重试次数较多,直接做日志记录、通知人工处理* @throws Exception*/
@Test
public void retryConsumer2() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);System.out.println(new Date());try {// 业务处理,模拟报错handleDb();} catch (Exception e) {// 重试int reconsumeTimes = messageExt.getReconsumeTimes();if (reconsumeTimes >= 3) {// 重试次数太大,不要重试了System.out.println("记录到特别的位置 文件 mysql 通知人工处理");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 业务报错了 返回null 返回 RECONSUME_LATER 都会重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}private void handleDb() {int i = 10 / 0;
}

RocketMQ 死信消息

当消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列

  • 当一条消息初次消费失败, RocketMQ 会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。
  • 如果产生了死信消息,对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。
    • 可以利用 RocketMQ Admin 工具或者 RocketMQ Dashboard 上查询到对应死信消息的信息。
    • 也可以监听死信队列,进行自己的业务上的逻辑,写日志、通知人工处理

消息生产者

@Test
public void testDeadMsgProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("dead-group");producer.setNamesrvAddr("localhost:9876");producer.start();Message message = new Message("dead-topic", "我是一个死信消息".getBytes());producer.send(message);producer.shutdown();
}

消息消费者

@Test
public void testDeadMsgConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("dead-topic", "*");// 设置最大消费重试次数 2 次consumer.setMaxReconsumeTimes(2);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println(msgs);// 测试消费失败return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});consumer.start();System.in.read();
}

死信消费者

注意权限问题

在这里插入图片描述

@Test
public void testDeadMq() throws  Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");consumer.setNamesrvAddr("localhost:9876");// 消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列// 队列名称 默认是 %DLQ% + 消费者组名consumer.subscribe("%DLQ%dead-group", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println(msgs);// 处理消息 签收了return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

控制台显示

在这里插入图片描述

在这里插入图片描述

http://www.lryc.cn/news/449595.html

相关文章:

  • 注册安全分析报告:闪送
  • SpringCloud入门
  • js替换css主题变量并切换iconfont文件
  • UI设计师面试整理-设计趋势和行业理解
  • Java零工市场小程序如何改变自由职业者生活
  • android11 自动授权访问sdcard
  • 优青博导团队/免费指导/数据分析//论文润色/组学技术服务 、表观组分析、互作组分析、遗传转化实验、生物医学
  • Mybatis 学习之 分页实现
  • Spring Boot文件上传
  • 基于Springboot+Vue的高校体育运动会比赛系统(含源码+数据库)
  • 【JavaEE】——内存可见性问题
  • YOLO训练参数设置解析
  • 基于OpenCV的实时年龄与性别识别(支持CPU和GPU)
  • 理解Js执行上下文
  • 微信小程序 蓝牙通讯
  • java后端项目技术记录
  • PostgreSQL数据库与PostGIS在Windows中的部署与运行
  • 高级算法设计与分析 学习笔记10 平摊分析
  • 从“纸面算力”到“好用算力”,超聚变打通AI+“最后一公里”
  • 【有啥问啥】具身智能(Embodied AI):人工智能的新前沿
  • 11-pg内核之锁管理器(六)死锁检测
  • Git 与标签管理
  • 【0334】Postgres内核之 auxiliary process(辅助进程)初始化 MyPgXact
  • 20.1 分析pull模型在k8s中的应用,对比push模型
  • Ubuntu 镜像替换为阿里云镜像:简化你的下载体验
  • The Sandbox 游戏制作教程第 6 章|如何使用装备制作出色的游戏 —— 避免环境危险
  • JavaScript中的输出方式
  • 力扣9.25
  • 从零开始之AI面试小程序
  • Html2OpenXml:HTML转化为OpenXml的.Net库,轻松实现Html转为Word。