当前位置: 首页 > news >正文

6.RocketMQ之消费索引文件ConsumeQueue

功能:作为CommitLog文件的索引文件。
在这里插入图片描述
本文着重分析为consumequeue/topic/queueId目录下的索引文件。

1.ConsumeQueueStore

public class ConsumeQueueStore {protected final ConcurrentMap<String>, ConcurrentMap<Integer>, ConsumeQueueInterface>> consumeQueueTable;public boolean load() {String storePathRootDir = this.messageStoreConfig.getStorePathRootDir();String storePathConsumeQueue = getStorePathConsumeQueue(storePathRootDir);boolean cqLoadResult = loadConsumeQueues(storePathConsumeQueue, CQType.SimpleCQ);String storePathBatchConsumeQueue = getStorePathBatchConsumeQueue(storePathRootDir);boolean bcqLoadResult = loadConsumeQueues(storePathBatchConsumeQueue, CQType.BatchCQ);return cqLoadResult && bcqLoadResult;}//Broker启动后加载本地的consumequeue文件private boolean loadConsumeQueues(String storePath, CQType cqType) {File dirLogic = new File(storePath);File[] fileTopicList = dirLogic.listFiles();if (fileTopicList != null) {for (File fileTopic : fileTopicList) {String topic = fileTopic.getName();File[] fileQueueIdList = fileTopic.listFiles();if (fileQueueIdList != null) {for (File fileQueueId : fileQueueIdList) {int queueId = Integer.parseInt(fileQueueId.getName());;queueTypeShouldBe(topic, cqType);//选择 ConsumeQueue or BatchConsumeQueue 本文以 ConsumeQueue 作为分析案例ConsumeQueueInterface logic = createConsumeQueueByType(cqType, topic, queueId, storePath);this.putConsumeQueue(topic, queueId, logic);if (!this.load(logic)) {return false;}}}}}return true;}private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueueInterface consumeQueue) {ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface> map = this.consumeQueueTable.get(topic);if (null == map) {map = new ConcurrentHashMap<>();map.put(queueId, consumeQueue);this.consumeQueueTable.put(topic, map);} else {map.put(queueId, consumeQueue);}}public boolean load(ConsumeQueueInterface consumeQueue) {// 通过 topic & queueId 从consumeQueueTable 获取到 对应的FileQueueLifeCycle 即ConsumeQueueFileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());return fileQueueLifeCycle.load();}
}

1.1.ConsumeQueue

public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {private final MappedFileQueue mappedFileQueue;@Overridepublic boolean load() {boolean result = this.mappedFileQueue.load();return result;}
}

1.2.MappedFileQueue

mappedFileQueue.load核心功能就是加载consumequeue/topic/queueId目录下的消费索引本地文件。区别CommitLog加载的是/commitlog目录下真正的用户数据。
ConsumeQueue & CommitLog 均持有属性类MappedFileQueue【mmap零拷贝之内存映射的磁盘文件】。

DefaultMessageStore#ReputMessageService

CommitLog & ConsumerQueue 目录下的所有问题在Broker端启动的时候默认都会加载到内存中建立与磁盘之间的映射关系。但是在CommitLog不断增加数据过程中,ConsumerQueue是如何确认每条消息的索引文件呢?

http://www.lryc.cn/news/132952.html

相关文章:

  • Appium-移动端自动测试框架,如何入门?
  • 复数混频器、零中频架构和高级算法开发
  • Web 拦截器-interceptor
  • Java进阶(4)——结合类加载JVM的过程理解创建对象的几种方式:new,反射Class,克隆clone(拷贝),序列化反序列化
  • 扩散模型实战(四):从零构建扩散模型
  • YOLOv5、YOLOv8改进:S2注意力机制
  • LeetCode 542. 01 Matrix【多源BFS】中等
  • 使用open cv进行角度测量
  • java 线程池实现多线程处理list数据
  • Centos安装Docker
  • Unity启动项目无反应的解决
  • 2.3 opensbi: riscv: opensbi源码解析
  • 点破ResNet残差网络的精髓
  • Ubuntu服务器service版本初始化
  • re学习(33)攻防世界-secret-galaxy-300(脑洞题)
  • Mybatis Plus中使用LambdaQueryWrapper进行分页以及模糊查询对比传统XML方式进行分页
  • vue中push和resolve的区别
  • 详解RFC 3550文档-1
  • Go 与 Rust
  • Android Studio实现读取本地相册文件并展示
  • python的全局解释锁(GIL)
  • 小程序swiper一个轮播显示一个半内容且实现无缝滚动
  • 【自然语言处理】关系抽取 —— SimpleRE 讲解
  • 【O2O领域】Axure外卖订餐骑手端APP原型图,外卖众包配送原型设计图
  • DataGridView keydown事件无法在C#中工作
  • 【ElasticSearch】一键安装ElasticSearch与Kibana以及解决遇到的问题
  • 电商数据采集和数据分析
  • react 11之 router6路由 (两种路由模式、两种路由跳转、两种传参与接收参数、嵌套路由,layout组件、路由懒加载)
  • Golang 基础语法问答
  • 冠达管理:哪里查中报预增?