当前位置: 首页 > news >正文

Kafka生产者的粘性分区算法

分区算法分类

kafka在生产者投递消息时,会根据是否有key采取不用策略来获取分区。

存在key时会根据key计算一个hash值,然后采用hash%分区数的方式获取对应的分区。

而不存在key时采用随机算法选取分区,然后将所有的消息封装到这个batch上直到达到限定数量,然后才发送出去。

如下图,6条消息采用key可能分三次发送到三个不同的分区,需要3次网络请求。如果没有key将封住成一个批次发送。这样一次网路请求就可以发送多条消息,大大提高了效率。

源码分析

producer根据keyBytes是否有值采用不同的分区策略。有key的计算hash % numPartitions得到分区。

 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) {if (keyBytes == null) {return stickyPartitionCache.partition(topic, cluster);}// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}

 并且kafka在这里做了缓存,如果第一次获取到了粘性分区后面会缓存起来。 

 public int partition(String topic, Cluster cluster) {Integer part = indexCache.get(topic);if (part == null) {return nextPartition(topic, cluster, -1);}return part;}

没有key的采用stickyPartitionCache的策略,这里是分区算法的主要代码。获取所有的availablePartitions,然后如果availablePartitions大于1,获取一个随机数random,然后通过random % availablePartitions.size()的方式获取分区。

      List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() < 1) {Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());newPart = random % partitions.size();} else if (availablePartitions.size() == 1) {newPart = availablePartitions.get(0).partition();} else {while (newPart == null || newPart.equals(oldPart)) {int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());newPart = availablePartitions.get(random % availablePartitions.size()).partition();}}

abortForNewBatch表示需要发送到新的批次,然后调用onNewBatch获取新的分区。

      if (result.abortForNewBatch) {int prevPartition = partition;partitioner.onNewBatch(record.topic(), cluster, prevPartition);partition = partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);...public void onNewBatch(String topic, Cluster cluster, int prevPartition) {stickyPartitionCache.nextPartition(topic, cluster, prevPartition);}

在下一个批次发送时会检测是否和上一个分区相同,如果相同将会缓存一个新的分区。

        // Check that the current sticky partition for the topic is either not set or that the partition that // triggered the new batch matches the sticky partition that needs to be changed.if (oldPart == null || oldPart == prevPartition) {

总结

为了提升kafka发送消息的速率,在对消息顺序没有特殊的要求情况下,应该尽量避免设置消息的key,这样可以提交发送消息的吞吐量。

http://www.lryc.cn/news/31283.html

相关文章:

  • java基础篇
  • Java与Winform进行AES加解密数据传输的工具类与对应关系和示例
  • OpenAI模型的API调用与使用-测试(2)
  • 【LeetCode】剑指 Offer 22. 链表中倒数第k个节点 p136 -- Java Version
  • 经典卷积模型回顾7-轻量化模型MobileNet实现图像分类(matlab)
  • 程序员压力大?用 PyQt 做一个美*女GIF设置桌面,每天都有好心情
  • Shell命令——sed命令
  • C语言练习 | 初学者经典练习汇(2)
  • git分支
  • Java每天15道面试题 | redisII
  • 浏览器渲染原理
  • 华为OD机试题 - 查找单入口空闲区域(JavaScript)| 含思路
  • 制造型企业想要做好数字化改造,要注意以下几点!
  • 【蓝桥杯集训·每日一题】AcWing 1488. 最短距离
  • 比亚迪:全球最大电动汽车制造商的坎坷成长之路
  • Java开发 - Quartz初体验
  • 无头盔开发vr XR Device Simulator操作(更新)
  • 《C++代码分析》第二回:函数重载const char* ,char*,const char[],char[]汇编代码上的区别
  • 【学习笔记】深入理解JVM之垃圾回收机制
  • 49.在ROS中实现local planner(2)- 实现Purepersuit(纯跟踪)算法
  • Allegro如何设通孔Pin和Via的消盘操作指导
  • Android工厂模式
  • 神经网络硬件加速器-架构篇
  • Python raise用法(超级详细,看了无师自通)
  • 1.SpringSecurity快速入门
  • Graph Partition: Edge cut and Vertex cut
  • Javascript周学习小结(初识,变量,数据类型)
  • C语言-基础了解-10-C函数
  • 【LeetCode】剑指 Offer(16)
  • 第三十九章 linux-并发解决方法二(互斥锁mutex)