kafka的shell操作
Kafka 提供了丰富的 shell 命令工具,位于 Kafka 安装目录的 bin/
目录下(Windows 系统为 bin/windows/
)。这些命令用于管理主题、生产者、消费者、分区等核心组件。以下是常用的 Kafka shell 操作大全:
一、主题(Topic)操作
1. 创建主题
# 创建一个名为 test-topic 的主题,3个分区,2个副本
bin/kafka-topics.sh --bootstrap-server localhost:9092 \--create \--topic test-topic \--partitions 3 \--replication-factor 2
--bootstrap-server
:指定 Kafka 集群地址(broker 列表)--partitions
:分区数量(提高并行度)--replication-factor
:副本数量(提高可用性,不能超过 broker 数量)
2. 查看所有主题
bin/kafka-topics.sh --bootstrap-server localhost:9092 \--list
3. 查看主题详情
bin/kafka-topics.sh --bootstrap-server localhost:9092 \--describe \--topic test-topic
输出包含分区数、副本分布、ISR(同步副本)等信息。
4. 修改主题(仅支持分区数增加)
bin/kafka-topics.sh --bootstrap-server localhost:9092 \--alter \--topic test-topic \--partitions 5 # 只能增加,不能减少
5. 删除主题
bin/kafka-topics.sh --bootstrap-server localhost:9092 \--delete \--topic test-topic
- 需确保
server.properties
中delete.topic.enable=true
(默认开启)
二、生产者(Producer)操作
1. 启动控制台生产者
# 向 test-topic 发送消息(输入内容后按回车发送)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \--topic test-topic
2. 带键值对的生产者
# 发送格式为 "key:value" 的消息(需指定分隔符)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \--topic test-topic \--property "parse.key=true" \--property "key.separator=:"
输入示例:user1:hello
(key 为 user1,value 为 hello)
三、消费者(Consumer)操作
1. 启动控制台消费者(从最新消息开始消费)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic test-topic
2. 从最早消息开始消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic test-topic \--from-beginning
3. 消费指定分区的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic test-topic \--partition 0 # 消费第0个分区
4. 带消费组的消费者
# 加入名为 test-group 的消费组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic test-topic \--group test-group
四、消费组(Consumer Group)操作
1. 查看所有消费组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--list
2. 查看消费组详情
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--describe \--group test-group
输出包含:
- 消费组 ID
- 主题名称
- 分区分配情况
- 当前消费偏移量(CURRENT-OFFSET)
- 日志末尾偏移量(LOG-END-OFFSET)
- 未消费消息数(LAG)
3. 重置消费组偏移量
# 将 test-group 对 test-topic 的偏移量重置为最早
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--group test-group \--topic test-topic \--reset-offsets \--to-earliest \--execute
其他重置选项:
--to-latest
:重置到最新--to-offset <offset>
:重置到指定偏移量--shift-by <number>
:相对当前偏移量移动(正数向前,负数向后)
五、分区(Partition)操作
1. 查看分区 Leader 分布
# 结合主题详情查看
bin/kafka-topics.sh --bootstrap-server localhost:9092 \--describe \--topic test-topic | grep "Leader:"
六、其他常用操作
1. 查看 broker 元数据
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
2. 查看主题消息数量(近似值)
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \--bootstrap-server localhost:9092 \--topic test-topic \--time -1 # -1 表示最新偏移量,-2 表示最早偏移量
计算总消息数:各分区最新偏移量之和。
七、Windows 系统注意事项
- 所有
.sh
命令替换为.bat
(如kafka-topics.bat
) - 路径分隔符使用
\
而非/
以上命令覆盖了 Kafka 日常运维的核心场景,实际使用时需根据集群地址(--bootstrap-server
)和具体需求调整参数。