文章目录
- 1.消息丢失
- 2.消息重复消费(幂等)
- 3.消息积压
- 4.顺序消息
- 5.分布式事务(数据一致性)
- 6.消息延迟&毛刺
- 7.平滑扩容和缩容
- 8.高可用&异地多活
- 9.消息体积过大
- 10.版本兼容 & 协议升级
- 10.总结
1.消息丢失
触发点 | RocketMQ 防护手段 |
---|
生产者宕机未收到 ACK | send(msg, timeout) 捕获异常并重试;开启 事务消息(Half Message + 本地事务表) |
Broker 刷盘前崩溃 | 配置 flushDiskType=ASYNC_FLUSH + 同步刷盘;开启 DLedger(Raft)多副本 |
消费者先 ACK 后业务失败 | 关闭 自动 ACK,改为 手动 ACK(ConsumeConcurrentlyStatus.RECONSUME_LATER ) |
2.消息重复消费(幂等)
- 根因:Broker 重投或 Consumer 重启。
- 方案:
- 幂等键:消息 Key + 业务唯一键 → 写入 Redis
SETNX
或 MySQL 唯一索引; - RocketMQ 自带去重:开启
enableIdempotent=true
(5.x 版本)
3.消息积压
工具 | 用法 |
---|
RocketMQ Dashboard | 查看 消费 TPS 、消息堆积量 |
快速扩容 | 增加 消费组实例数 + consumeThreadMax 调大; |
批量拉取 | 设置 pullBatchSize=1000 + consumeMessageBatchMaxSize=32 |
降级 | 非核心消息直接投递 延迟队列,或转 死信队列 |
4.顺序消息
- 场景:订单状态机、库存扣减。
- 方案:
- 顺序消息:同一业务 Key → 同一个 MessageQueue;
- 失败重试:顺序消费失败后进入 重试队列,仍保证顺序重放。
5.分布式事务(数据一致性)
TransactionSendResult result =producer.sendMessageInTransaction(msg, localTransactionExecuter, arg);
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {return LocalTransactionState.COMMIT_MESSAGE;
}
public LocalTransactionState checkLocalTransaction(MessageExt msg) {return checkBizStatus(msg) ? COMMIT_MESSAGE : ROLLBACK_MESSAGE;
}
6.消息延迟&毛刺
概念 | 通俗解释 | 产生原因 | 典型症状 |
---|
消息延迟 (Latency) | 消息从发送到被消费者成功处理所耗时间 远大于预期。 | • Broker GC / CPU 飙高 | 监控图里“平均延迟”持续升高,业务体感“订单状态更新慢”。 |
毛刺 (Spike / Jitter) | 延迟曲线出现瞬间尖峰(毫秒级→秒级→毫秒级)。 | • 瞬时流量洪峰 | |
6.1.消息延迟
现象 | 典型根因 | RocketMQ 解法 & 配置参数 |
---|
端到端耗时 > 业务 SLA | 1. 生产者同步刷盘等待 | • Broker:flushDiskType=ASYNC_FLUSH + transientStorePoolEnable=true |
跨城链路延迟 | 网络 RTT 高 | • 就近部署:Producer & Consumer 与 Broker 同可用区 |
大促洪峰 | 生产者 TPS >> 消费者 TPS | • 消费者水平扩容:动态增加实例(K8s HPA) |
6.2.毛刺
现象 | 典型根因 | RocketMQ 解法 & 配置参数 |
---|
99.9 线瞬间飙高 | 1. 单条超大消息阻塞网络 | • 消息拆分:>1MB 走 OSS,只传 URL |
Broker CPU 瞬时 100% | 热点 Key 打在同一队列 | • Key Hash 打散:自定义 MessageQueueSelector |
磁盘抖动 | Page Cache 回收 | • 预留内存:osPageCacheBusyTimeOutMillis=1000 |
7.平滑扩容和缩容
- Broker 无状态:新增 Broker 后,Topic 路由元数据自动同步;
- 队列重平衡:Consumer 端使用 AllocateMessageQueueAveragely 算法,秒级感知变化。
8.高可用&异地多活
- DLedger(Raft):Broker 组自动选主,支持 同城双中心 / 异地三中心;
- NameServer 多节点:无状态,秒级故障切换。
9.消息体积过大
- 拆分:大文件放 OSS,MQ 只传 URL;
- 压缩:
compressType=LZ4
,带宽 ↓50%。
10.版本兼容 & 协议升级
- RocketMQ 5.x 支持 gRPC 协议,老客户端通过 Proxy 平滑升级;
- Schema Registry:支持 Avro/Protobuf 版本兼容。
10.总结
问题 | RocketMQ 解法 | 关键配置 |
---|
丢失 | 事务消息 + DLedger | flushDiskType, DLedger |
重复 | 幂等键 + enableIdempotent | messageKey, 唯一索引 |
积压 | 扩容 + 批量消费 | pullBatchSize, consumeThreadMax |
顺序 | 顺序队列 + 重试队列 | MessageQueueSelector |
事务 | Half Message + 本地事务表 | sendMessageInTransaction |
延迟 | Dashboard + 重试参数 | maxReconsumeTimes |