当前位置: 首页 > news >正文

Kafka 典型问题与排查以及相关优化

Kafka 是一个高吞吐量的分布式消息系统,但在实际应用中,用户经常会遇到一些性能问题和消息堆积的问题。本文将介绍 Kafka 中一些典型问题的原因和排查方法,帮助用户解决问题并优化 Kafka 集群的性能。

一、Topic 消息发送慢,并发性能低

问题描述:

某个或某几个 Topic 的消息并发发送性能低,具体表现为 Producer 的平均请求延迟大,平均生产吞吐量低。

可能原因:

  1. 网络带宽不足,导致 IO 等待时间长。
  2. 消息未压缩,导致网络流量超负荷。
  3. 消息未批量发送或批量阈值配置不当,导致发送速率慢。
  4. Topic 分区数量不足,导致 Broker 接收消息积压。
  5. Broker 磁盘性能低,导致磁盘同步慢。
  6. Broker 分区总量过多,导致碎片化,磁盘读写过载。

排查方法: 

1、确认网络带宽:

检查 Producer 的平均 IO 等待时间指标,判断 Producer 到 Broker 之间的网络带宽是否满足业务的流量要求。

在一个节点上作为服务器启动 iperf
iperf -s在另一个节点上作为客户端测试网络带宽
iperf -c 192.168.10.21

2、确认消息压缩:

检查 Producer 的平均压缩率指标,确保压缩率符合预期。

在 Producer 配置中启用消息压缩
compression.type=gzip

3、消息未批量发送或批量阈值配置不当

 检查 Producer 的批量发送配置 batch.sizelinger.ms

操作命令: 查看 Producer 配置文件或代码,确认 batch.sizelinger.ms 设置。

优化方法: 调整 Producer 配置,增大批量发送大小和延迟时间:

batch.size=32768
linger.ms=10

4、增加 Topic 分区:

使用 Kafka 提供的命令行工具查看 Topic 分区数量

kafka-topics.sh --describe --topic <your_topic> --zookeeper <zookeeper_host>:2181

优化方法增加 Topic 的分区数量以提高并发处理能力:

kafka-topics.sh --alter --topic <your_topic> --partitions 10 --zookeeper <zookeeper_host>:2181

5、检查磁盘 IO 使用率:

确认 Broker 磁盘 IO 使用率是否在安全范围内,若使用率较高则考虑扩容 Broker 或增加 Topic 分区数。

 使用 iostatdstat 命令查看磁盘 IO 使用率:

iostat -x 1 10
dstat -d 1

优化方法

  • 升级磁盘为 SSD,提高磁盘性能。
  • 优化磁盘配置,确保磁盘 IO 性能最佳。

6、检查分区总量:

查看集群的总分区数和单个 Broker 的分区数量,确保在规划的容量范围内。

使用 Kafka 提供的命令行工具查看集群分区情况: 

kafka-topics.sh --describe --zookeeper <zookeeper_host>:2181

优化方法

  • 水平扩展 Broker,增加 Broker 数量以分散分区负载。
  • 通过增加 Broker 节点来均衡分区分布,减少单个 Broker 的分区数量。

二、Topic 消息堆积

问题描述:
某个或某几个 Topic 的消息堆积持续增加,具体表现为 Group 消费延迟数量持续增加。

可能原因:

  1. Producer 生产消息流量增大。
  2. Consumer 由于业务变化导致消费延迟增加。
  3. Consumer 数量不足。
  4. Consumer 数量频繁变化,导致 Group 不断做再平衡(Rebalance)。
  5. Broker 未收到 Consumer 的消息确认消息。

排查方法:

确认生产量:

检查 Producer 的消息生产量指标,判断是否明显增加。

确认消费量:

检查 Consumer 的消息流量指标,判断是否明显下降

检查 Consumer 数量:

通过 Kafka Broker 提供的命令,确认 Topic 对应的 Consumer 数量与实际的 Consumer 数量是否一致。如果不一致,说明某些 Consumer 未正确连接到 Broker,需要排查 Consumer 是否正常运行。

kafka-consumer-groups.sh --describe --group your_consumer_group --bootstrap-server broker_host:9092

观察 Consumer 变化:

观察 Consumer 的数量是否频繁变化而触发再平衡。由于网络或其他原因,可能导致 Consumer 与 Broker 之间的连接不稳定,Consumer 能持续消费消息,但 Broker 始终认为消息未确认,导致消费位点不变。此时可能需要确认 Consumer 与 Broker 之间的网络稳定性,甚至重启 Consumer。

三、优化 Kafka 生产性能

优化方法:

1、增大 Producer 发送消息的批量大小(batch.size)和批量发送等待时间(linger.ms)。

batch.size=32768
linger.ms=10

2、启用压缩

使用压缩算法减少网络带宽占用。

compression.type=gzip

3、增加分区数:

通过增加 Topic 的分区数,提高并发写入能力。

4、调整 Broker 配置

优化 Broker 的磁盘配置和 IO 设置,例如调整 log.segment.bytes 和 log.retention.hours。

四、优化 Kafka 消费性能

优化方法:

1、增大消费并发:

增加 Consumer 数量和分区数,提升并发消费能力。

2、优化消费逻辑:

确保 Consumer 的业务逻辑高效,减少单个消息处理时间。

3、平衡负载:

确保 Consumer Group 的每个 Consumer 都能均匀分配到分区。

五、Kafka 集群扩展与维护

扩展方法:

1、水平扩展 Broker:

增加 Broker 数量,均衡负载,提高集群容量。

(1)安装新Broker:

在新的服务器上安装Kafka,步骤与之前的安装步骤相同,确保Java已经安装。

(2)配置新 Broker:

编辑server.properties文件,设置新的broker.idlog.dirs,确保 zookeeper.connect指向现有的 Zookeeper集群。

broker.id=3  # 新的 Broker ID
listeners=PLAINTEXT://:9092
log.dirs=/data/kafka-logs
zookeeper.connect=192.168.10.20:2181,192.168.10.21:2181,192.168.10.22:2181
num.partitions=3
default.replication.factor=3

(3)启动新Broker

sudo su - kafka
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

 (4)重新分配分区:

使用Kafka自带的工具将现有分区重新分配到新 Broker 上。

生成当前分区副本分配:

kafka-reassign-partitions.sh --zookeeper 192.168.10.20:2181 --generate --topics-to-move-json-file topics.json --broker-list "0,1,2,3"

topics.json内容:

{"version": 1,"topics": [{"topic": "test-topic"}]
}

 执行分区重新分配:

kafka-reassign-partitions.sh --zookeeper 192.168.10.20:2181 --execute --reassignment-json-file reassignment.json

2、动态扩展 Topic:

根据流量需求动态调整 Topic 的分区数。

 增加 Topic 的分区数量

kafka-topics.sh --alter --topic test-topic --partitions 10 --zookeeper 192.168.10.20:2181

维护方法:

定期监控:

使用 Kafka 自带的工具或第三方监控工具(如 Prometheus + Grafana)定期监控集群状态。

检查集群状态: 

kafka-broker-api-versions.sh --bootstrap-server 192.168.10.20:9092

检查 Topic 信息: 

kafka-topics.sh --describe --topic test-topic --zookeeper 192.168.10.20:2181

定期优化:

定期检查并优化配置,清理过期数据,保持集群健康运行。

优化server.properties中的配置,根据实际使用情况调整参数,如调整日志段大小和保留时间:

log.segment.bytes=1073741824  # 1 GB
log.retention.hours=168       # 7 days

清理过期数据

kafka-delete-records.sh --bootstrap-server 192.168.10.20:9092 --offset-json-file offsets.json

http://www.lryc.cn/news/396204.html

相关文章:

  • C# 策略模式(Strategy Pattern)
  • 【初阶数据结构】1.算法复杂度
  • (图文详解)小程序AppID申请以及在Hbuilderx中运行
  • 科技创新引领水利行业升级:深入分析智慧水利解决方案的核心价值,展望其在未来水资源管理中的重要地位与作用
  • ExcelVBA运用Excel的【条件格式】(三)
  • coco数据集格式计算mAP的python脚本
  • Linux学习——Linux中无法使用ifconfg命令
  • 二分查找3
  • 从零开始学习嵌入式----C语言框架梳理与后期规划
  • ESP32 步进电机精准控制:打造高精度 DIY 写字机器人,实现流畅书写体验
  • 传知代码-图神经网络长对话理解(论文复现)
  • 部署前端项目
  • 使用POI实现Excel文件的读取(超详细)
  • Debezium系列之:记录一次数据库某张表部分数据未同步到hive表的原因
  • 爆破器材期刊
  • Nginx Websocket 协议配置支持
  • 【生成式对抗网络】GANs在数据生成、艺术创作,以及在增强现实和虚拟现实中的应用
  • 大模型面试(三)
  • pycharm中快捷键汇总
  • TCP/IP协议族结构和协议
  • 大模型一些概念的理解 - 线性层、前向传播、后向传播
  • AWS 云安全性:检测 SSH 暴力攻击
  • 7.9数据结构
  • Python 文件操作:打开数据处理的大门
  • 单对以太网连接器多场景应用
  • Python pip的更新问题
  • [Linux][Shell][Shell基础] -- [Shebang][特殊符号][变量][父子Shell]详细讲解
  • DS200CVMAG1AEB处理器 控制器 模块
  • 阈值分割后配合Connection算子和箭头工具快速知道区域的ID并选择指定区域
  • 【work】AI八股-神经网络相关