当前位置: 首页 > news >正文

Rocketmq消费消息时不丢失不重复

消息消费不丢失

手动ACK

在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。所以,在消费者的代码中,一定要在业务逻辑的最后一步return ConsumeConcurrentlyStatus.CONSUME_SUCCESS

spring boot中 消费消息确认

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "${carInInfo.topic}",topic = "${carInInfo.topic}", selectorExpression = "*",consumeMode = ConsumeMode.ORDERLY)
public class CarInParkSynThirdMQ implements RocketMQListener<AddCarInParkDTO> {/*** 请不要捕获异常信息,否则无法进行消息重新推送** @param addCarInParkDTO*/@Overridepublic void onMessage(AddCarInParkDTO addCarInParkDTO) {System.out.println("收到消息:" + JSON.toJSONString(addCarInParkDTO));}

指定consumeMode = ConsumeMode.ORDERLY,实现消息确认,我们看下源码:

DefaultRocketMQListenerContainer.java这个类,看下其中一个类实现

public class DefaultMessageListenerOrderly implements MessageListenerOrderly {@SuppressWarnings("unchecked")@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {log.debug("received msg: {}", messageExt);try {long now = System.currentTimeMillis();handleMessage(messageExt);long costTime = System.currentTimeMillis() - now;log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);} catch (Exception e) {log.warn("consume message failed. messageExt:{}", messageExt, e);context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}

只要没有异常出现,那么就会消费成功,有异常出现了就重新进行发送,那这个又是在哪里调用的呢?再看下这个private方法就明白了

 private void initRocketMQPushConsumer() throws MQClientException {......switch (consumeMode) {case ORDERLY:consumer.setMessageListener(new DefaultMessageListenerOrderly());break;case CONCURRENTLY:consumer.setMessageListener(new DefaultMessageListenerConcurrently());break;default:throw new IllegalArgumentException("Property 'consumeMode' was wrong.");}if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);}}

消息重试

对于普通的消息,当消费者消费消息失败后,可以通过设置返回状态达到消息重试的结果。

如何让消息进行重试

RocketMQ 提供消费重试的机制。在消息消费失败的时候,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。

当然,RocketMQ 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 16 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> , RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(String message) {System.out.println("Received message : "+ message);}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {// 设置最大重试次数consumer.setMaxReconsumeTimes(5);}
}

一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。

每次重试的间隔时间如下:

死信队列

当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。

死信队列的特征:

  • 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
  • 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
  • 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
  • 死信队列中的消息不会再被消费者正常消费。
  • 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。

通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。

消息幂等

在MQ系统中,消息幂等有三种实现语义:

at most once 最多一次:每条消息最多只会被消费一次

at least once 至少一次:每条消息至少会被消费一次

exactly once 刚刚好一次:每条消息都只会确定的消费一次

这三种语义都有他适用的业务场景。

at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。

at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。

而这个exactly once是MQ中最理想也是最难保证的一种语义。RocketMQ只能保证at least once,保证不了exactly once。

RocketMQ 消息重复的场景

发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

投递时消息重复消息

消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

负载均衡时消息重复

包括但不限于网络抖动、Broker 重启以及订阅方应用重启,当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

消息幂等解决方案

在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。

ocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。

但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。

比如我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存。

insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';

要实现消息的幂等,我们可能会采取这样的方案:

select * from t_order where order_no = 'order123'
if(order  != null) {return ;//消息重复,直接返回
}

这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。

还可以通过以下方式处理:

  • 使用数据库的行锁处理
  • 利用分布式锁处理不同服务间的并发。
  • 数据库对唯一值的入库字段设唯一索引。
http://www.lryc.cn/news/236949.html

相关文章:

  • RedisInsight——redis的桌面UI工具使用实践
  • JVM对象创建与内存分配
  • 央国企数字化转型难在哪?为什么要数字化转型?
  • 第7天:信息打点-资产泄漏amp;CMS识别amp;Git监控amp;SVNamp;DS_Storeamp;备份
  • 不可思议,红警居然开源了!
  • yolo系列模型训练数据集全流程制作方法(附数据增强代码)
  • 4、FFmpeg命令行操作7
  • 算法进阶——链表中环的入口节点
  • 无线WiFi安全渗透与攻防(N.1)WPA渗透-pyrit:batch-table加速attack_db模块加速_“attack_db”模块加速
  • YOLOV8部署Android Studio安卓平台NCNN
  • 【算法萌新闯力扣】:旋转字符串
  • 可逆矩阵的性质
  • HIT 模式识别 手写汉字分类 Python实现
  • GPT-4V-Act :一个多模态AI助手,能够像人类一样模拟通过鼠标和键盘进行网页浏览。
  • 剪辑视频怎么把说话声音转成文字?
  • maven打包插件配置模板
  • clusterProfiler包学习
  • 【Qt开发流程之】布局管理
  • 建筑可视化中的 3D 纹理
  • 9.docker镜像Tag为none的原因
  • HTML5学习系列之响应式图像
  • 基于数据库(MySQL)与缓存(Redis)实现分布式锁
  • 2023年A特种设备相关管理(锅炉压力容器压力管道)证模拟考试题库及A特种设备相关管理(锅炉压力容器压力管道)理论考试试题
  • 系统及其存储相关
  • 鸿蒙原生应用开发-折叠屏、平板设备服务卡片适配
  • android查漏补缺(8)Android广播不同种类介绍
  • 什么是美颜SDK?直播美颜SDK技术深度剖析
  • 红海营销时代,内容占位的出海品牌更有机会营销占位
  • 解决龙芯loongarch64服务器编译安装Python后yum命令无法使用的问题“no module named ‘dnf‘”
  • Leetcode2937. 使三个字符串相等