当前位置: 首页 > news >正文

Kafka-exporter采集参数调整方案

#作者:张桐瑞

文章目录

  • 1 问题概述
  • 2 修改方案
    • 2.1修改参数
    • 2.2配置示例
  • 3 消费者组均分脚本
    • 3.1使用说明
    • 3.2脚本内容
    • 3.3实现原理说明
  • 4 KAFKA-EXPORTER流程代码
    • 4.1KAFKA-EXPORTER拉取数据流程

1 问题概述

由于kafka-exporter获取kafka指标时间过长,无法通过curl kafka-exporter:9308/metrics 获取指标,通过查看kafka-exporter日志,发现
time=“2025-07-22T02:18:00Z” level=error msg=“Cannot get offset of group xxxx: kafka: broker not connected” source=“kafka_exporter.go:396”
的报错信息,发现kafka-exporter在查询消费者组时出现超时,通过命令
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
同样出现超时信息。
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeConsumerGroups, deadlineMs=1753170317381, tries=1, nextAllowedTryMs=1753170317482) timed out at 1753170317382 after 1 attempt(s)
通过针对具体消费者组进行查看,发现返回信息较快,故计划过滤一部分groups来减少数据量。

2 修改方案

当前环境中,Kafka与Kafka Exporter共同部署在同一Pod 内,且各个Kafka Exporter未配置过滤规则,导致采集数据存在大量重复。为提升采集效率和准确性,计划通过以下措施进行优化:

  1. 基于不同的 ReplicaSet(RC)配置差异化过滤规则;
    针对每个RC单独设置专属的消费者组过滤规则,避免重复采集。
  2. 利用消费者均分脚本生成过滤正则表达式;
    通过脚本将所有消费者组按规则均分,生成适用于Kafka Exporter的正则表达式,保证每个 Exporter 只采集其分配范围内的消费者组。
  3. 将生成的消费者组过滤正则应用于不同 Exporter 配置中;
    将对应的正则表达式配置到各自 Exporter 实例中,实现消费组采集的隔离和均衡分布

2.1修改参数

group.filter .* Regex that determines which consumer groups to collect
group.filter # 正则表达式,用于确定要收集哪些消费者组(Consumer Groups)的指标。

2.2配置示例

只需为每个Kafka Exporter添加对应的YAML配置段落即可。

2.2.1KAFKA_EXPORTER_1:

  • args:
    • –kafka.server=localhost:9092
    • –web.listen-address=:9308
    • –group.filter=^(A|a|B).*

2.2.2KAFKA_EXPORTER_2:

  • args:
    • –kafka.server=localhost:9092
    • –web.listen-address=:9308
    • –group.filter=^(E|b).*

2.2.3KAFKA_EXPORTER_3:

  • args:
    • –kafka.server=localhost:9092
    • –web.listen-address=:9308
    • –group.filter=^©.*

3 消费者组均分脚本

3.1使用说明

配置参数:

  • BOOTSTRAP_SERVER: 指定 Kafka 服务地址;
  • EXPORTER_COUNT: 指定需要生成多少个 Exporter 的过滤规则。
# Bash group-distributor.sh使用消费者组均分脚本进行消费者组均匀分布,脚本执行结果如下:
bash-5.0$ bash /bitnami/kafka/data/topic_to_consumer_group.sh================ Hash by 首字母 (区分大小写) =================
Exporter 1: --group.filter=^(A|a|B).*  [3 groups]
Exporter 2: --group.filter=^(E|b).*  [2 groups]
Exporter 3: --group.filter=^(C).*  [1 groups]

可参考对应的exporter过滤正则结果。

3.2脚本内容

3.2.1Hash分组脚本

#!/bin/bashBOOTSTRAP_SERVER="localhost:9092"
EXPORTER_COUNT=3# 获取所有消费者组
all_groups=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" --list)# 初始化数组
for ((i=0; i<EXPORTER_COUNT; i++)); doprefix_sets[$i]=""count[$i]=0
donedeclare -A seen_prefix# 遍历组名,按首字母 hash 均分
while IFS= read -r group; doprefix=$(echo "$group" | cut -c1)[[ -z "$prefix" ]] && continue# 计算哈希分配位置hash_hex=$(echo -n "$prefix" | md5sum | awk '{print $1}' | cut -c1-8)hash_dec=$((16#$hash_hex))idx=$((hash_dec % EXPORTER_COUNT))# 记录唯一首字母,用于生成正则if [[ -z "${seen_prefix[$prefix]}" ]]; thenseen_prefix[$prefix]=1if [[ -z "${prefix_sets[$idx]}" ]]; thenprefix_sets[$idx]="$prefix"elseprefix_sets[$idx]+="|$prefix"fifi# 每个实际组都会计入对应 exporter 的数量count[$idx]=$((count[$idx] + 1))
done <<< "$all_groups"# 输出结果
echo "================ Hash by 首字母 (区分大小写) ================="
for ((i=0; i<EXPORTER_COUNT; i++)); doif [[ -n "${prefix_sets[$i]}" ]]; thenecho "Exporter $((i+1)): --group.filter='^(${prefix_sets[$i]}).*'  [${count[$i]} groups]"elseecho "Exporter $((i+1)): --group.filter='^()'  [${count[$i]} groups]"fi
done

3.2.2 排序分组脚本

#!/bin/bashBOOTSTRAP_SERVER="localhost:9092"
EXPORTER_COUNT=3# 获取所有消费组
all_groups=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" --list)# 按首字母分组(区分大小写)
declare -A initial_to_groups
declare -A initial_countswhile IFS= read -r group; do[[ -z "$group" ]] && continuefirst_char=${group:0:1}initial_to_groups["$first_char"]+="$group"$'\n'initial_counts["$first_char"]=$((initial_counts["$first_char"] + 1))
done <<< "$all_groups"# 将首字母按消费组数量降序排序
sorted_initials=$(for k in "${!initial_counts[@]}"; doecho -e "${initial_counts[$k]}\t$k"
done | sort -rn | awk '{print $2}')# 初始化每个 exporter 的负载、过滤项和组列表
for ((i=0; i<EXPORTER_COUNT; i++)); doexporter_filters[$i]=""exporter_loads[$i]=0exporter_groups[$i]=""
done# 分配首字母到 exporter,按消费组数量均衡
for initial in $sorted_initials; docount=${initial_counts[$initial]}# 找当前负载最小的 exportermin_index=0min_load=${exporter_loads[0]}for ((i=1; i<EXPORTER_COUNT; i++)); doif (( exporter_loads[i] < min_load )); thenmin_index=$imin_load=${exporter_loads[i]}fidone# 添加到对应 exporterif [[ -z "${exporter_filters[$min_index]}" ]]; thenexporter_filters[$min_index]="$initial"elseexporter_filters[$min_index]+="|$initial"fiexporter_loads[$min_index]=$((exporter_loads[$min_index] + count))exporter_groups[$min_index]+="${initial_to_groups[$initial]}"
done# 输出格式
echo "================ kafka-exporter group.filter 正则分片 ================="
for ((i=0; i<EXPORTER_COUNT; i++)); dogroup_count=$(echo -n "${exporter_groups[$i]}" | grep -c '^')echo "Exporter $((i+1)): ($group_count groups)"echo "  --group.filter='^(${exporter_filters[$i]}).*'"echo
done

3.3实现原理说明

3.3.1Hash分组脚本说明
该脚本通过以下核心逻辑实现消费者组的分配与正则表达式生成:

  1. 获取消费者组列表:
    利用 kafka-consumer-groups.sh --list 获取当前 Kafka 所有消费者组名。
  2. 提取首字母并去重:
    对每个消费者组名提取首字母(区分大小写),使用关联数组 seen_prefix 确保相同首字母只处理一次。
  3. Hash 均分逻辑:
    1)使用 md5sum 对首字母进行 Hash 处理;
    2)将 Hash 值转为十进制,再通过取模操作将其平均分配到 N 个 Exporter 中;
    3)每个 Exporter 收集分配到的首字母集合,组合成正则表达式。
  4. 生成过滤规则:
    每个Exporter输出一条形如–group.filter=^(A|B|C).* 的正则规则,仅匹配以指定前缀开头的消费者组。
    3.3.2排序分钟脚本说明
    在首字母提取之后,脚本还会对首字母列表进行 排序处理,其作用是:
  5. 确保稳定性
    通过排序(sort 命令),可以保证即使 Kafka 集群中的消费者组顺序发生变化,脚本对首字母的处理顺序仍保持一致,从而保证正则分配的稳定性。
  6. 提升分配公平性
    排序后的首字母经过统一的 Hash 计算和取模分配,有助于在 Exporter间更公平地分摊消费者组,提高采集性能和均衡性。
  7. 简化调试和查看
    将字母排序输出后,便于在查看分配结果时快速定位具体字母属于哪个Exporter,也有助于问题定位和正则表达式核查。

4 KAFKA-EXPORTER流程代码

4.1KAFKA-EXPORTER拉取数据流程

4.1.1拉取所有消费者组
在getConsumerGroupMetrics函数中,首先通过broker.ListGroups(&sarama.ListGroupsRequest{})向Kafka broker拉取所有的消费者组(group)列表。
这一步会返回集群中所有存在的group名称。

groupIds := make([]string, 0)for groupId := range groups.Groups {if e.groupFilter.MatchString(groupId) {groupIds = append(groupIds, groupId)}}

4.1.2 过滤消费者组
拉取到所有group后,遍历每个groupId,用e.groupFilter.MatchString(groupId)判断该group是否匹配过滤条件。
只有匹配的groupId才会被加入到后续的处理流程。

 groupIds := make([]string, 0)for groupId := range groups.Groups {if e.groupFilter.MatchString(groupId) {groupIds = append(groupIds, groupId)}}

4.1.3 DESCRIBE消费者组
只对通过过滤的group进行DescribeGroups、FetchOffset等详细指标采集。

describeGroups, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{Groups: groupIds})if err != nil {glog.Errorf("Cannot get describe groups: %v", err)return}for _, group := range describeGroups.Groups {offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: 1}if e.offsetShowAll {for topic, partitions := range offset {for partition := range partitions {offsetFetchRequest.AddPartition(topic, partition)}}
http://www.lryc.cn/news/612383.html

相关文章:

  • android NDK 报错日志解读和还原报错方法名
  • 数语科技登陆华为云商店,助力企业释放数据潜能
  • FPGA学习笔记——VGA彩条显示
  • 【软考系统架构设计师备考笔记4】 - 英语语法一篇通
  • 机器人定位装配的精度革命:迁移科技如何重塑工业生产价值
  • Spring Boot 参数校验全指南
  • 应急响应linux
  • vue3+element-plus,el-popover实现筛选弹窗的方法
  • ACL 2025 Oral|Evaluation Agent:面向视觉生成模型的高效可提示的评估框架
  • 【关于Java的对象】
  • C++、STL面试题总结(二)
  • Android14的QS面板的加载解析
  • 【android bluetooth 协议分析 03】【蓝牙扫描详解 4】【BR/EDR扫描到设备后如何上报给app侧】
  • 力扣137:只出现一次的数字Ⅱ
  • 【计算机网络 | 第4篇】分组交换
  • Javascript/ES6+/Typescript重点内容篇——手撕(待总结)
  • Spring之【初识AOP】
  • hf的国内平替hf-mirror
  • IAR软件中测量函数执行时间
  • 开发笔记 | 接口与抽象基类说明以及对象池的实现
  • 机器学习 朴素贝叶斯
  • 【普通地质学】地球的物质组成
  • iOS混淆工具有哪些?在集成第三方 SDK 时的混淆策略与工具建议
  • MEMS陀螺仪如何在复杂井下环境中保持精准测量?
  • 以此芯p1芯片为例研究OpenHarmony上GPU (Vulkan) 加速在深度学习推理中的价值
  • n8n Slack credentials(n8n slack凭证配置步骤)(API access token)
  • Datawhale AI 夏令营:RAG多模态检索(Baseline解读)
  • 解决启动docker报错Cannot connect to the Docker daemon问题
  • Windows 如何上架 iOS 应用?签名上传全流程 + 工具推荐
  • 使用CRC32爆破ZIP压缩包内小文件内容的技术解析