当前位置: 首页 > news >正文

kafka消费者组消费进度(Lag)深入理解

目录

    • 什么是消费者 Lag
      • 举例说明:
      • Lag 的意义:
    • Lag 监控和查询
      • kafka-consumer-groups
        • 基本语法
        • 常用命令示例
        • 1. 查看单个消费者组的详细信息(最常用)
        • 2. 列出所有消费者组(只显示名称)
        • 3. 列出所有消费者组(有详情信息,可以通过grep过滤topic和消费者组对应信息)
        • 4. 查看消费者组成员信息
      • Golang 代码实现 Lag 监控

什么是消费者 Lag

在消息队列系统(如 Kafka)中,消费者 Lag(也称为 “滞后量”) 是衡量消费者(或消费者组)处理消息进度的核心指标,它表示尚未被消费的消息数量。
具体来说,Lag 的计算方式是:

Lag = 分区当前最大偏移量(Max Offset) - 消费者已提交的偏移量(Committed Offset)

  • 最大偏移量(Max Offset):分区中最新一条消息的位置(即已经生产的消息总量标识)。
  • 已提交偏移量(Committed Offset):消费者组已经成功处理并提交的最新消息位置(即已经消费完成的进度标识)。

通常来说,Lag 的单位是消息数,而且我们一般是在主题这个级别上讨论 Lag 的,但实际上,Kafka 监控 Lag 的层级是在分区上的。如果要计算主题级别的,你需要手动汇总所有主题分区的 Lag,将它们累加起来,合并成最终的 Lag 值。

举例说明:

假设某个分区的消息偏移量是从 0 开始递增的:

  1. 目前分区中最新的消息偏移量是 100(即已生产了 101 条消息,0~100)。
  2. 消费者组已提交的偏移量是 80(即已处理完 0~80 的消息)。

此时,Lag = 100 - 80 = 20,意味着还有 20 条消息(81~100)未被消费。

Lag 的意义:

  • Lag = 0:表示消费者完全跟上了消息生产速度,没有未处理的消息。
  • Lag 增大:说明消费者处理速度慢于消息生产速度,出现了消息积压,可能导致业务延迟。
  • Lag 长期不为 0:可能是消费者能力不足、逻辑阻塞或系统异常的信号,需要排查。

Lag 监控和查询

kafka-consumer-groups

kafka-consumer-groups 脚本是 Kafka 为我们提供的最直接的监控消费者消费进度的工具。

kafka-consumer-groups.sh 是 Kafka 自带的命令行工具,用于管理和查询消费者组(Consumer Group)的信息,包括消费进度(Lag)、位移(Offset)、成员信息等。它是排查消费问题的常用工具,适用于快速诊断消费者组状态。

基本语法
kafka-consumer-groups.sh --bootstrap-server <kafka-broker地址> [选项]

核心参数说明:

  • --bootstrap-server:指定 Kafka 集群的 broker 地址(如 localhost:9092 或多个地址用逗号分隔),必须指定
  • --group:指定消费者组名称(操作单个组时使用)。
  • --all-groups:操作所有消费者组(如查询所有组的信息)。
  • --describe:查看消费者组的详细信息(包括每个分区的位移和 Lag)。
常用命令示例
1. 查看单个消费者组的详细信息(最常用)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group

输出示例及解读:

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-consumer-group test-topic      0          80              100             20              consumer-1-abc123                              /192.168.1.1    consumer-1
my-consumer-group test-topic      1          50              50              0               consumer-2-def456                              /192.168.1.2    consumer-2

字段含义:

  • GROUP:消费者组名称。
  • TOPIC:消费的主题名称。
  • PARTITION:主题的分区编号。
  • CURRENT-OFFSET:消费者组已提交的位移(已处理到的位置)。
  • LOG-END-OFFSET:分区最新的消息位移(已生产的最新位置)。
  • LAG:未消费的消息数量(LOG-END-OFFSET - CURRENT-OFFSET)。
  • CONSUMER-ID/HOST/CLIENT-ID:当前消费该分区的消费者信息。
2. 列出所有消费者组(只显示名称)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

输出示例:

my-consumer-group
order-service-group
user-tracking-group
3. 列出所有消费者组(有详情信息,可以通过grep过滤topic和消费者组对应信息)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.37.10:9092   --all-groups --describe 2>/dev/null  | grep -v GROUP |awk '{size[$1" "$2]+=$6} END{for(i in size) if(size[i]>300) {print "    消费.对应Tpic "i,"的积压数为:"size[i]}}'

实现效果
在这里插入图片描述

4. 查看消费者组成员信息
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--describe --group my-consumer-group \--members

输出消费者组内的成员列表、分配的分区等信息,用于确认组内消费者是否正常在线。

Golang 代码实现 Lag 监控

填坑,待完善!

http://www.lryc.cn/news/598718.html

相关文章:

  • 【阿里云-ACP-1】疑难题解析
  • 力扣189:轮转数组
  • Linux基础服务(autofs和Samba)
  • 深圳三维扫描铸件形位公差尺寸测量3d偏差检测-中科米堆CASAIM
  • LeetCode 2322:从树中删除边的最小分数
  • Elasticsearch 的聚合(Aggregations)操作详解
  • multiprocessing 模块及其底层机制 spawn_main 在大模型应用中的场景
  • STM32-FSMC
  • multiprocessing模块使用方法(一)
  • S7-1500 与 ET200MP 的组态控制通信(Configuration Control)功能实现详解(上)
  • 设备虚拟化技术IRF
  • 力扣刷题(第九十七天)
  • 智慧驾驶疲劳检测算法的实时性优化
  • 「Linux命令基础」用户和用户组实训
  • 雷达使用的MSOP端口和DIFOP端口是什么意思
  • Spring-狂神说
  • Claude4、GPT4、Kimi K2、Gemini2.5、DeepSeek R1、Code Llama等2025主流AI编程大模型多维度对比分析报告
  • 【PZ-ZU7EV-KFB】——ZYNQ UltraScale + ZU7EV开发板ARM/FPGA异构计算开发平台,赋能多域智能硬件创新
  • python学习xlsx表格导入mysql脚本 + leetcode19删除链表倒N + python与本地mysql连接不上排错
  • 游戏开发Unity/ ShaderLab学习路径
  • rust-数据结构
  • 20250724-day21
  • Qt 调用ocx的详细步骤
  • 解决 SQL 错误 [1055]:深入理解 only_full_group_by 模式下的查询规范
  • R study notes[1]
  • 完成多项问题修复,MaxKB开源企业级智能体平台v1.10.9 LTS版本发布
  • C++图论全面解析:从基础概念到算法实践
  • 学习游戏制作记录(技能系统)7.24
  • Oracle国产化替代:一线DBA的技术决策突围战
  • 【ROS1】09-ROS通信机制——参数服务器