当前位置: 首页 > news >正文

【Note】《Kafka: The Definitive Guide》第三章: Kafka 生产者深入解析:如何高效写入 Kafka 消息队列

《Kafka: The Definitive Guide》第三章: Kafka 生产者深入解析:如何高效写入 Kafka 消息队列

Kafka 已经成为现代分布式系统中不可或缺的核心组件,尤其是在微服务、事件驱动架构与实时流处理领域。作为 Kafka 使用的第一步,生产者(Producer) 负责将消息写入 Kafka,这个过程背后有哪些关键机制?如何实现高可靠、高性能的写入?


什么是 Kafka Producer?

Kafka Producer 是 Kafka 客户端的一种,用于将消息以 Topic 的形式发送到 Kafka Broker 中的某个分区(Partition)
它是 Kafka 架构中数据输入的入口,也是连接上游系统与 Kafka 流平台的桥梁。


核心组成:写入一条消息发生了什么?

在你写入一条 Kafka 消息的背后,其实包含以下几个步骤:

1. 构造 ProducerRecord

这是 Kafka 中的基本写入单元。

new ProducerRecord<>("topic", "key", "value");
  • topic: 目标主题
  • key: 决定分区(可选)
  • value: 消息体

2. 消息序列化(Serialization)

Kafka 只接受字节数据,因此 keyvalue 需要先被序列化。Kafka 使用 Serializer 接口实现,比如最常用的:

key.serializer = org.apache.kafka.common.serialization.StringSerializer
value.serializer = org.apache.kafka.common.serialization.StringSerializer

你也可以自定义 JSON、Avro、Protobuf 等序列化方式,增强结构化能力与跨语言支持。

3. 分区策略(Partitioner)

Kafka Producer 需要决定消息该写入哪个分区:

情况分区策略
指定 Key使用 Key 的哈希结果 mod 分区数
未指定 Key使用轮询方式(Round-Robin)均匀分配
自定义 Partitioner可根据业务逻辑自行决定分区

配置 Kafka Producer:这些参数你不能不懂

Kafka Producer 是高度可配置的,以下是常用配置项与建议:

参数默认值含义
bootstrap.serversKafka Broker 地址列表
acks1写入确认级别(见下)
retries0重试次数(发送失败)
batch.size16384单批消息最大大小(字节)
linger.ms0批消息等待时间(毫秒)
compression.typenone支持 gzip/snappy/lz4/zstd
buffer.memory33554432Producer 缓存总容量(字节)

acks 参数详解(可靠性核心)

含义适用场景
0不等待 Broker 响应极端低延迟需求
1等 Leader 副本确认兼顾性能和基本可靠性
all(或 -1等所有副本(ISR)确认强一致性,金融/交易等场景推荐

异步 vs 同步发送消息

Kafka Producer 支持两种发送方式:

1. 异步发送(推荐)

producer.send(record, (metadata, exception) -> {if (exception != null) {// 失败处理} else {// 成功记录}
});
  • 非阻塞;
  • 通过回调函数处理发送结果;
  • 吞吐高、延迟低。

2. 同步发送

producer.send(record).get();
  • 阻塞调用;
  • 等待 Kafka 确认再继续;
  • 适合控制节奏或事务要求强的场景。

性能优化策略

为了提升 Kafka Producer 的吞吐性能,可采取以下几种方式:

批量发送

使用 batch.size + linger.ms 控制:

  • batch.size 设置为 32KB~128KB;
  • linger.ms 设置为 5~100ms,可适当延迟发送以合并更多消息。

压缩

compression.type = lz4
  • lz4 压缩率高,CPU 消耗低;
  • 可显著减少带宽与磁盘使用,适合高并发写入场景。

控制重试机制

retries = 5
retry.backoff.ms = 100
  • 自动容错短暂的网络中断;
  • 建议配合幂等性开启,避免重复写入。

幂等性 & 事务支持(高级特性)

幂等性(Idempotence)

enable.idempotence = true
  • Kafka 会为 Producer 分配唯一 ID;
  • 同一条消息不会被重复写入;
  • 开启后自动启用强一致性机制,建议默认开启。

事务性 Producer(Exactly-once)

transactional.id = "my-transactional-producer"
  • 支持将多个写入操作视为一个事务提交;
  • 保证多个 topic 的写入原子性;
  • 常见于 Kafka Streams 或写入 Kafka + DB 的一致性控制。

总结:写好 Kafka Producer 的六个关键点

关键点建议
ProducerRecord明确 Topic、Key、Value,决定分区行为
序列化根据数据格式选择合适 Serializer
分区策略默认即可,自定义需慎重设计
写入可靠性acks=all + enable.idempotence=true 实现强一致性
性能优化利用批量、压缩、异步发送提升性能
事务控制需要 Exactly-once 可使用事务性 Producer

附:Kafka Producer 最小可用示例

Java版本

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key" + i, "value" + i);producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.printf("Sent to partition %d with offset %d%n", metadata.partition(), metadata.offset());} else {exception.printStackTrace();}});
}producer.close();

C++版本

#include <iostream>
#include <string>
#include <csignal>
#include <librdkafka/rdkafkacpp.h>bool run = true;
void stop(int sig) { run = false; }class EventCb : public RdKafka::EventCb {
public:void event_cb(RdKafka::Event &event) override {if (event.type() == RdKafka::Event::EVENT_ERROR) {std::cerr << "Kafka Error: " << event.str() << std::endl;}}
};int main() {std::signal(SIGINT, stop);std::string brokers = "localhost:9092";std::string topic = "demo-topic";RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);conf->set("bootstrap.servers", brokers, nullptr);conf->set("dr_cb", nullptr, nullptr); // delivery report callback 可选EventCb event_cb;conf->set("event_cb", &event_cb, nullptr);std::string errstr;RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);if (!producer) {std::cerr << "Producer creation failed: " << errstr << std::endl;return 1;}while (run) {std::string payload = "Hello Kafka from C++";RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA,RdKafka::Producer::RK_MSG_COPY,const_cast<char *>(payload.c_str()), payload.size(),nullptr, nullptr);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;}producer->poll(0); // 触发回调std::this_thread::sleep_for(std::chrono::milliseconds(500));}producer->flush(1000);delete producer;delete conf;return 0;
}

http://www.lryc.cn/news/581575.html

相关文章:

  • Android studio在点击运行按钮时执行过程中输出的compileDebugKotlin 这个任务是由gradle执行的吗
  • 升级AGP(Android Gradle plugin)和gradle的版本可以提高kapt的执行速度吗
  • 【python】对纯二进制向量(仅包含 0 和 1,长度为 8 或 16)的检测和提取
  • 基于腾讯云开发与“人·事·财·物”架构理念的家政预约小程序设计与实现
  • 【Python练习】030. 编写一个函数,实现字符串的反转
  • Python 中 ffmpeg-python 库的详细使用
  • 一条 SQL 语句的内部执行流程详解(MySQL为例)
  • 2025 JuniorCryptCTF re 部分wp
  • 重力翻转者:原创趣味小游戏
  • 前端开发常见问题(从布局到性能优化)
  • 【libm】 10 rem_pio2函数 (rem_pio2.rs)
  • 人工智能之数学基础:线性回归算法的矩阵参数求导
  • 传统微商困境与开源链动2+1模式、AI智能名片及S2B2C商城小程序的转型破局
  • AUTOSAR进阶图解==>AUTOSAR_SWS_V2XFacilities
  • Hadoop MapReduce 入门
  • Hadoop高可用集群搭建
  • k8s-服务发布基础
  • 小菜狗的云计算之旅,学习了解rsync+sersync实现数据实时同步(详细操作步骤)
  • 【Linux网络编程】Socket - UDP
  • 儿童趣味记忆配对游戏
  • 【CSS-15】深入理解CSS transition-duration:掌握过渡动画的时长控制
  • Java的各种各样的生命周期——思考历程
  • 字符函数和字符串函数(下)- 暴力匹配算法
  • ASP.NET Web Pages 安装使用教程
  • 随机森林算法详解:Bagging思想的代表算法
  • 【大模型入门】访问GPT_API实战案例
  • 8.2.1+8.2.2插入排序
  • 企业智脑:智能营销新纪元——自动化品牌建设与智能化营销的技术革命
  • 【Linux操作系统 | 第12篇】Linux磁盘分区
  • Dubbo 3.x源码(31)—Dubbo消息的编码解码