消息中间件(Kafka VS RocketMQ)
一、概要介绍
- Apache Kafka
- 分布式、多区、多副本、基于 ZooKeeper 协调的消息系统
- 核心组件:ZooKeeper、Broker、Producer、Consumer
- 特点:高吞吐(顺序写+零拷贝)、低延迟、易扩展
- Apache RocketMQ
- 分布式、轻量 NameServer路由、主从多副本、阿里巴巴开源的消息引擎
- 核心组件:NameServer、Broker、Producer、Consumer
- 特点:原生事务消息、定时/延迟投递、灵活顺序消费
二、架构与原理
1、Kafka
- 服务设计
- 目标:提供可水平扩展的分布式提交日志服务,实现高吞吐、低延迟、容错与持久化;
- 分层:
- 存储层:基于 CommitLog 的顺序写入,利用零拷贝提高磁盘 I/O 效率;
- 计算层:Producer/Consumer、Kafka Streams、Connect API,支持流处理与数据集成。
- 核心组件
- ZooKeeper:集群元数据管理、Leader 选举与配置存储;
- Broker:Kafka 服务器节点,负责接收、存储、分发消息;
- Producer:消息生产方,将数据发送至指定 Topic;
- Consumer:消息消费方,通过拉模型按 Offset 拉取数据。
- 架构要点
- Topic & Partition:Topic 逻辑分组,Partition 物理子分片,支持并行与负载均衡;
- Replication:每个 Partition 在多个 Broker 上拥有 Leader/Follower 副本,实现高可用;
- Controller:集群内唯一的 Controller Broker,负责分区分配与故障检测。
- 工作原理
- 生产流程:Producer 从 ZooKeeper 或 Controller 获取 Topic 路由信息 —> 选择 Partition —> 顺序写入 CommitLog —> 返回 Offset;
- 复制流程:Leader 接收写入请求 —> 同步写入本地 —> Follower 拉取并写入 —> 保持 ISR(In-Sync-Replicas);
- 消费流程:Consumer Group 拉取 Topic 路由 —> 各 Consumer 按分区拉取消息 —> 根据 Offset 进行位点提交与 Rebalance
2、RocketMQ
- 服务设计
- 目标:提供高可靠、低延迟、灵活多样的消息中间件,强调事务、顺序与延迟消息;
- 轻量化:去中心化 NameServer,无状态设计,简化元数据管理。
- 核心组件
- NameServer:服务发现与路由注册中心,无需持久化,节点间不通信,水平扩展简单
- Broker:消息存储与转发节点,分 Master/Slave 部署,顺序写 CommitLog,并维护 ConsumeQueue 索引;
- Producer:通过 Netty 连接 NameServer —> 获取路由 —> 发送消息至 Broker,可选择同步/异步/单向模式;
- Consumer:支持 Push/Pull 模型,可配置集群消费或广播消费,消费后向 Broker ACK,并可重试或进入私信队列。
- 架构要点
- Topic & Queue:Topic 逻辑分类,Queue 无力分片;多 Queue 支持并行消费与顺序消费;
- Replica & HA:Master/Slave 同步或异步复制,故障自动切换保证可用;
- 事务 & 定时:原生事务消息二阶段提交;内置定时/延迟投递机制,无需额外组件。
- 工作原理
- 注册与心跳:Broker 启动向所有 NameServer 注册并定期心跳;Producer/Consumer 启动向 NameServer 获取路由并心跳;
- 生产流程:Producer 获取 Topic-Queue 路由 —> 选择 Broker Master —> 顺序写入 CommitLog —> 更新 ConsumeQueue;
- 消费流程:Consumer 拉取路由 —> 对应 Queue 发起 Pull 请求或 Broker 主动 Push —> 处理消息后 ACK —> Broker 更新消费进度。
三、服务部署
在生产环境中,要让 Kafka 和 RocketMQ 都具备“高可用(Availability)”、“高并发(Througput)”和“高可靠(Reliability)”三大能力,就必须在架构设计、集群部署与参数调优三个层面做到精细化:
- 架构设计:多节点、多可用区冗余,去中心化或弱中心化元数据服务;
- 集群部署:合理分片/队列数、主从副本布局、心跳检测与自动切换;
- 参数调优:根据 HA/Throughput/Reliability 三大目标,选择不同维度的核心参数,并理解它们从“不可靠”到“绝对可靠”的逐级演进。
1、Kafka
- 架构与部署拓扑
- ZooKeeper 集群
- 节点数:建议 3-5 个(奇数),分别部署在不同机架或可用区
- 负责:元数据存储、Controller选举、Broker 配置同步
- Broker 集群
- 节点数:>=3个,每个 Broker 承载多个 Partition;
- Topic 分区:分区数建议 >= 消费实例数*2,以保证并行度
- 副本因子:replication.factor = 3(至少三副本)
- Controller:随机在 Broker 上选举产生,负责 Rebalance、Leader 选举等
- Producer / Consumer:部署于业务服务侧或专用消息网关,配置多 Broker 地址以备切换
- ZooKeeper 集群
- 高可用(Availability)
- 多副本与 ISR 机制
- 只让与 Leader 保持同步的副本(ISR)参与选举,保证切换时读写一致;
- 参数要点:
- min.insync.replicas=2:至少两个同步副本确认
- unclean.leader.election.enable=false:禁止滞后副本选为 Leader
- ZooKeeper 心跳调优
- tickTime(心跳间隔)、initLimit(Follower 同步超时心跳数)、SyncLimit(常规心跳丢失数)
- 调优原则:心跳间隔不宜过大、否则故障感知慢;也不能过小,网络抖动易误判
- Controller 冗余选举
- Broker 之间任意节点都可成为 Controller,故障后秒级选举
- 监控:Controller 变更事件告警
- 多副本与 ISR 机制
- 高并发(Throughput)
- Partition 并行:每个 Topic 分区数按消费实例数*2来规划,分区越多,写入/消费并行度越高
- 批量与压缩
- linger.ms=5ms:最多等待5ms批量发送
- batch.size=64KB:单个批次最大消息量
- compression.type=snappy:压缩减小网络与磁盘 I/O
- 网络 & I/O线程池分离
- num.network.threads=3:专门处理网络请求
- num.io.threads=8:专门处理磁盘读写
- 这样可防止网络波动影响磁盘 I/O,又可提高并发承载
- 高可靠(Reliblity)
- ACK 进阶演进
- acks=0:零等待、最高吞吐、最低延迟,但完全不可靠
- acks=1:Leader 确认后返回,延迟极地,但 Leader 宕机前同步未完成时丢失数据
- acks=all + min.insync.replicas=2:Leader + 至少一个 Follower 确认后返回,丢失概率极低
- 幂等保障:enable.idempotence=true:自动重试无限次、顺序写、Producer ID + 序列号去重,实现 Exactly-once
- 事务支持:配置 transactional.id,使用两阶段提交,Broker 维护事务状态并回查,保证业务与消息原子提交
- 日志落盘:
- log.flush.scheduler.interval.ms:定时将内存写入磁盘
- 可选同步刷盘,但一般生产环境依赖多副本确认即可,无需每条同步刷盘
- ACK 进阶演进
2、RocketMQ
- 架构与部署拓扑
- NameServer 集群
- 节点数:2-3个,无状态、只存内存路由信息
- 客户端(Producer/Consumer)配置多地址,失效自动切换
- Broker Master/Slave
- Master:写入并供 Consumer 拉取;Slave:只读且同步 Master
- 建议每个 Master 至少配置一个 Slave,跨机房部署灾备
- Producer / Consumer:部署于业务侧或独立网关,长连接 NameServer+Broker,定期心跳
- NameServer 集群
- 高可用(Availability)
- 无状态 NameServer
- 节点间不通信,重启后通过 Broker 心跳自动重建路由
- pollNameServerInterval=30000ms:客户端每 30s 拉取一次路由
- 主备切换
- Master 宕机后,30s内 Slave 自动升级为 Master
- heartbeatBrokerInterval = 30000ms:客户端向 Broker 心跳间隔,保证切换及时
- 请求阻塞控制:brokerSuspendMaxTimeMillis=15000ms:Broker 最长阻塞时间,防止单请求拖垮线程池
- 无状态 NameServer
- 高并发(Throughput)
- Queue 并行:每个 Topic 配置 queueNum=消费线程数*2,提升消费并发
- 批量 & 压缩
- compressMsgBodyOverHowmuch = 4096 bytes:超大消息自动压缩
- 异步发送:retryTimesWhenSendAysncFailed=3 重试
- 线程池配置
- sendMessageThreadPollNums=8:Broker 接收写入线程
- pullMessageThreadPollNums=8:Broker 响应消费线程
- consumeThreadMin/Max=20/64:Consumer 端并发消费动态伸缩
- 高可靠(Reliablity)
- 同步刷盘
- flushDiskType=SYNC_FLUSH:Master写入磁盘后才返回,确保零丢失
- 可选 ASYNC_FLUSH,牺牲少量可靠换取更高吞吐
- 重试与死信:maxReconsumeTims=16:消费失败后最大重试次数,超出转入死心队列
- 事务消息:使用 TransactionMQProducer,两阶段提交+Broker回查,保证跨系统事务一致性
- 同步刷盘
四、消息
1、顺序消息
- Kafka的顺序消息只能在单个分区内保证严格顺序,通过消息key或者自定义分区器将相关消息路由到同一个分区,再由消费者单线程按 Offset 顺序消费;
- RocketMQ除了类似的“单队列”顺序外,还通过 MessageQueueSelector 实现“分区顺序”(局部有序),并在 Broker 端对队列级别加锁,保证每个队列在同一时刻只有一个线程消费。
(1)Kafka
全局顺序(Global Order)
- 实现:将 topic 配置为只有一个 partition,并且只有1个消费者实例,即可在全局上保持顺序
- 性能折中:吞吐与并行度大幅下降,因为只有单分区、单线程在跑
分区内顺序(Partition Order)
- 实现原理:Kafka 只在单个 Partition 内保证顺序,所有写入该 Partition 的消息按 Offset 严格有序
- 消息路由:
- Key分区:ProducerRecord 构造时带 key,DefaultPartitioner 会把相同 key 的消息路由到同一分区
- 自定义分区器:实现 Partitioner 接口,覆盖 partition() 方法
// Producer
// 构造 ProducerRecord(topic, key, value) → 客户端调用 send(record)。
// DefaultPartitioner.partition(topic, keyBytes, ...) 内部做:
int numPartitions = partitionsFor(topic).size();
return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;// 自定义分区器 实现 org.apache.kafka.clients.producer.Partitioner 接口:
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {// 比如按用户ID取模return Math.abs(key.hashCode()) % cluster.partitionsForTopic(topic).size();}
}// 客户端配置:
partitioner.class=com.example.MyPartitioner// 这样所有 producer.send() 时都会走你的 partition() 逻辑,在发送前就决定好了分区。
// Producer 的路由逻辑在调用 send() 时就完成了,消费者不会再“路由”——它只会拉取自己所属的分区。// Comsuner
// 分区分配:Consumer Group 启动后,协调出每个实例各自负责哪些分区(由协调器 / rebalance 算法决定)。
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 这里 record.topic(), record.partition(), record.offset() 是固定的}consumer.commitSync();
}// 总结:Producer 决定“放到哪个分区”;Consumer 决定“从哪些分区拉”。中间没有二次路由。
(2)RocketMQ
全局顺序(Global Order)
- 实现:Topic 只能配置1个队列,1个消费者实例;所有消息都发送到同一队列,单线程消费
// 发送端:只定义一个队列
producer.send(message, new MessageQueueSelector() {public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {return mqs.get(0); // 始终选择第一个队列}
}, null);
// 消费端:使用顺序监听器
consumer.registerMessageListener(new MessageListenerOrderly() { ... });
分区顺序(Partition Order)
- 实现:根据业务的 Sharding Key 选队列,同一 Key 的消息走同一队列;消费者可多实例,每个实例独占若干队列。
producer.send(msg, (mqs, msg, key) -> {int index = key.hashCode() % mqs.size();return mqs.get(index);
}, orderId);
consumer.registerMessageListener(new MessageListenerOrderly() { ... });
队列级锁:
Broker 在消费每个队列前会加锁,保证同一时刻只有一个线程消费该队列,顺序不会被打乱。
总结:
- 全局顺序:topic 只配1个队列,并且消费者实例数=1;所有消息都跑到同一个队列;
- 分区顺序:topic配多个队列;Producer 根据 Sharding Key 选定队列;多个消费者实例,各自独占若干队列;
- 队列级锁
- Broker端:收到消费者订阅请求后,为每个 MessageQueue(队列)维护一把分布式锁;
- 加锁时机:当消费者实例调用 MessageListenerOnderly 顺序消费时,客户端会向 Broker 请求“对某个队列(MessageQueue)加锁”。加锁成功后,该实例再整个批次(ConsumeOrderlyBatchMaxSize)乃至单条消息逐个处理完毕并显示提交ACK之前,不会释放锁;
- 锁的生命周期:
- 获取锁 —> 开始 pull 并依次交给 listener 处理
- 处理完成所有拉到的消息后 —> 主动释放锁
- 如果在 consumeOrderlyTimeout 时间内未能完成处理并释放锁,Broker 会强制过期这把锁,然后其他实例才能继续抢锁消费
- Broker 再将队列分配给下一个有空闲线程的消费者实例
- 消费模式:一条一条拉取 vs 批量拉取
- 批量拉取:默认 RocketMQ 顺序消费每次 pull 一批消息(可配置),但客户端内部会将这批消息逐条调用 MessageListenerOrderLy
- 单条消费:你的 listener 也可以每次只处理一条,处理完再 ack。只要队列锁未释放,就会继续拉取下一条。
2、消息过滤
- RocketMQ 原生支持服务端过滤(Tag过滤、SQL92属性过滤)和消费端多虑,可以在Broker上就把不感兴趣的消息“丢掉”,降低网络与客户端压力;
- Kafka本身不支持Broker层的消息内容过滤,只能靠消费端过滤或借助 Kafka Streams/KSQL 等流处理层来实现,真正的筛选总是在消费者一侧完成。
(1)Kafka
消费端过滤
消费者在 poll() 后,根据消息内容自行判断;无用的消息也会进入消费端
Kafka Streams / KSQL 过滤(附加层)
如果需要在 Broker 与下游业务之间做过滤“类似”服务端效果,可部署 Kafka Streams 或 ksqlDB:
// Kafka Streams 示例
KStream<String, Order> orders = builder.stream("TopicTest");
KStream<String, Order> filtered = orders.filter((k, v) -> v.getStatus().equals("CREATED"));
filtered.to("TopicCreatedOrders");
将过滤结果写入新的 Topic,消费者再只订阅 TopicCreatedOrders。
缺点:需要额外部署流处理集群,消息经过二次写入才可消费。
(2)RocketMQ
Tag过滤(最轻量)
原理:给消息打上一个或多个Tag(标签),Broker 为每个队列维护 Tag 到消息位置的简单索引;
服务端:
// Producer 发送时指定 Tag
Message msg = new Message("TopicTest", "TagA", "OrderID001", body);
producer.send(msg);
Broker 在存储时,会同时在消费队列的索引中记录该消息的 Tag;当消费者订阅 TagA 时,只扫描索引取出带有 TagA 的消息。
消费端:
// Consumer 订阅时指定 Tag 过滤
consumer.subscribe("TopicTest", "TagA || TagB");
消费者只拉取包含 A 或 B 的消息,不会接收到其它 Tag 的数据。
SQL92 属性过滤(精细化)
原理:消息可附带任意用户属性(key-value),Broker在存储时把这些属性信息也写入 ConsumeQueue 的二级索引;订阅时可传入 SQL 表达式,Broker 会对索引做简单的条件匹配,只返回符合条件的消息。
生产端:
Message msg = new Message("TopicTest", // topic"TagA", // tagbody);
msg.putUserProperty("orderStatus", "CREATED");
producer.send(msg);
消费端:
// 服务端过滤
consumer.subscribe("TopicTest",MessageSelector.bySql("orderStatus = 'CREATED' AND price > 100"));
这样只有属性条件满足的消息才会被拉取到客户端,极大减少网络流量与客户端CPU消耗。
3、延迟消息
- RocketMQ 原生支持延迟消息,通过将消息先存入内部特殊的调度主题 SCHEDULE_TOPIC_XXXX,并由18个定时任务定期扫描、判断、转发到目标 Topic,来实现任意一级延迟;
- Kafka 核心并不内置延迟消息能力,仅能借助外部定时重投、Kafka Streams 定时器或延迟队列 Topic 等手段来模拟延迟发送。
(1)Kafka
Kafka API 中没有类似 RocketMQ 的“延迟等级”或“调度主题”机制;Producer 的 send()
方法只能指定目标 Topic、Partition、Key、Timestamp 等字段,无法内置延迟投递。
(2)RocketMQ
延迟等级与配置:
- 默认 18 个等级:从 1s、5s、... 到 2h;
- 可在 broker.conf 中通过 messageDelayLevel=... 自定义对应的延迟时间,并在 Broker 启动时加载到 DelayLevelTable
消息流转:
- 生产者发送带延迟级别的消息:
Message msg = new Message(topic, tags, body); // 发送时指定延迟级别 3(10s) SendResult res = producer.send(msg, 3);
- 存储到调度主题:Broker 不直接存入目标 Topic,而是写入内部主题 SCHEDULE_TOPIC_XXXX 对应的第3号队列(延迟级别队列)
- 定时调度:RocketMQ启动后,会为每个延迟等级启动一个定时任务(默认每100ms执行一次),执行:
for each delayLevel in DelayLevelTable:queueId = delayLevel.index - 1nextOffset = topicScheduleMessageService.fetchOffset(queueId)msgExt = scheduleMessageStore.getMessage(queueId, nextOffset)if msgExt.storeTimestamp + delayTime(delayLevel) <= now:// 时间到了,投递到原 TopicbrokerController.getMessageStore().putMessage(clone msg with real Topic)scheduleMessageStore.updateOffset(queueId, nextOffset + 1)
- 转发到真实Topic:调度任务将消息“透传”到原本的 Topic 和队列,消费者就像正常消费普通消息一样接收。
关键点:
- 调度主题不可见:客户端无法直接订阅 SCHEDULE_TOPIC_XXXX;
- 定时队列指针:每个等级维护偏移量,下次从上次结束处继续扫描;
- 并发调度:18个任务并行处理不同等级,互不影响;
- 可调整精度:通过修改定时任务间隔(20ms -> 100ms)与延迟级别时间,可在毫秒级到小时级自由配置。
4、事务消息
- RocketMQ 原生支持事务消息,通过“半消息+本地事务+回查”三步走,实现了本地事务与消息投递的原子性,保证“订单写入成功 <=> 消息投递成功”同步一致。
- Kafka 从0.11版本起也提供了事务API,支持 Producer 端在多个分区和多个 Topic 上的 Exactly-Once 语义,但其实现机制与 RcoketMQ 略有差异。
(1)Kafka
- 核心机制:事务协调器(Transaction Coordinator)
- 幂等生产者:开启 enable.idempotence=true 后,Kafka 客户端自动为每条消息分配序列号,Broker 去重;
- 事务 API:
- initTransactions() 初始化与协调器的连接;
- beginTransaction() 开启事务;
- 多次 send() 到同一个或多个 Topic/Partition;
- 成功阶段:commitTransaction() —> 协调器写入事务完成标记;
- 失败阶段:abortTransaction() —> 丢弃事务内消息;
- Exactly-Once:消费者需配置 isolation.level=read_committed,只读已提交事务消息。
- 实现细节与限制
- 事务协调器:部署在 Kafka Broker 上,每个事务由协调器管理;
- 事务范围:同一 Producer 实例内,多个 Topic/Patition;
- 幂等与事务耦合:Producer 自动在幂等基础上开启事务,确保无重复并原子提交;
- 性能考量:
- 事务内消息在提交前对消费者不可见,需额外的协调开销;
- 事务跨分区会触发协调器同步,增加延迟。
(2)RocketMQ
- 基本模型:二阶段提交
- Prepare(半消息):Producer调用 sendMessageInTransaction(msg, arg),Broker 接收并保存为“半消息”(不可见给消费者);
- 执行本地事务:Producer 在本地执行业务(如下单、扣库存),代码在 executeLoacalTransaction() 中完成,返回 COMMIT 或 ROLLBACK;
- Commit / Rollback:Producer 调用 endTransaction() 通知 Broker:
- COMMIT_MESSAGE —> Broker 将半消息变为可投递消息;
- ROLLBACK_MESSAGE —> Broker 删除半消息,不投递。
- 事务回查:若 Producer 在超时时间内未调用 endTransaction() (网络异常、进程崩溃等),Broker 会定期发起回查请求,Producer 的 checkLocalTransaction(MessageExt msg) 负责读本地状态(持久化的事务记录)并返回状态,最终保证“悬挂”事务决断。
- 实现细节与关键点
- 半消息存储:写入到内部主题 RMQ_SYS_TRANS_HALF_TOPIC,并带上唯一 transactionId、msg.getKeys() 或 userProprety,用于回查定位;
- 事务状态持久化:强烈建议将本地事务状态(如 orderId —> SUCCESS/FAILED)持久化到数据库,避免 JVM 崩溃后丢失状态;
- 幂等与安全:
- checkLoaclTransaction() 可能被多次或并发调用,务必保证幂等、安全;
- 回查返回 UNKNOW 可让 Broker 继续重试,以防业务系统暂时无法判断。
- 回查频率:Broker 后台线程定期(默认每30s)检查未决事务,直到收到明确状态;
- 性能影响:
- 半消息写两次磁盘:一次 Prepare,一次 Commit;
- 持久化事务记录与幂等回查的额外 DB 访问。
5、广播消息
- RocketMQ原生支持“广播消息”模式,只要将消费者的 MessageModel 设置为 BROADCASTING,就能让同一消费组内的所有实例都各自收到并消费每一条消息;
- Kafka 不支持同组内广播。
(1)Kafka
Kafka的 Consumer Group 设计目标就是“同组内消息分发到单实例”,不支持组内广播。
(2)RocketMQ
原理剖析
消费模式切换
- 默认是集群模式(CLUSTERING):同组内消息只会被一个实例消费;
- 广播模式(BROADCASTING):同组内每个实例都能消费每条消息。
- 消费位点存储
- 集群模式:位点(offset)由 Broker 统一维护;
- 广播模式:位点由客户端本地保存(通常是本地文件或内存,默认是 ${user.home}/rocketmq_offsets/broadcast_group/),每次拉取后会持久化
- 消息分发:Broker 在推送消息时,不检查该组中其他实例的消费状态,只要实例在线并订阅,就逐条下发;
- 故障与重启:
- 如果本地已有保存的 offset,消费者重启后会从该 offset 继续消费(而非从0开始)
- 如果本地尚无任何 offset(首次启动或数据目录被清空),将根据 consumeFromWhere 配置决定起点(如果没有显示的配置 consumeFromWhere,RMQ的广播消费者在首次启动时会使用默认策略
CONSUME_FROM_LAST_OFFSET,只从最新消息之后开始消费,不会重跑历史消息)
:// 默认为 CONSUME_FROM_LAST_OFFSET consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 可选 CONSUME_FROM_FIRST_OFFSET 或 CONSUME_FROM_TIMESTAMP
- 所以只要不删除本地 offset 文件,每次重启都能接着上次进度继续。
使用方式
// 1. 创建 Consumer 并设置为广播模式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
// 切换为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 订阅 Topic
consumer.subscribe("TopicBroadcast", "*");
// 注册消息监听
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {// 逐条处理System.out.printf("实例[%s] 收到消息: %s%n",InetAddress.getLocalHost().getHostName(),new String(msg.getBody(), StandardCharsets.UTF_8));}// 手动 Ackreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
- 手动 ACK:广播模式下,Broker 不等待 ACK,但客户端仍可通过返回 CONSUME_SUCCESS 或抛出异常来触发本地重试/死信机制;
- 重试与DLQ:当消费失败时,客户端可自行重试,或配置 maxReconsumeTimes 后转入死信队列,保证至少一次投递;
- 位点回溯:可在客户端启动时调用 seek() 或设置 consumeFromWhere 为 CONSUME_FROM_TIMESTAMP,回溯到指定时间点。
- 广播模式默认不会自动回溯N条,只在首次无位点时根据 consumeFromWhere 决定起点
- 若需“回溯N条”,手动读取并修改本地 offset;
- 对于集群模式,推荐使用 mqadmin resetOffsetByTime 或 updateConsumerOffset 工具按时间或按条数统一重置
确保消息不丢的最佳实践
- 冗余发送:在 Broker 端开启主从同步刷盘(SYNC_FLUSH),避免挂掉导致消息丢失;
- 定期 pull:消费者使用 pull 模式定期拉取,降低网络抖动带来的漏拉消费;
- 本地记录:消费端可维护“已消费消息ID”列表,配合去重,防止重复或漏消费;
- 重平衡回溯:在实例掉线后重启,主动拉取未消费消息,或回溯N条,确保每条都被消费过