
Kafka consumer group partition assignment strategies

1. Prerequisites

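The Kafka version used here is 2.6.2.
When consuming from Kafka we usually specify only a consumer group and do not choose how the topic partitions are distributed among the consumers inside that group. The strategy is configurable, however. Reading the source shows three built-in strategies (Kafka 2.4 and later also ship CooperativeStickyAssignor, which is not covered here):

  • RangeAssignor (default)
  • RoundRobinAssignor
  • StickyAssignor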

To specify the partition assignment strategy:

 props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
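As a slightly fuller illustration of the setting above, the sketch below builds a complete set of consumer properties with plain string keys. The broker address `localhost:9092`, the class name `AssignorConfigSketch` and the helper `consumerProps` are assumptions made for this example; the deserializers are chosen to match the `KafkaConsumer<String, byte[]>` used later in this article. `partition.assignment.strategy` accepts a comma-separated list of class names ordered by preference, which is why the helper takes varargs.

```java
import java.util.Properties;

class AssignorConfigSketch {

    // Hypothetical helper: builds consumer properties for the given group and assignor classes.
    static Properties consumerProps(String groupId, String... assignorClasses) {
        Properties props = new Properties();
        // Assumed broker address; replace with your own cluster.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // One or more assignor class names, in order of preference.
        props.put("partition.assignment.strategy", String.join(",", assignorClasses));
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("group-one",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```

The resulting `Properties` object can be passed to the `KafkaConsumer` constructor exactly as in the examples further down.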

The entry point for partition assignment is the performAssignment method of ConsumerCoordinator:

    @Override
    protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                        String assignmentStrategy,
                                                        List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {
        // Look up the assignment strategy
        ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        // All topics subscribed to by the consumer group
        Set<String> allSubscribedTopics = new HashSet<>();
        // Basic information (such as metadata) for each consumer in the group
        Map<String, Subscription> subscriptions = new HashMap<>();
        Map<String, List<TopicPartition>> ownedPartitions = new HashMap<>();
        for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
            Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));
            subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));
            subscriptions.put(memberSubscription.memberId(), subscription);
            allSubscribedTopics.addAll(subscription.topics());
            ownedPartitions.put(memberSubscription.memberId(), subscription.ownedPartitions());
        }
        // The concrete implementation lives in AbstractPartitionAssignor (the abstract base class of the assignment algorithms)
        Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();
        ...
        log.info("Finished assignment for group at generation {}: {}", generation().generationId, assignments);
        ...
        return groupAssignment;
    }

AbstractPartitionAssignor.assign()

    // The concrete algorithm of each assignment strategy
    public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                             Map<String, Subscription> subscriptions);

    @Override
    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
        Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
        Set<String> allSubscribedTopics = new HashSet<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet())
            allSubscribedTopics.addAll(subscriptionEntry.getValue().topics());

        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        for (String topic : allSubscribedTopics) {
            Integer numPartitions = metadata.partitionCountForTopic(topic);
            if (numPartitions != null && numPartitions > 0)
                partitionsPerTopic.put(topic, numPartitions);
            else
                log.debug("Skipping assignment for topic {} since no metadata is available", topic);
        }

        // Arguments built above:
        // partitionsPerTopic: map from each topic to its number of partitions
        // subscriptions: map of consumer information (consumer id, topics the consumer subscribes to)
        Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, subscriptions);

        // this class maintains no user data, so just wrap the results
        Map<String, Assignment> assignments = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
            assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
        return new GroupAssignment(assignments);
    }

The rest of this article explains how the two strategies RangeAssignor and RoundRobinAssignor differ.

2. The RangeAssignor strategy

RangeAssignor is the default assignment strategy.

public class RangeAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() {
        return "range";
    }

    private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
        Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
            for (String topic : subscriptionEntry.getValue().topics()) {
                put(topicToConsumers, topic, memberInfo);
            }
        }
        return topicToConsumers;
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        // Build the list of consumers for each topic
        // partitionsPerTopic: number of partitions per topic
        // subscriptions: consumer information (consumer id, subscribed topics, consumer instance)
        Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
        // Printing consumersPerTopic shows that group-one has two consumers,
        //   consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3 and
        //   consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd
        // where consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd consumes test_topic_partition_one and test_topic_partition_two,
        // and consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3 consumes only test_topic_partition_one
        // consumersPerTopic: {test_topic_partition_one=[MemberInfo [member.id: consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3, group.instance.id: {}], MemberInfo [member.id: consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd, group.instance.id: {}]], test_topic_partition_two=[MemberInfo [member.id: consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd, group.instance.id: {}]]}

        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<>());

        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
            // The topic
            String topic = topicEntry.getKey();
            // The consumers subscribed to this topic
            List<MemberInfo> consumersForTopic = topicEntry.getValue();
            // The number of partitions of this topic
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
                continue;

            Collections.sort(consumersForTopic);
            // Minimum number of partitions each consumer receives
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            // Number of leftover partitions, i.e. how many consumers receive one extra
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
            // The list of partitions of this topic
            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                // The consumers at the front of the sorted list receive one extra partition
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                // The contiguous slice of partitions assigned to this consumer
                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }
}

Example: create two consumers in one consumer group. test_topic_partition_one and test_topic_partition_two each have 9 partitions.
Process 1:

        props.put("group.id", "group-one");
        props.put("auto.offset.reset", "latest");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test_topic_partition_one", "test_topic_partition_two"));

Process 2:

        props.put("group.id", "group-one");
        props.put("auto.offset.reset", "latest");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test_topic_partition_one"));

Applying the algorithm above gives the following result.
Consumer consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd is assigned these partitions:

test_topic_partition_one-0, 
test_topic_partition_one-1,
test_topic_partition_one-2, 
test_topic_partition_one-3, 
test_topic_partition_one-4,
test_topic_partition_two-0, 
test_topic_partition_two-1, 
test_topic_partition_two-2, 
test_topic_partition_two-3, 
test_topic_partition_two-4, 
test_topic_partition_two-5, 
test_topic_partition_two-6, 
test_topic_partition_two-7, 
test_topic_partition_two-8

Consumer consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3 is assigned these partitions:

test_topic_partition_one-5, 
test_topic_partition_one-6, 
test_topic_partition_one-7, 
test_topic_partition_one-8

If process 2 also subscribes to both topics, the same algorithm gives this result instead.
Consumer consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd is assigned these partitions:

test_topic_partition_one-0, 
test_topic_partition_one-1,
test_topic_partition_one-2, 
test_topic_partition_one-3, 
test_topic_partition_one-4,
test_topic_partition_two-0, 
test_topic_partition_two-1, 
test_topic_partition_two-2, 
test_topic_partition_two-3, 
test_topic_partition_two-4

Consumer consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3 is assigned these partitions:

test_topic_partition_one-5, 
test_topic_partition_one-6, 
test_topic_partition_one-7, 
test_topic_partition_one-8,
test_topic_partition_two-5, 
test_topic_partition_two-6, 
test_topic_partition_two-7, 
test_topic_partition_two-8

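Here the first consumer receives one more partition of each topic than the second (five versus four), and each consumer's partitions are contiguous. The assignment is also computed per topic: how one topic's partitions are distributed is independent of every other topic. Because the first consumer in sorted order receives the extra partition of every topic, the first instance may end up carrying noticeably more load.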
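The start/length arithmetic at the heart of RangeAssignor can be tried in isolation. The following is a minimal sketch of that arithmetic for a single topic, not the Kafka class itself: `RangeSketch` and `assignOneTopic` are names invented for this example, consumers are represented only by their index in the sorted list, and partitions by their numbers.

```java
import java.util.ArrayList;
import java.util.List;

class RangeSketch {

    // For one topic, returns the partition numbers given to each consumer index.
    // Index i stands for the i-th consumer after sorting (an assumption of this sketch).
    static List<List<Integer>> assignOneTopic(int numPartitions, int numConsumers) {
        int perConsumer = numPartitions / numConsumers; // minimum share of each consumer
        int withExtra = numPartitions % numConsumers;   // how many consumers get one extra
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i + 1 > withExtra ? 0 : 1);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                owned.add(p);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 9 partitions, 2 subscribed consumers (test_topic_partition_one in the example)
        System.out.println(assignOneTopic(9, 2)); // [[0, 1, 2, 3, 4], [5, 6, 7, 8]]
        // 9 partitions, 1 subscribed consumer (test_topic_partition_two in the first example)
        System.out.println(assignOneTopic(9, 1)); // [[0, 1, 2, 3, 4, 5, 6, 7, 8]]
    }
}
```

Since the computation is repeated for every topic with the same sorted consumer order, the remainder always lands on the consumers at the front, which is the source of the imbalance described above.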

3. The RoundRobinAssignor strategy

public class RoundRobinAssignor extends AbstractPartitionAssignor {

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        // All consumers in the group; this example builds two consumers:
        //   consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5
        //   consumer-group-one-1-d227d230-8adc-4d4e-a092-77b63c07855a
        List<MemberInfo> memberInfoList = new ArrayList<>();
        for (Map.Entry<String, Subscription> memberSubscription : subscriptions.entrySet()) {
            assignment.put(memberSubscription.getKey(), new ArrayList<>());
            memberInfoList.add(new MemberInfo(memberSubscription.getKey(),
                                              memberSubscription.getValue().groupInstanceId()));
        }

        // Circular iterator over the sorted consumers
        CircularIterator<MemberInfo> assigner = new CircularIterator<>(Utils.sorted(memberInfoList));

        for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
            final String topic = partition.topic();
            // Skip consumers that do not subscribe to this partition's topic
            while (!subscriptions.get(assigner.peek().memberId).topics().contains(topic)) {
                assigner.next();
            }
            assignment.get(assigner.next().memberId).add(partition);
        }
        return assignment;
    }

    // Returns all partitions of all subscribed topics, in sorted order
    private List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
        SortedSet<String> topics = new TreeSet<>();
        for (Subscription subscription : subscriptions.values())
            topics.addAll(subscription.topics());

        List<TopicPartition> allPartitions = new ArrayList<>();
        for (String topic : topics) {
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic != null)
                allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
        }
        return allPartitions;
    }

    @Override
    public String name() {
        return "roundrobin";
    }
}

Example: create two consumers in one consumer group. test_topic_partition_one and test_topic_partition_two each have 9 partitions.
Process 1:

        props.put("group.id", "group-one");
        props.put("auto.offset.reset", "latest");
        // Select the round-robin strategy
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test_topic_partition_one", "test_topic_partition_two"));

Process 2:

        props.put("group.id", "group-one");
        props.put("auto.offset.reset", "latest");
        // Select the round-robin strategy
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test_topic_partition_one"));

Applying the algorithm above gives the following result.
Consumer consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5 is assigned these partitions:

test_topic_partition_one-0, 
test_topic_partition_one-2,
test_topic_partition_one-4, 
test_topic_partition_one-6, 
test_topic_partition_one-8,
test_topic_partition_two-0, 
test_topic_partition_two-1, 
test_topic_partition_two-2, 
test_topic_partition_two-3, 
test_topic_partition_two-4, 
test_topic_partition_two-5, 
test_topic_partition_two-6, 
test_topic_partition_two-7, 
test_topic_partition_two-8

Consumer consumer-group-one-1-d227d230-8adc-4d4e-a092-77b63c07855a is assigned these partitions:

test_topic_partition_one-1, 
test_topic_partition_one-3, 
test_topic_partition_one-5, 
test_topic_partition_one-7

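The partitions of test_topic_partition_one are handed to the two consumers alternately, while test_topic_partition_two goes entirely to the only consumer that subscribes to it.

The corresponding log: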

2024-08-19 14:28:34 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  Line:626] [Consumer clientId=consumer-group-one-1, groupId=group-one] Finished assignment for group at generation 44: {consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5=Assignment(partitions=[test_topic_partition_one-0, test_topic_partition_one-2, test_topic_partition_one-4, test_topic_partition_one-6, test_topic_partition_one-8, test_topic_partition_two-0, test_topic_partition_two-1, test_topic_partition_two-2, test_topic_partition_two-3, test_topic_partition_two-4, test_topic_partition_two-5, test_topic_partition_two-6, test_topic_partition_two-7, test_topic_partition_two-8]), consumer-group-one-1-d227d230-8adc-4d4e-a092-77b63c07855a=Assignment(partitions=[test_topic_partition_one-1, test_topic_partition_one-3, test_topic_partition_one-5, test_topic_partition_one-7])}

If process 2 also subscribes to both topics, the result becomes the following.
Consumer consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5 is assigned these partitions:

test_topic_partition_one-0, 
test_topic_partition_one-2,
test_topic_partition_one-4, 
test_topic_partition_one-6, 
test_topic_partition_one-8,
test_topic_partition_two-1, 
test_topic_partition_two-3, 
test_topic_partition_two-5, 
test_topic_partition_two-7

Consumer consumer-group-one-1-d227d230-8adc-4d4e-a092-77b63c07855a is assigned these partitions:

test_topic_partition_one-1, 
test_topic_partition_one-3, 
test_topic_partition_one-5, 
test_topic_partition_one-7,
test_topic_partition_two-0, 
test_topic_partition_two-2, 
test_topic_partition_two-4, 
test_topic_partition_two-6, 
test_topic_partition_two-8

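In other words, all partitions of all subscribed topics are handed out to the two consumers in turn, so the outcome under this strategy depends on the number of topics and the number of partitions in each.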
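To experiment with the round-robin behaviour without a broker, the loop can be simulated with plain collections. This is a simplified sketch, not the Kafka implementation: `RoundRobinSketch`, `assign` and `demo` are invented names, the consumer ids `consumer-a` and `consumer-b` are placeholders, an integer cursor stands in for `CircularIterator`, and partitions are represented as `topic-N` strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

class RoundRobinSketch {

    // subscriptions: consumer id -> subscribed topics; partitionsPerTopic: topic -> partition count
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, Set<String>> subscriptions) {
        List<String> members = new ArrayList<>(subscriptions.keySet());
        Collections.sort(members); // consumers are visited in sorted order
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        // Only topics that at least one consumer subscribes to, in sorted order
        SortedSet<String> topics = new TreeSet<>();
        for (Set<String> subscribed : subscriptions.values()) {
            topics.addAll(subscribed);
        }
        int cursor = 0; // plays the role of the circular iterator
        for (String topic : topics) {
            Integer numPartitions = partitionsPerTopic.get(topic);
            if (numPartitions == null) {
                continue;
            }
            for (int p = 0; p < numPartitions; p++) {
                // Skip consumers that do not subscribe to this topic
                while (!subscriptions.get(members.get(cursor % members.size())).contains(topic)) {
                    cursor++;
                }
                assignment.get(members.get(cursor % members.size())).add(topic + "-" + p);
                cursor++;
            }
        }
        return assignment;
    }

    // Rebuilds the two scenarios from this article with placeholder consumer ids.
    static Map<String, List<String>> demo(boolean bothSubscribeToAll) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("test_topic_partition_one", 9);
        partitionsPerTopic.put("test_topic_partition_two", 9);
        Set<String> all = new HashSet<>(Arrays.asList("test_topic_partition_one", "test_topic_partition_two"));
        Set<String> onlyOne = new HashSet<>(Collections.singletonList("test_topic_partition_one"));
        Map<String, Set<String>> subscriptions = new HashMap<>();
        subscriptions.put("consumer-a", all);
        subscriptions.put("consumer-b", bothSubscribeToAll ? all : onlyOne);
        return assign(partitionsPerTopic, subscriptions);
    }

    public static void main(String[] args) {
        System.out.println(demo(false));
        System.out.println(demo(true));
    }
}
```

With different subscriptions, `consumer-a` ends up with 14 partitions and `consumer-b` with 4; with identical subscriptions each gets 9, which matches the two results listed above.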

