【RocketMQ】RocketMq之ConsumeQueue深入研究
目录
一:RocketMq 整体文件存储介绍
二:ConsumeQueue 的文件结构
三:ConsumeQueue 写入和查询流程
一:RocketMq 整体文件存储介绍
存储⽂件主要分为三个部分:
CommitLog:存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成,每个⽂件固定⼤⼩1G。以第⼀条消 息的偏移量为⽂件名。
ConsumerQueue:存储消息在CommitLog的索引。⼀个MessageQueue⼀个⽂件,记录当前MessageQueue被哪些消费者组消费到了哪⼀条CommitLog。
IndexFile:为了消息查询提供了⼀种通过key或时间区间来查询消息的⽅法,这种通过IndexFile来查找消息的⽅法不影响发送与消费消息的主流程。
这篇文章主要介绍ConsumeQueue的研究,以rocketmq5.3.0版本作为研究。
二:ConsumeQueue 的文件结构
ConsumeQueue 的文件格式:
每个 ConsumeQueue 条目占用 20 字节,包含以下三个字段:
字段名 | 长度(字节) | 说明 |
---|---|---|
CommitLog Offset | 8 | 消息在 CommitLog 文件中的物理偏移量。 |
Message Length | 4 | 消息的长度。 |
Tag HashCode | 8 | 消息 Tag 的哈希值,用于快速查找具有相同 Tag 的消息。 |
这种固定长度的设计使得 ConsumeQueue 文件可以像数组一样随机访问,极大地提高了读取性能
三:ConsumeQueue 写入和查询流程
1. ConsumeQueue 写入流程图
+---------------------+
| 消息写入 CommitLog |
+---------------------+|v
+---------------------+
| 触发 Reput 操作 |
| - ReputMessageService |
+---------------------+|v
+---------------------+
| 获取 ConsumeQueue |
| - 根据 Topic 和 QueueId |
+---------------------+|v
+---------------------+
| 构建索引条目 |
| - CommitLog Offset |
| - Message Length |
| - Tag HashCode |
+---------------------+|v
+---------------------+
| 写入 ConsumeQueue 文件 |
| - 每个条目 20 字节 |
| - 文件大小 30 万条目 |
+---------------------+|v
+---------------------+
| 刷盘操作 |
| - 定期刷盘到磁盘 |
+---------------------+
写入流程
-
消息写入 CommitLog:
-
Broker 接收到消息后,将其顺序写入 CommitLog 文件
-
-
触发 Reput 操作:
-
Broker 中的
ReputMessageService
线程异步将 CommitLog 中的消息重新构建到 ConsumeQueue 文件
-
-
获取 ConsumeQueue:
-
根据消息的 Topic 和 QueueId,从
consumeQueueTable
中获取对应的 ConsumeQueue。如果不存在,则创建一个新的 ConsumeQueue
-
-
构建索引条目:
-
为每条消息构建一个索引条目,包含以下信息:
-
CommitLog Offset:消息在 CommitLog 中的物理偏移量。
-
Message Length:消息的长度。
-
Tag HashCode:消息 Tag 的哈希值
-
-
-
写入 ConsumeQueue 文件:
-
将索引条目写入 ConsumeQueue 文件。每个条目占用 20 字节,文件大小固定为 30 万个条目
-
-
刷盘操作:
-
定期将 ConsumeQueue 文件中的数据刷盘,确保数据持久化
-
consumequeue写入代码:org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo
2. ConsumeQueue 查询流程图
+---------------------+
| 消费者拉取消息 |
| - Topic, QueueId |
| - 起始逻辑偏移量 |
+---------------------+|v
+---------------------+
| 查询 ConsumeQueue |
| - 根据逻辑偏移量计算 |
| ConsumeQueue 文件位置 |
+---------------------+|v
+---------------------+
| 读取索引条目 |
| - 获取 CommitLog Offset |
| - 获取 Message Length |
| - 获取 Tag HashCode |
+---------------------+|v
+---------------------+
| 定位 CommitLog 文件 |
| - 根据物理偏移量计算 |
| CommitLog 文件位置 |
+---------------------+|v
+---------------------+
| 从 CommitLog 读取消息 |
| - 根据物理偏移量读取 |
| - 校验消息完整性 |
+---------------------+|v
+---------------------+
| 返回消息给消费者 |
+---------------------+
查询流程
-
消费者拉取消息:
-
消费者指定 Topic、QueueId 和起始逻辑偏移量,向 Broker 发起拉取消息请求
-
-
查询 ConsumeQueue:
-
Broker 根据逻辑偏移量计算 ConsumeQueue 文件的位置,读取对应的索引条目
-
-
读取索引条目:
-
从 ConsumeQueue 文件中读取索引条目,获取消息在 CommitLog 中的物理偏移量、消息长度和 Tag 哈希值
-
-
定位 CommitLog 文件:
-
根据物理偏移量计算 CommitLog 文件的位置,读取对应的消息内容
-
-
从 CommitLog 读取消息:
-
从 CommitLog 文件中读取消息内容,并校验消息的完整性
-
-
返回消息给消费者:
-
Broker 将读取到的消息内容返回给消费者
-
查询代码入口: org.apache.rocketmq.store.DefaultMessageStore#getMessage(java.lang.String, java.lang.String, int, long, int, int, org.apache.rocketmq.store.MessageFilter)