当前位置: 首页 > news >正文

Kafka【十四】生产者发送消息时的消息分区策略

【1】分区策略

Kafka中Topic是对数据逻辑上的分类,而Partition才是数据真正存储的物理位置。所以在生产数据时,如果只是指定Topic的名称,其实Kafka是不知道将数据发送到哪一个Broker节点的。我们可以在构建数据传递Topic参数的同时,也可以指定数据存储的分区编号。

在这里插入图片描述
指定分区传递数据是没有任何问题的。Kafka会进行基本简单的校验,比如是否为空,是否小于0之类的。但是你的分区是否存在就无法判断了,所以需要从Kafka中获取集群元数据信息,此时会因为长时间获取不到元数据信息而出现超时异常。所以如果不能确定分区编号范围的情况,不指定分区还是一个不错的选择。

如果不指定分区,Kafka会根据集群元数据中的主题分区来通过算法来计算分区编号并设定:

(1) 如果指定了分区,直接使用

(2) 如果指定了自己的分区器,通过分区器计算分区编号,如果有效,直接使用

(3) 如果指定了数据Key,且使用Key选择分区的场合,采用murmur2非加密散列算法(类似于hash)计算数据Key序列化后的值的散列值,然后对主题分区数量模运算取余,最后的结果就是分区编号
在这里插入图片描述

(4) 如果未指定数据Key,或不使用Key选择分区,那么Kafka会采用优化后的粘性分区策略进行分区选择:

  • 没有分区数据加载状态信息时,会从分区列表中随机选择一个分区。在这里插入图片描述
  • 如果存在分区数据加载状态信息时,
    • 根据分区数据队列加载状态,通过随机数获取一个权重值。
    • 根据这个权重值在队列加载状态中进行二分查找法,查找权重值的索引值。
    • 将这个索引值加1就是当前设定的分区。

增加数据后,会根据当前粘性分区中生产的数据量进行判断,是不是需要切换其他的分区。判断标准就是大于等于批次大小(16K)的2倍,或大于一个批次大小(16K)且需要切换。如果满足条件,下一条数据就会放置到其他分区。

【2】分区器

在某些场合中,指定的数据我们是需要根据自身的业务逻辑发往指定的分区的。所以需要自己定义分区编号规则,而不是采用Kafka自动设置。Kafka早期版本中提供了两个分区器,不过在当前kafka版本中已经不推荐使用了。
在这里插入图片描述

自定义分区器

首先我们需要创建一个类,然后实现Kafka提供的分区类接口Partitioner,接下来重写方法。这里我们只关注partition方法即可,因为此方法的返回结果就是需要的分区编号。

/*** TODO 自定义分区器实现步骤:*      1. 实现Partitioner接口*      2. 重写方法*         partition : 返回分区编号,从0开始*         close*         configure*/
public class KafkaPartitionerMock implements Partitioner {/*** 分区算法 - 根据业务自行定义即可* @param topic The topic name* @param key The key to partition on (or null if no key)* @param keyBytes The serialized key to partition on( or null if no key)* @param value The value to partition on or null* @param valueBytes The serialized value to partition on or null* @param cluster The current cluster metadata* @return 分区编号,从0开始*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

配置分区器

public class ProducerPartitionTest {public static void main(String[] args) {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 这里配置自定义分区器configMap.put( ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaPartitionerMock.class.getName());KafkaProducer<String, String> producer = null;try {producer = new KafkaProducer<>(configMap);for ( int i = 0; i < 1; i++ ) {ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);final Future<RecordMetadata> send = producer.send(record, new Callback() {public void onCompletion(RecordMetadata recordMetadata, Exception e) {if ( e != null ) {e.printStackTrace();} else {System.out.println("数据发送成功:" + record.key() + "," + record.value());}}});}} catch ( Exception e ) {e.printStackTrace();} finally {if ( producer != null ) {producer.close();}}}
}
http://www.lryc.cn/news/437043.html

相关文章:

  • SQL优化:执行计划详细分析
  • Android Studio -> Android Studio 获取release模式和debug模式的APK
  • 基于 SpringBoot 的实习管理系统
  • vmware workstation 17 linux版
  • Windows环境本地部署Oracle 19c及卸载实操手册
  • MapStruct介绍
  • 35天学习小结
  • 【iOS】UIViewController的生命周期
  • ELK在Linux服务器下使用docker快速部署(超详细)
  • unity导入半透明webm + AE合成半透明视频
  • 力扣: 四数相加II
  • 径向基函数神经网络RBFNN案例实操
  • Java-数据结构-二叉树-习题(一) (✪ω✪)
  • js 时间戳转日期格式
  • 基于人工智能的自动驾驶系统项目教学指南
  • [Linux#49][UDP] 2w字详解 | socketaddr | 常用API | 实操:实现简易Udp传输
  • 期权组合策略有什么风险?期权组合策略是什么?
  • 从Zotero6到Zotero7的数据迁移尝试?(有错勿喷,多多指教!)
  • 快速排序(分治思想)
  • JAVA相关知识
  • 详解TCP的三次握手
  • Java面试篇基础部分-Java创建线程详解
  • Ubuntu 20.04/22.04无法连接网络(网络图标丢失、找不到网卡)的解决方案
  • 《MDTv2- Masked Diffusion Transformer is a Strong Image Synthesizer》
  • 算法 - 二分查找
  • Python知识点:如何使用Python进行图像批处理
  • 数据结构实验1
  • 使用Postman+JMeter进行简单的接口测试
  • 基于 SpringBoot 的车辆充电桩管理系统
  • centos7.9安装clamav教程