当前位置: 首页 > news >正文

6.RocketMQ之索引文件ConsumeQueue

在这里插入图片描述
本文着重分析为consumequeue/topic/queueId目录下的索引文件。

1.ConsumeQueueStore

public class ConsumeQueueStore {protected final ConcurrentMap<String>, ConcurrentMap<Integer>, ConsumeQueueInterface>> consumeQueueTable;public boolean load() {String storePathRootDir = this.messageStoreConfig.getStorePathRootDir();String storePathConsumeQueue = getStorePathConsumeQueue(storePathRootDir);boolean cqLoadResult = loadConsumeQueues(storePathConsumeQueue, CQType.SimpleCQ);String storePathBatchConsumeQueue = getStorePathBatchConsumeQueue(storePathRootDir);boolean bcqLoadResult = loadConsumeQueues(storePathBatchConsumeQueue, CQType.BatchCQ);return cqLoadResult && bcqLoadResult;}//Broker启动后加载本地的consumequeue文件private boolean loadConsumeQueues(String storePath, CQType cqType) {File dirLogic = new File(storePath);File[] fileTopicList = dirLogic.listFiles();if (fileTopicList != null) {for (File fileTopic : fileTopicList) {String topic = fileTopic.getName();File[] fileQueueIdList = fileTopic.listFiles();if (fileQueueIdList != null) {for (File fileQueueId : fileQueueIdList) {int queueId = Integer.parseInt(fileQueueId.getName());;queueTypeShouldBe(topic, cqType);//选择 ConsumeQueue or BatchConsumeQueue 本文以 ConsumeQueue 作为分析案例ConsumeQueueInterface logic = createConsumeQueueByType(cqType, topic, queueId, storePath);this.putConsumeQueue(topic, queueId, logic);if (!this.load(logic)) {return false;}}}}}return true;}private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueueInterface consumeQueue) {ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface> map = this.consumeQueueTable.get(topic);if (null == map) {map = new ConcurrentHashMap<>();map.put(queueId, consumeQueue);this.consumeQueueTable.put(topic, map);} else {map.put(queueId, consumeQueue);}}public boolean load(ConsumeQueueInterface consumeQueue) {// 通过 topic & queueId 从consumeQueueTable 获取到 对应的FileQueueLifeCycle 即ConsumeQueueFileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());return fileQueueLifeCycle.load();}
}

1.1.ConsumeQueue

public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {private final MappedFileQueue mappedFileQueue;@Overridepublic boolean load() {boolean result = this.mappedFileQueue.load();return result;}
}

1.2.MappedFileQueue

mappedFileQueue.load核心功能就是加载consumequeue/topic/queueId目录下的消费索引本地文件。区别CommitLog加载的是/commitlog目录下真正的用户数据。
ConsumeQueue & CommitLog 均持有属性类MappedFileQueue【mmap零拷贝之内存映射的磁盘文件】。

DefaultMessageStore#ReputMessageService

CommitLog & ConsumerQueue 目录下的所有问题在Broker端启动的时候默认都会加载到内存中建立与磁盘之间的映射关系。但是在CommitLog不断增加数据过程中,ConsumerQueue是如何确认每条消息的索引文件呢?

http://www.lryc.cn/news/129971.html

相关文章:

  • 【C++学习手札】一文带你认识C++虚继承​​
  • 神经网络基础-神经网络补充概念-63-残差网络
  • 【从0开始学架构笔记】01 基础架构
  • vue3+ts+vite使用el-breadcrumb实现面包屑组件,实现面包屑过渡动画
  • 【Java 动态数据统计图】动态数据统计思路案例(动态,排序,数组)四(116)
  • Chrome命令行开关
  • 元宇宙赛道加速破圈 和数软件抓住“元宇宙游戏”发展新风口
  • Vue的鼠标键盘事件
  • Bytebase 2.6.0 - ​支持通过 LDAP 配置 SSO,支持 RisingWave 数据库
  • C# 读取pcd、ply点云文件数据
  • LeetCode1387 将整数按权重排序
  • 正则表达式--Intellij IDEA常用的替换
  • 前端如何安全的渲染HTML字符串?
  • C++学习第十四天----for循环
  • 快速解决在进入浏览器时,明明连接了网络,但是显示你尚未连接,代理服务器可能有问题。
  • TypeScript入门指南
  • excel中定位条件,excel中有哪些数据类型、excel常见错误值、查找与替换
  • 19c_ogg搭建
  • 网络通信原理网络层TCP/IP协议(第四十三课)
  • yolov5封装进ros系统
  • Flowable 源码目录结构
  • 科大讯飞星火模型申请与chatgpt 3.5模型以及new bing的对比
  • 无涯教程-TensorFlow - 分布式计算
  • python+django+mysql项目实践五(信息搜索)
  • Python Opencv实践 - 图像透射变换
  • SpringBoot + Vue 微人事项目(第二天)
  • 【AIGC】 快速体验Stable Diffusion
  • Python入门【动态添加属性和方法、正则表达式概述、match函数的使用、常用匹配符、限定符 、限定符使用示例】(二十九)
  • 《Go 语言第一课》课程学习笔记(四)
  • 制定建立商务模式财务及企业管理信息系统的解决方案