如何处理RocketMQ的各种线上问题
目录
消息积压
死信
如何追踪消息
如何搜索消息
范围查询
精确查找
消息重放(消息回溯)
消费者的齐幂性(防止重复消费)
消息积压
关于如何处理消息挤压,作者之前有一篇文章,专门聊过,详情异步:
【消息中间件】详解mq消息积压-CSDN博客
死信
当Broker向Consumer默认推送16次(可配置)消息无响应之后,Broker就会将消息发给死信队列。命名格式:%DLQ%<ConsumerGroup>
死信消息不会自动删除,需人工处理,处理方式也很简单就是将死信队列中的消息读出来发给原队列即可。
如何追踪消息
RcoektMQ支持消息轨迹的功能,通过配置可以开启消息轨迹功能,可以看到消息由谁生产、被谁消费等等消费过程的信息。
消息轨迹依赖于RMQ_SYS_TRACE_TOPIC这个队列,所以要确保这个队列的存在,这个队列是默认存在的要是不存在手动创建一下:
# 使用 mqadmin 命令创建 Topic ./mqadmin updateTopic -n <nameserver_address> -c <cluster_name> -t RMQ_SYS_TRACE_TOPIC -r 16 -w 16 -p 6 -o false
broker、consumer、producer三方均要配置开启消息轨迹。
broker.conf:
traceTopicEnable=true
consumer:
rocketmq.consumer.enable-msg-trace=true rocketmq.consumer.customized-trace-topic=RMQ_SYS_TRACE_TOPIC
producer:
rocketmq.producer.enable-msg-trace=true rocketmq.producer.customized-trace-topic=RMQ_SYS_TRACE_TOPIC
如何搜索消息
范围查询
#offset范围查询 ./mqadmin queryMsgByOffset -n <nameserver地址> \-t <Topic名称> \-b <Broker名称> \-i <QueueId> \-o <起始Offset> \-c <查询条数> # 示例(查询队列0最近100条消息) ./mqadmin queryMsgByOffset -n 127.0.0.1:9876 -t OrderTopic -b broker-a -i 0 -o 0 -c 100# 时间范围查询 ./mqadmin queryMsgByTime -n <nameserver地址> \-t <Topic名称> \-b <Broker名称> \-s <开始时间戳> \-e <结束时间戳> \-o <输出文件路径> # 示例(查询2023-11-10 00:00:00至2023-11-10 23:59:59的消息) ./mqadmin queryMsgByTime -n 127.0.0.1:9876 \-t OrderTopic \-b broker-a \-s 1699570800000 \-e 1699657199000 \-o /tmp/messages.txt
精确查找
RocketMQ为每条消息提供了两个维度的唯一标识:
-
messageID
-
messageKey
messageID是RocketMQ内部的对消息的唯一标识,messageKey是在业务层面自定义的消息的标识,一般会是业务的唯一标识,如订单号、交易流水号等。messageKey之所以存在,在设计层面上就是考虑到messageID是疯转在RocketMQ内部的东西,有些时候不知道messageID,但是想去精确查找到某条消息,所以支持在业务维度给message一个唯一标识。
#使用messageID查询 ./mqadmin queryMsgById -n <nameserver地址> -i <Message ID> # 示例 ./mqadmin queryMsgById -n 127.0.0.1:9876 -i "0A00000100002A9F0000000000000354" # 输出示例 Message ID: 0A00000100002A9F0000000000000354 Topic: OrderTopic Tags: TagA Keys: ORDER_202311100001 StoreTime: 2023-11-10 14:30:00 Body: {"orderId":"ORDER_202311100001","amount":99.99}# 查询指定Key的消息 ./mqadmin queryMsgByKey -n <nameserver地址> -t <Topic名称> -k <Key值> # 示例(查找订单号为ORDER_202311100001的消息) ./mqadmin queryMsgByKey -n 127.0.0.1:9876 -t OrderTopic -k ORDER_202311100001 # 输出示例 Total: 2 messages Message ID: 0A00000100002A9F0000000000000354 | StoreTime: 2023-11-10 14:30:00 | Tags: PAYMENT Message ID: 0A00000100002A9F0000000000000355 | StoreTime: 2023-11-10 14:35:00 | Tags: REFUND
消息重放(消息回溯)
有时候丢了一条消息,比如丢了一条订单,该如何重新消费指定的某条消息?
首先去通过精确查找(通过messageID或者messagekey)找到message,看一下消息在RocketMQ中是否存在,要是存在才能重新消费,要是不存在要重新生产一条message发给rocketmq。
# 根据订单号查询消息(假设订单号存储在Message Key中) ./mqadmin queryMsgByKey -n 127.0.0.1:9876 \-k ORDER_202311090001 \-t ORDER_TOPIC # 输出示例 Message ID: 0A00000100002A9F0000000000000354 QueueId: 3 StoreTime: 2023-11-09 14:30:00 Body: {"orderId":"ORDER_202311090001","amount":99.99,...}# 按消息存储时间重置(精确到秒) ./mqadmin resetOffsetByTime -n 127.0.0.1:9876 \-g ORDER_CONSUMER_GROUP \-t ORDER_TOPIC \-s "2023-11-09#14:29:59:000" \ # 比实际存储时间早1秒-f true
然后写代码来重新消费,在代码里判断只有key是ORDER_202311090001的才消费其余不消费即可。
消费者的齐幂性(防止重复消费)
齐幂性:在某些不能要求重复数据出现的接口,重复的数据请求,多次和单次的效果应该一样。
当消费者明明在消费一条消息的时候,由于操作时延较大一直没消费完,从而给出broker消费成功的ACK,broker接着发了一条一模一样的重试消息过来,造成重复消费。
用一个redis集合之类的来存已经消费了或者正在消费的消息的ID,这样就能避免重复消费。