当前位置: 首页 > news >正文

Kafka-副本分配策略

一、上下文

《Kafka-创建topic源码》我们大致分析了topic创建的流程,为了保持它的完整性和清晰度。细节并没有展开分析。下面我们就来分析下副本的分配策略以及副本中的leader角色的确定逻辑。当有了副本分配策略,才会得到分区对应的broker,才可以在topic目录下写入对应的数据。Controller端才可以让这些分区和副本上线去提供服务。副本的leader角色确定后才能使producer生产的数据知道第一个写入的broker节点是哪个?以及follower的同步工作。

二、目的

1、将副本均匀地分布在broker之间

2、对于分配给特定broker的分区,其其他副本分布在其他broker上

3、如果所有broker都有机架信息,请尽可能将每个分区的副本分配给不同的机架

如果不考虑机架的情况下也要实现副本分配的目标,我们的做法是这样的

1、从broker列表中的随机位置开始,通过循环分配每个分区的第一个副本

2、以递增的移位分配每个分区的剩余副本

三、示例

1、场景描述

假如一个topic有6个分区(0, 1, 2, 3, 4, 5)且副本因子为3,对应的集群情况如下图:

每个机架对应的brokerid如下:

机架brokerId列表
rack10,5
rack23,4
rack31,2

获取机架交错的broker列表:(0, 3, 1, 5, 4, 2)

有了这个列表就课可以以简单的循环方式将副本分配给broker,确保每个broker上的leader和follower数量均匀分布,并将副本分配到所有机架。

2、分配结果

分区副本所在的brokerId列表
00,3,1
13,1,5
21,5,4
35,4,2
44,2,0
52,0,3

机架感知分配总是使用机架交替broker列表上的轮询来选择分区的第一个副本。对于其余的副本,它将偏向于机架上没有任何副本分配的broker,直到每个机架都有一个副本。然后,任务将回到broker 列表上的循环。

因此,如果副本的数量 >= 机架的数量,它将确保每个机架至少获得一个副本。否则,每个机架最多只能获得一个副本。在副本数量与机架数量相同并且每个机架具有相同数量的代理的完美情况下,它保证了副本在broker和机架之间的分布是均匀的。

此时如果再增加一个分区(6分区)呢?按照规律分配的副本所在的broker列表应该是0,3,1。但如果这样就违背了目的3。一旦它完成了第一轮循环,如果有更多的分区要分配,算法将开始转移follower 。这是为了确保我们不会总是得到相同的序列集。因此分区6可能分配到的副本broker列表为0,4,2。

四、源码

    //fixedStartIndex =-1 , startPartitionId = -1private static Map<Integer, List<Integer>> assignReplicasToBrokersRackAware(int nPartitions,int replicationFactor,Collection<BrokerMetadata> brokerMetadatas,int fixedStartIndex,int startPartitionId) {//存放broker和机架的对应关系Map<Integer, String> brokerRackMap = new HashMap<>();//获取broker和机架的对应关系brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.get()));//机架数量int numRacks = new HashSet<>(brokerRackMap.values()).size();//获取交替机架的broker列表List<Integer> arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap);//broker 数量int numBrokers = arrangedBrokerList.size();Map<Integer, List<Integer>> ret = new HashMap<>();// fixedStartIndex 的初始值是 -1 ,因此 startIndex = 一个 0 至 broker 数量 之间的随机整数int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(arrangedBrokerList.size());// startPartitionId 的初始值是 -1 ,因此 currentPartitionId = 0int currentPartitionId = Math.max(0, startPartitionId);//下一个要分配的副本 ,第一次应该是 一个 0 至 broker 数量 之间的随机整数int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(arrangedBrokerList.size());//循环分区列表,对每个分区进行副本分配for (int i = 0; i < nPartitions; i++) {if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size() == 0))nextReplicaShift += 1;//第一个副本索引int firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size();//默认将第一个副本 作为leaderint leader = arrangedBrokerList.get(firstReplicaIndex);List<Integer> replicaBuffer = new ArrayList<>();replicaBuffer.add(leader);Set<String> racksWithReplicas = new HashSet<>();racksWithReplicas.add(brokerRackMap.get(leader));Set<Integer> brokersWithReplicas = new HashSet<>();brokersWithReplicas.add(leader);//根据副本因子,进行副本的分配,因为有了leader,因此只用循环处理 replicationFactor - 1 的甚于副本分配int k = 0;for (int j = 0; j < replicationFactor - 1; j++) {boolean done = false;while (!done) {Integer broker = arrangedBrokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size()));String rack = brokerRackMap.get(broker);// 跳过这个broker ,如果满足以下2个条件中的1个//1、同一机架中已经有一个broker分配了副本,并且有一个或多个机架没有任何副本,或者//2、broker已经分配了副本,但有一个或多个broker没有分配副本if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size() == numRacks)&& (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size() == numBrokers)) {replicaBuffer.add(broker);racksWithReplicas.add(rack);brokersWithReplicas.add(broker);done = true;}k += 1;}}//返回分区对应的副本的broker列表ret.put(currentPartitionId, replicaBuffer);currentPartitionId += 1;}return ret;}
http://www.lryc.cn/news/489727.html

相关文章:

  • 市场波动不断,如何自我提高交易心理韧性?
  • 加速科技精彩亮相中国国际半导体博览会IC China 2024
  • 利用c语言详细介绍下选择排序
  • 华为流程L1-L6业务流程深度细化到可执行
  • bridge-multicast-igmpsnooping
  • git使用(一)
  • Linux环境安装MongoDB
  • Cyberchef使用功能之-多种压缩/解压缩操作对比
  • TypeScript 装饰器都有那些应用场景?如何更快的上手?
  • 堆优化版本的Prim
  • Ubuntu上安装MySQL并且实现远程登录
  • 蓝桥杯每日真题 - 第21天
  • (长期更新)《零基础入门 ArcGIS(ArcMap) 》实验一(下)----空间数据的编辑与处理(超超超详细!!!)
  • NLP论文速读(CVPR 2024)|使用DPO进行diffusion模型对齐
  • 操作系统——揭开盖子
  • 如何在 React 项目中应用 TypeScript?应该注意那些点?结合实际项目示例及代码进行讲解!
  • C++学习第四天
  • 【从零开始的LeetCode-算法】3232. 判断是否可以赢得数字游戏
  • 一种简单高效的RTSP流在线检测方法,不需要再过渡拉流就可以获取设备状态以及对应音视频通道与编码格式
  • 24/11/22 项目拆解 艺术风格转移
  • 数字赋能,气象引领 | 气象景观数字化服务平台重塑京城旅游生态
  • 关于Redux的学习(包括Redux-toolkit中间件)
  • 【无人机】
  • Zabbix7.0.6的容器镜像准备
  • 利用 GitHub 和 Hexo 搭建个人博客【保姆教程】
  • React第四节 组件的三大属性之state
  • MongoDB进阶篇-索引(索引概述、索引的类型、索引相关操作、索引的使用)
  • 使用FFmpeg实现视频与GIF的画中画效果
  • 车载信息安全框架 --- 车载信息安全相关事宜
  • Unreal5从入门到精通之EnhancedInput增强输入系统详解