当前位置: 首页 > news >正文

如何处理RocketMQ的各种线上问题

目录

消息积压

死信

如何追踪消息

如何搜索消息

范围查询

精确查找

消息重放(消息回溯)

消费者的齐幂性(防止重复消费)


消息积压

关于如何处理消息挤压,作者之前有一篇文章,专门聊过,详情异步:

【消息中间件】详解mq消息积压-CSDN博客

死信

当Broker向Consumer默认推送16次(可配置)消息无响应之后,Broker就会将消息发给死信队列。命名格式:%DLQ%<ConsumerGroup>

死信消息不会自动删除,需人工处理,处理方式也很简单就是将死信队列中的消息读出来发给原队列即可。

如何追踪消息

RcoektMQ支持消息轨迹的功能,通过配置可以开启消息轨迹功能,可以看到消息由谁生产、被谁消费等等消费过程的信息。

消息轨迹依赖于RMQ_SYS_TRACE_TOPIC这个队列,所以要确保这个队列的存在,这个队列是默认存在的要是不存在手动创建一下:

# 使用 mqadmin 命令创建 Topic
./mqadmin updateTopic -n <nameserver_address> -c <cluster_name> -t RMQ_SYS_TRACE_TOPIC -r 16 -w 16 -p 6 -o false

broker、consumer、producer三方均要配置开启消息轨迹。

broker.conf:

traceTopicEnable=true

consumer:

rocketmq.consumer.enable-msg-trace=true
rocketmq.consumer.customized-trace-topic=RMQ_SYS_TRACE_TOPIC

producer:

rocketmq.producer.enable-msg-trace=true
rocketmq.producer.customized-trace-topic=RMQ_SYS_TRACE_TOPIC

如何搜索消息

范围查询

#offset范围查询
./mqadmin queryMsgByOffset -n <nameserver地址> \-t <Topic名称> \-b <Broker名称> \-i <QueueId> \-o <起始Offset> \-c <查询条数>
​
# 示例(查询队列0最近100条消息)
./mqadmin queryMsgByOffset -n 127.0.0.1:9876 -t OrderTopic -b broker-a -i 0 -o 0 -c 100
# 时间范围查询
./mqadmin queryMsgByTime -n <nameserver地址> \-t <Topic名称> \-b <Broker名称> \-s <开始时间戳> \-e <结束时间戳> \-o <输出文件路径>
​
# 示例(查询2023-11-10 00:00:00至2023-11-10 23:59:59的消息)
./mqadmin queryMsgByTime -n 127.0.0.1:9876 \-t OrderTopic \-b broker-a \-s 1699570800000 \-e 1699657199000 \-o /tmp/messages.txt

精确查找

RocketMQ为每条消息提供了两个维度的唯一标识:

  • messageID

  • messageKey

messageID是RocketMQ内部的对消息的唯一标识,messageKey是在业务层面自定义的消息的标识,一般会是业务的唯一标识,如订单号、交易流水号等。messageKey之所以存在,在设计层面上就是考虑到messageID是疯转在RocketMQ内部的东西,有些时候不知道messageID,但是想去精确查找到某条消息,所以支持在业务维度给message一个唯一标识。

#使用messageID查询
./mqadmin queryMsgById -n <nameserver地址> -i <Message ID>
​
# 示例
./mqadmin queryMsgById -n 127.0.0.1:9876 -i "0A00000100002A9F0000000000000354"
# 输出示例
Message ID: 0A00000100002A9F0000000000000354
Topic: OrderTopic
Tags: TagA
Keys: ORDER_202311100001
StoreTime: 2023-11-10 14:30:00
Body: {"orderId":"ORDER_202311100001","amount":99.99}
# 查询指定Key的消息
./mqadmin queryMsgByKey -n <nameserver地址> -t <Topic名称> -k <Key值>
​
# 示例(查找订单号为ORDER_202311100001的消息)
./mqadmin queryMsgByKey -n 127.0.0.1:9876 -t OrderTopic -k ORDER_202311100001
​
# 输出示例
Total: 2 messages
Message ID: 0A00000100002A9F0000000000000354 | StoreTime: 2023-11-10 14:30:00 | Tags: PAYMENT
Message ID: 0A00000100002A9F0000000000000355 | StoreTime: 2023-11-10 14:35:00 | Tags: REFUND

消息重放(消息回溯)

有时候丢了一条消息,比如丢了一条订单,该如何重新消费指定的某条消息?

首先去通过精确查找(通过messageID或者messagekey)找到message,看一下消息在RocketMQ中是否存在,要是存在才能重新消费,要是不存在要重新生产一条message发给rocketmq。

# 根据订单号查询消息(假设订单号存储在Message Key中)
./mqadmin queryMsgByKey -n 127.0.0.1:9876 \-k ORDER_202311090001 \-t ORDER_TOPIC
​
# 输出示例
Message ID: 0A00000100002A9F0000000000000354
QueueId: 3
StoreTime: 2023-11-09 14:30:00
Body: {"orderId":"ORDER_202311090001","amount":99.99,...}
# 按消息存储时间重置(精确到秒)
./mqadmin resetOffsetByTime -n 127.0.0.1:9876 \-g ORDER_CONSUMER_GROUP \-t ORDER_TOPIC \-s "2023-11-09#14:29:59:000" \  # 比实际存储时间早1秒-f true

然后写代码来重新消费,在代码里判断只有key是ORDER_202311090001的才消费其余不消费即可。

消费者的齐幂性(防止重复消费)

齐幂性:在某些不能要求重复数据出现的接口,重复的数据请求,多次和单次的效果应该一样。

当消费者明明在消费一条消息的时候,由于操作时延较大一直没消费完,从而给出broker消费成功的ACK,broker接着发了一条一模一样的重试消息过来,造成重复消费。

用一个redis集合之类的来存已经消费了或者正在消费的消息的ID,这样就能避免重复消费。

http://www.lryc.cn/news/573663.html

相关文章:

  • 【Python学习笔记】报错:Unindent amount does not match previous indent
  • Spring Boot 项目初始化
  • AWS 使用图形化界面创建 EKS 集群(零基础教程)
  • LabVIEW图像拼接原理与实现 链接附件有演示录像
  • 如何用AI开发完整的小程序<9>—UI自适应与游戏页优化
  • 关于uniapp解析SSE响应数据的处理
  • 【学习笔记】深入理解Java虚拟机学习笔记——第11章 后端编译与优化
  • 关于CH32开发板烧录说明
  • 用可观测工具高效定位和查找设计中深度隐藏的bug
  • webpack+vite前端构建工具 -6从loader本质看各种语言处理 7webpack处理html
  • Linux内核中安全创建套接字:为何inet_create未导出及正确替代方案
  • SAP金属行业解决方案:无锡哲讯科技助力企业数字化转型与高效运营
  • Kafka Streams架构深度解析:从并行处理到容错机制的全链路实践
  • 针对数据仓库方向的大数据算法工程师面试经验总结
  • netcore url编码/解码
  • [计算机网络] 局域网内的网络传输
  • SpringBoot+Vue服装商城系统 附带详细运行指导视频
  • 3dgs涉及的基本概念:球谐系数(SH 系数)等
  • Python之数据容器
  • 【JavaScript】代码整理
  • vim学习流程,以及快捷键总结
  • Python 深度学习基础:TensorFlow 入门——从张量到神经网络的实战指南
  • 【2025年软考中级】第三章数据结构3.4 数组与矩阵
  • Flink作业三种部署模式:架构、配置与实战应用
  • rknn优化教程(三)
  • Bytemd@Bytemd/react详解(编辑器实现基础AST、插件、跨框架)
  • 【云原生】Docker 部署 Elasticsearch 9 操作详解
  • Git Worktree:高效开发的秘密武器
  • C# 数组(数组协变和数组继承的有用成员)
  • webpack+vite前端构建工具 - 8 代码分割