当前位置: 首页 > news >正文

【RocketMQ】RocketMq之ConsumeQueue深入研究

目录

一:RocketMq 整体文件存储介绍

二:ConsumeQueue 的文件结构

三:ConsumeQueue 写入和查询流程


一:RocketMq 整体文件存储介绍

存储⽂件主要分为三个部分:

    CommitLog:存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成,每个⽂件固定⼤⼩1G。以第⼀条消 息的偏移量为⽂件名。
    ConsumerQueue:存储消息在CommitLog的索引。⼀个MessageQueue⼀个⽂件,记录当前MessageQueue被哪些消费者组消费到了哪⼀条CommitLog。
    IndexFile:为了消息查询提供了⼀种通过key或时间区间来查询消息的⽅法,这种通过IndexFile来查找消息的⽅法不影响发送与消费消息的主流程。

这篇文章主要介绍ConsumeQueue的研究,以rocketmq5.3.0版本作为研究。


二:ConsumeQueue 的文件结构

ConsumeQueue 的文件格式:

每个 ConsumeQueue 条目占用 20 字节,包含以下三个字段:

字段名长度(字节)说明
CommitLog Offset8消息在 CommitLog 文件中的物理偏移量。
Message Length4消息的长度。
Tag HashCode8消息 Tag 的哈希值,用于快速查找具有相同 Tag 的消息。

这种固定长度的设计使得 ConsumeQueue 文件可以像数组一样随机访问,极大地提高了读取性能


三:ConsumeQueue 写入和查询流程

1. ConsumeQueue 写入流程图

+---------------------+
| 消息写入 CommitLog   |
+---------------------+|v
+---------------------+
| 触发 Reput 操作       |
| - ReputMessageService |
+---------------------+|v
+---------------------+
| 获取 ConsumeQueue    |
| - 根据 Topic 和 QueueId  |
+---------------------+|v
+---------------------+
| 构建索引条目         |
| - CommitLog Offset   |
| - Message Length     |
| - Tag HashCode       |
+---------------------+|v
+---------------------+
| 写入 ConsumeQueue 文件 |
| - 每个条目 20 字节     |
| - 文件大小 30 万条目   |
+---------------------+|v
+---------------------+
| 刷盘操作             |
| - 定期刷盘到磁盘     |
+---------------------+

写入流程

  1. 消息写入 CommitLog

    • Broker 接收到消息后,将其顺序写入 CommitLog 文件

  • 触发 Reput 操作

    • Broker 中的 ReputMessageService 线程异步将 CommitLog 中的消息重新构建到 ConsumeQueue 文件

  • 获取 ConsumeQueue

    • 根据消息的 Topic 和 QueueId,从 consumeQueueTable 中获取对应的 ConsumeQueue。如果不存在,则创建一个新的 ConsumeQueue

  • 构建索引条目

    • 为每条消息构建一个索引条目,包含以下信息:

      • CommitLog Offset:消息在 CommitLog 中的物理偏移量。

      • Message Length:消息的长度。

      • Tag HashCode:消息 Tag 的哈希值

  • 写入 ConsumeQueue 文件

    • 将索引条目写入 ConsumeQueue 文件。每个条目占用 20 字节,文件大小固定为 30 万个条目

  • 刷盘操作

    • 定期将 ConsumeQueue 文件中的数据刷盘,确保数据持久化

consumequeue写入代码:org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo


2. ConsumeQueue 查询流程图

+---------------------+
| 消费者拉取消息       |
| - Topic, QueueId     |
| - 起始逻辑偏移量     |
+---------------------+|v
+---------------------+
| 查询 ConsumeQueue    |
| - 根据逻辑偏移量计算 |
|   ConsumeQueue 文件位置 |
+---------------------+|v
+---------------------+
| 读取索引条目         |
| - 获取 CommitLog Offset |
| - 获取 Message Length |
| - 获取 Tag HashCode   |
+---------------------+|v
+---------------------+
| 定位 CommitLog 文件   |
| - 根据物理偏移量计算  |
|   CommitLog 文件位置   |
+---------------------+|v
+---------------------+
| 从 CommitLog 读取消息 |
| - 根据物理偏移量读取  |
| - 校验消息完整性      |
+---------------------+|v
+---------------------+
| 返回消息给消费者     |
+---------------------+

查询流程

  1. 消费者拉取消息

    • 消费者指定 Topic、QueueId 和起始逻辑偏移量,向 Broker 发起拉取消息请求

  • 查询 ConsumeQueue

    • Broker 根据逻辑偏移量计算 ConsumeQueue 文件的位置,读取对应的索引条目

  • 读取索引条目

    • 从 ConsumeQueue 文件中读取索引条目,获取消息在 CommitLog 中的物理偏移量、消息长度和 Tag 哈希值

  • 定位 CommitLog 文件

    • 根据物理偏移量计算 CommitLog 文件的位置,读取对应的消息内容

  • 从 CommitLog 读取消息

    • 从 CommitLog 文件中读取消息内容,并校验消息的完整性

  • 返回消息给消费者

    • Broker 将读取到的消息内容返回给消费者

查询代码入口: org.apache.rocketmq.store.DefaultMessageStore#getMessage(java.lang.String, java.lang.String, int, long, int, int, org.apache.rocketmq.store.MessageFilter)

http://www.lryc.cn/news/533569.html

相关文章:

  • 如今物联网的快速发展对hmi的更新有哪些积极影响
  • linux 性能60秒分析
  • Redisson全面解析:从使用方法到工作原理的深度探索
  • neo4j-解决导入数据后出现:Database ‘xxxx‘ is unavailable. Run :sysinfo for more info.
  • 51单片机之引脚图(详解)
  • Hangfire.NET:.NET任务调度
  • 深入解析:React 事件处理的秘密与高效实践
  • 开源像素字体,可用于独立游戏开发
  • 【论文阅读】Comment on the Security of “VOSA“
  • 了解传输层TCP协议
  • flask实现用户名查重,重复的用户名阻止注册,以及如何优化
  • ASP.NET Core对JWT的封装
  • wordpressAI工具,已接入Deepseek 支持自动生成文章、生成图片、生成长尾关键词、前端AI窗口互动、批量采集等
  • Ollama部署 DeepSeek-R1:70B 模型的详细步骤
  • PAT乙级( 1009 说反话 1010 一元多项式求导)C语言版本超详细解析
  • 学习笔记十九:K8S生成pod过程
  • Qwen2-VL:增强视觉语言模型对世界任意分辨率的感知能力
  • 原神新版本角色牌上新 七圣召唤增添新玩法
  • Spring 中的 事务 隔离级别以及传播行为
  • 为多个GitHub账户配置SSH密钥
  • OSPF基础(3):区域划分
  • android studio无痛入门
  • 免费windows pdf编辑工具Epdf
  • CNN 卷积神经网络处理图片任务 | PyTorch 深度学习实战
  • LeetCode 128: 最长连续序列
  • 大语言模型需要的可观测性数据的关联方式
  • 【韩顺平linux】部分上课笔记整理
  • python调用pc的语音借口
  • 【Golang学习之旅】Golang 内存管理与 GC 机制详解
  • Kamailio 各个功能的共同点、不同点及应用场景