Kafka-exporter采集参数调整方案
#作者:张桐瑞
文章目录
- 1 问题概述
- 2 修改方案
- 2.1修改参数
- 2.2配置示例
- 3 消费者组均分脚本
- 3.1使用说明
- 3.2脚本内容
- 3.3实现原理说明
- 4 KAFKA-EXPORTER流程代码
- 4.1KAFKA-EXPORTER拉取数据流程
1 问题概述
由于kafka-exporter获取kafka指标时间过长,无法通过curl kafka-exporter:9308/metrics 获取指标,通过查看kafka-exporter日志,发现
time=“2025-07-22T02:18:00Z” level=error msg=“Cannot get offset of group xxxx: kafka: broker not connected” source=“kafka_exporter.go:396”
的报错信息,发现kafka-exporter在查询消费者组时出现超时,通过命令
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
同样出现超时信息。
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeConsumerGroups, deadlineMs=1753170317381, tries=1, nextAllowedTryMs=1753170317482) timed out at 1753170317382 after 1 attempt(s)
通过针对具体消费者组进行查看,发现返回信息较快,故计划过滤一部分groups来减少数据量。
2 修改方案
当前环境中,Kafka与Kafka Exporter共同部署在同一Pod 内,且各个Kafka Exporter未配置过滤规则,导致采集数据存在大量重复。为提升采集效率和准确性,计划通过以下措施进行优化:
- 基于不同的 ReplicaSet(RC)配置差异化过滤规则;
针对每个RC单独设置专属的消费者组过滤规则,避免重复采集。 - 利用消费者均分脚本生成过滤正则表达式;
通过脚本将所有消费者组按规则均分,生成适用于Kafka Exporter的正则表达式,保证每个 Exporter 只采集其分配范围内的消费者组。 - 将生成的消费者组过滤正则应用于不同 Exporter 配置中;
将对应的正则表达式配置到各自 Exporter 实例中,实现消费组采集的隔离和均衡分布
2.1修改参数
group.filter .* Regex that determines which consumer groups to collect
group.filter # 正则表达式,用于确定要收集哪些消费者组(Consumer Groups)的指标。
2.2配置示例
只需为每个Kafka Exporter添加对应的YAML配置段落即可。
2.2.1KAFKA_EXPORTER_1:
- args:
- –kafka.server=localhost:9092
- –web.listen-address=:9308
- –group.filter=^(A|a|B).*
2.2.2KAFKA_EXPORTER_2:
- args:
- –kafka.server=localhost:9092
- –web.listen-address=:9308
- –group.filter=^(E|b).*
2.2.3KAFKA_EXPORTER_3:
- args:
- –kafka.server=localhost:9092
- –web.listen-address=:9308
- –group.filter=^©.*
3 消费者组均分脚本
3.1使用说明
配置参数:
- BOOTSTRAP_SERVER: 指定 Kafka 服务地址;
- EXPORTER_COUNT: 指定需要生成多少个 Exporter 的过滤规则。
# Bash group-distributor.sh使用消费者组均分脚本进行消费者组均匀分布,脚本执行结果如下:
bash-5.0$ bash /bitnami/kafka/data/topic_to_consumer_group.sh================ Hash by 首字母 (区分大小写) =================
Exporter 1: --group.filter=^(A|a|B).* [3 groups]
Exporter 2: --group.filter=^(E|b).* [2 groups]
Exporter 3: --group.filter=^(C).* [1 groups]
可参考对应的exporter过滤正则结果。
3.2脚本内容
3.2.1Hash分组脚本
#!/bin/bashBOOTSTRAP_SERVER="localhost:9092"
EXPORTER_COUNT=3# 获取所有消费者组
all_groups=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" --list)# 初始化数组
for ((i=0; i<EXPORTER_COUNT; i++)); doprefix_sets[$i]=""count[$i]=0
donedeclare -A seen_prefix# 遍历组名,按首字母 hash 均分
while IFS= read -r group; doprefix=$(echo "$group" | cut -c1)[[ -z "$prefix" ]] && continue# 计算哈希分配位置hash_hex=$(echo -n "$prefix" | md5sum | awk '{print $1}' | cut -c1-8)hash_dec=$((16#$hash_hex))idx=$((hash_dec % EXPORTER_COUNT))# 记录唯一首字母,用于生成正则if [[ -z "${seen_prefix[$prefix]}" ]]; thenseen_prefix[$prefix]=1if [[ -z "${prefix_sets[$idx]}" ]]; thenprefix_sets[$idx]="$prefix"elseprefix_sets[$idx]+="|$prefix"fifi# 每个实际组都会计入对应 exporter 的数量count[$idx]=$((count[$idx] + 1))
done <<< "$all_groups"# 输出结果
echo "================ Hash by 首字母 (区分大小写) ================="
for ((i=0; i<EXPORTER_COUNT; i++)); doif [[ -n "${prefix_sets[$i]}" ]]; thenecho "Exporter $((i+1)): --group.filter='^(${prefix_sets[$i]}).*' [${count[$i]} groups]"elseecho "Exporter $((i+1)): --group.filter='^()' [${count[$i]} groups]"fi
done
3.2.2 排序分组脚本
#!/bin/bashBOOTSTRAP_SERVER="localhost:9092"
EXPORTER_COUNT=3# 获取所有消费组
all_groups=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" --list)# 按首字母分组(区分大小写)
declare -A initial_to_groups
declare -A initial_countswhile IFS= read -r group; do[[ -z "$group" ]] && continuefirst_char=${group:0:1}initial_to_groups["$first_char"]+="$group"$'\n'initial_counts["$first_char"]=$((initial_counts["$first_char"] + 1))
done <<< "$all_groups"# 将首字母按消费组数量降序排序
sorted_initials=$(for k in "${!initial_counts[@]}"; doecho -e "${initial_counts[$k]}\t$k"
done | sort -rn | awk '{print $2}')# 初始化每个 exporter 的负载、过滤项和组列表
for ((i=0; i<EXPORTER_COUNT; i++)); doexporter_filters[$i]=""exporter_loads[$i]=0exporter_groups[$i]=""
done# 分配首字母到 exporter,按消费组数量均衡
for initial in $sorted_initials; docount=${initial_counts[$initial]}# 找当前负载最小的 exportermin_index=0min_load=${exporter_loads[0]}for ((i=1; i<EXPORTER_COUNT; i++)); doif (( exporter_loads[i] < min_load )); thenmin_index=$imin_load=${exporter_loads[i]}fidone# 添加到对应 exporterif [[ -z "${exporter_filters[$min_index]}" ]]; thenexporter_filters[$min_index]="$initial"elseexporter_filters[$min_index]+="|$initial"fiexporter_loads[$min_index]=$((exporter_loads[$min_index] + count))exporter_groups[$min_index]+="${initial_to_groups[$initial]}"
done# 输出格式
echo "================ kafka-exporter group.filter 正则分片 ================="
for ((i=0; i<EXPORTER_COUNT; i++)); dogroup_count=$(echo -n "${exporter_groups[$i]}" | grep -c '^')echo "Exporter $((i+1)): ($group_count groups)"echo " --group.filter='^(${exporter_filters[$i]}).*'"echo
done
3.3实现原理说明
3.3.1Hash分组脚本说明
该脚本通过以下核心逻辑实现消费者组的分配与正则表达式生成:
- 获取消费者组列表:
利用 kafka-consumer-groups.sh --list 获取当前 Kafka 所有消费者组名。 - 提取首字母并去重:
对每个消费者组名提取首字母(区分大小写),使用关联数组 seen_prefix 确保相同首字母只处理一次。 - Hash 均分逻辑:
1)使用 md5sum 对首字母进行 Hash 处理;
2)将 Hash 值转为十进制,再通过取模操作将其平均分配到 N 个 Exporter 中;
3)每个 Exporter 收集分配到的首字母集合,组合成正则表达式。 - 生成过滤规则:
每个Exporter输出一条形如–group.filter=^(A|B|C).* 的正则规则,仅匹配以指定前缀开头的消费者组。
3.3.2排序分钟脚本说明
在首字母提取之后,脚本还会对首字母列表进行 排序处理,其作用是: - 确保稳定性
通过排序(sort 命令),可以保证即使 Kafka 集群中的消费者组顺序发生变化,脚本对首字母的处理顺序仍保持一致,从而保证正则分配的稳定性。 - 提升分配公平性
排序后的首字母经过统一的 Hash 计算和取模分配,有助于在 Exporter间更公平地分摊消费者组,提高采集性能和均衡性。 - 简化调试和查看
将字母排序输出后,便于在查看分配结果时快速定位具体字母属于哪个Exporter,也有助于问题定位和正则表达式核查。
4 KAFKA-EXPORTER流程代码
4.1KAFKA-EXPORTER拉取数据流程
4.1.1拉取所有消费者组
在getConsumerGroupMetrics函数中,首先通过broker.ListGroups(&sarama.ListGroupsRequest{})向Kafka broker拉取所有的消费者组(group)列表。
这一步会返回集群中所有存在的group名称。
groupIds := make([]string, 0)for groupId := range groups.Groups {if e.groupFilter.MatchString(groupId) {groupIds = append(groupIds, groupId)}}
4.1.2 过滤消费者组
拉取到所有group后,遍历每个groupId,用e.groupFilter.MatchString(groupId)判断该group是否匹配过滤条件。
只有匹配的groupId才会被加入到后续的处理流程。
groupIds := make([]string, 0)for groupId := range groups.Groups {if e.groupFilter.MatchString(groupId) {groupIds = append(groupIds, groupId)}}
4.1.3 DESCRIBE消费者组
只对通过过滤的group进行DescribeGroups、FetchOffset等详细指标采集。
describeGroups, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{Groups: groupIds})if err != nil {glog.Errorf("Cannot get describe groups: %v", err)return}for _, group := range describeGroups.Groups {offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: 1}if e.offsetShowAll {for topic, partitions := range offset {for partition := range partitions {offsetFetchRequest.AddPartition(topic, partition)}}