当前位置: 首页 > news >正文

Kafka 原理与核心机制全解析

一、Kafka 是什么?

简要介绍:

  • Kafka 是一个高吞吐、分布式、可扩展的消息系统

  • 用于日志聚合、实时流处理、事件驱动架构等场景

其主要设计目标如下:

  1. 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能;
  2. 高吞吐率。即使在非常廉价的机器上也能做到单机支持每秒100K条消息的传输;
  3. 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输,同时支持离线数据处理和实时数据处理。

为什么要用消息系统

Kafka 本质上是一个 MQ (Message Queue),使用消息队列的好处

  1. 解耦(Decoupling:允许我们独立修改队列两边的处理过程而互不影响。
  2. 冗余(Redundancy:有些情况下,我们在处理数据的过程会失败造成数据丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险,确保你的数据被安全的保存直到你使用完毕
  3. 峰值处理能力(Peak Load Handling:不会因为突发的流量请求导致系统崩溃,消息队列能够使服务顶住突发的访问压力,有助于解决生产消息和消费消息的处理速度不一致的情况
  4. 异步通信(Asynchronous Communication:消息队列允许用户把消息放入队列但不立即处理它,等待后续进行消费处理。

Kafka 整体架构

架构图:

组件简述:

组件作用
Producer生产者负责将消息发布到Kafka主题中的一个或多个分区。生产者可以选择将消息发送到特定的分区,也可以让Kafka根据配置的分区策略自动选择分区。
BrokerBroker是指运行Kafka服务器实例的单个节点。每个Broker都是一个独立的Kafka服务器,负责接收、存储、转发和处理生产者和消费者之间的消息。多个Broker组成一个Kafka集群,共同协作来提供高可用性、扩展性和容错性。
Consumer订阅一个或多个主题,并从分区中拉取消息进行处理。每个消费者都可以独立地消费一个或多个分区的消息。消费者组(Consumer Groups)允许多个消费者组成一个消费者组,每个消费者负责消费分区的一部分数据。消费者组内的消费者协作工作,确保每个分区的消息被处理,从而实现负载均衡和高可用性。
Topic 主题是消息流的组织单位,每个主题代表一个特定的消息类别。主题可以被分成一个或多个分区(Partition),分区是消息存储的基本单元。分区的存在可以帮助实现数据的水平扩展和并行处理,提高系统的吞吐量和性能。
PartitionTopic可以分为多个Partition,每个Partition在不同的Broker上存储消息,以实现水平扩展和提高吞吐量。主题可以分成一个或多个分区,分区是消息存储的基本单元。分区允许数据水平扩展和并行处理
Offset每个消息在Partition中的唯一标识,Consumer通过Offset来记录自己消费的位置。
Zookeeper / Raft集群元数据协调(Kafka 3.x 可用 KRaft 替代)
Consumer Group多个消费者实例组成的一个组,它们共同消费一组 Topic 的消息。每个 Partition 在同一时间只会被 Consumer Group 中的一个 Consumer 消费,这样可以实现消息的负载均衡,提高消费效率。比如,在一个实时监控系统中,有多个 Consumer 实例组成一个 Consumer Group,共同消费“system_monitoring”Topic 的消息,每个 Consumer 负责处理一部分消息,确保系统能够及时响应和处理大量的监控数据。

核心机制详解(每一节独立成段)

✔️ 3.1 Topic 与 Partition

  • 每个 Topic 可包含多个分区(Partition)

  • 每个 Partition 是一条追加写日志(Append-Only)

  • 分区可并行读写,实现高吞吐

📌 小知识:发送带 key 的消息会根据 key 哈希分配到固定 Partition,保持顺序性

✔️ 3.2 副本机制(Replication)

  • Kafka 每个 Partition 有 一个 Leader多个 Follower 副本

  • 所有写入和读取默认走 Leader

  • 副本同步保证数据冗余,防止数据丢失

📌 参数:

replication.factor=3
min.insync.replicas=2

✔️ 3.3 Producer 与幂等性(Idempotence)

  • Kafka Producer 默认可能因重试导致消息重复

  • enable.idempotence=true 可避免同一条消息重复写入

🔧 示例代码:

props.put("enable.idempotence", "true");

Kafka 利用 PID + Sequence Number + Partition 来识别重复消息。

✔️ 3.4 消费者与 Consumer Group

  • 多个 Consumer 可组成一个 Consumer Group

  • Kafka 自动实现分区再平衡(每个分区只会被 Group 中一个实例消费)

📌 注意:

  • Group 内的消费者消费互斥

  • Group 之间消费互不干扰

✔️ 3.5 offset 管理机制

  • 每个 Consumer 会记录消费的 offset

  • 支持:

    • 自动提交:简单但有重复风险

    • 手动提交:灵活,便于与业务逻辑绑定

  • offset 可保存在 Kafka 的内部 topic 中

✔️ 3.6 消息投递语义(Delivery Semantics)

类型描述
At Most Once最多一次,可能丢
At Least Once至少一次,可能重复
Exactly Once恰好一次,不重复不丢失(高级场景)

✔️ 3.7 Kafka 事务机制(Transactions)

通过事务 API,Kafka 实现:

  • 多条消息的原子写入

  • 写入 + offset 提交的原子性

🔧 示例代码:

producer.beginTransaction();
producer.send(new ProducerRecord<>("topic", "msg"));
producer.sendOffsetsToTransaction(offsets, "group-id");
producer.commitTransaction();

✔️ 3.8 Exactly Once 实现

Kafka 的 EOS(Exactly Once Semantics)是指:

在整个“读 → 处理 → 写”流程中,每条消息只被处理一次,不重复,不丢失

实现 EOS 需要满足以下条件:

环节实现手段
Producer 幂等发送enable.idempotence=true
事务性写入transactional.id + begin/commitTransaction()
Consumer 严格读取isolation.level=read_committed
提交 offsetsendOffsetsToTransaction() 原子提交 offset

EOS 处理流程图:

Producer:beginTransaction()↓send(record1)  → Kafka (暂存)send(record2)sendOffsetsToTransaction()↓commitTransaction()  → Kafka 提交事务 or 回滚Consumer:isolation.level=read_committed↓只读取成功提交的事务消息

⚠️ 异常场景处理

commitTransaction() 失败会破坏 EOS 吗?

不会。

Kafka 使用事务状态日志来记录事务状态,即使 commitTransaction 超时或失败,Kafka Broker 最终能恢复事务并决定提交或回滚。

你只需捕获异常并根据情况重试或 abort:

try {producer.commitTransaction();
} catch (TimeoutException e) {// 网络异常导致未知状态// 推荐关闭当前 producer,使用新实例重试逻辑
} catch (ProducerFencedException e) {// 当前事务被“踢出”,无法恢复,只能 abortproducer.abortTransaction();
}

Kafka 的性能优势与使用建议

  • 零拷贝传输、批量发送、文件系统页缓存优化

  • 写入吞吐量远高于传统 MQ 系统

  • 使用建议:

    • Partition 数量根据并发需求和 broker 数量合理设计

    • 批量发送、压缩提高吞吐量

    • Producer 异步发送 + ack=all 配置推荐使用

五、Java 代码示例

除了命令行工具,我们还可以通过编写 Java 代码来与 Kafka 进行交互,实现生产者和消费者的功能。以下是使用 Kafka 的 Java 客户端库编写的简单示例。

Kafka 生产者
import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {// Kafka服务器地址String bootstrapServers = "localhost:9092";// 主题名称String topic = "test_topic";// 配置生产者属性Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);// 设置key的序列化器props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 设置value的序列化器props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者实例KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息for (int i = 0; i < 10; i++) {String key = "key_" + i;String value = "message_" + i;ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.out.println("发送消息失败: " + exception.getMessage());} else {System.out.println("消息发送成功: " +"主题: " + metadata.topic() +", 分区: " + metadata.partition() +", 偏移量: " + metadata.offset());}}});}// 关闭生产者producer.close();}
}

在上述代码中,首先创建了一个Properties对象,用于配置 Kafka 生产者的属性,包括 Kafka 服务器地址、key 和 value 的序列化器。然后创建了KafkaProducer实例,并通过循环发送 10 条消息到指定的主题。在发送消息时,使用了回调函数Callback,以便在消息发送成功或失败时进行相应的处理。最后,在消息发送完成后,关闭了生产者。

Kafka 消费者
 

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {// Kafka服务器地址String bootstrapServers = "localhost:9092";// 消费者组IDString groupId = "test_group";// 主题名称String topic = "test_topic";// 配置消费者属性Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);// 设置消费者组IDprops.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);// 设置key的反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 设置value的反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 自动提交偏移量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 自动提交偏移量的时间间隔props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList(topic));try {while (true) {// 拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {System.out.println("收到消息: " +"主题: " + record.topic() +", 分区: " + record.partition() +", 偏移量: " + record.offset() +", key: " + record.key() +", value: " + record.value());}}} finally {// 关闭消费者consumer.close();}}
}

这段代码展示了如何使用 Java 编写一个简单的 Kafka 消费者。首先配置了消费者的属性,包括 Kafka 服务器地址、消费者组 ID、key 和 value 的反序列化器,以及自动提交偏移量的相关配置。然后创建了KafkaConsumer实例,并使用subscribe方法订阅了指定的主题。在一个无限循环中,通过poll方法不断从 Kafka 服务器拉取消息,并打印出每条消息的相关信息。最后,在程序结束时关闭了消费者。
 

结语

Kafka 并不仅仅是一个消息队列,更是一个可靠的实时数据流平台。通过合理运用其幂等性、事务、Exactly Once 等机制,可以构建出稳定、可扩展、强一致性的分布式系统。

📎 附录:配置参数参考

# Producer 端配置
enable.idempotence=true
acks=all
retries=Integer.MAX_VALUE
transactional.id=producer-txn-1# Consumer 端配置
isolation.level=read_committed

其它

✅ 主流消息系统对比表:Kafka、RabbitMQ、Redis Streams、ActiveMQ

特性 / 系统KafkaRabbitMQRedis StreamsActiveMQ
📦 消息模型(Messaging Model)发布订阅、日志型流队列、发布订阅、路由日志流(Stream)队列、主题
🧱 持久化能力(Persistence Capability)强,基于磁盘支持(需配置)支持(RDB / AOF)支持
⚡ 吞吐量(Throughput)极高(百万级 TPS)中等(万级 TPS)中等(万级 TPS)
⏱️ 延迟(Latency)低~中(ms 级)低(毫秒级)极低(内存操作)
📊 消息堆积能力(Message Backlog Handling)强(磁盘存储,无堆积上限)较弱(默认内存受限)中(可限制 MAXLEN)一般
✅ 消费模式(Consumption Model)Consumer Group、offset 手动管理自动 ack / 手动 ack / nackConsumer Group + XACK支持
🔁 消息重放(Message Replay)强(按 offset 任意位置重读)一般(需持久化、死信队列)支持(通过 ID)一般
💥 消息顺序(Message Ordering)分区内顺序需要保证(FIFO 队列)有序(按 ID)顺序有限保障
🔒 Exactly Once 支持(Exactly-Once Delivery Support)是(配合事务 API)否(最多一次或至少一次)否(手动 ack 可近似)
🔄 消息路由能力(Routing Capability)依赖 topic 和分区机制强(Exchange 路由策略)弱(stream 不具备 routing)强(topic/routing)
🌐 部署复杂度(Deployment Complexity)高(ZooKeeper/KRaft)中(单节点或集群)低(单 Redis 实例)
🌎 云服务支持(Cloud Support)Kafka on Confluent/AWS/MS AzureRabbitMQ on CloudAMQP/AWSAWS ElastiCache 支持 RedisAWS、Azure 支持
📚 生态支持(Ecosystem & Tooling Support)极强(Kafka Streams、Flink)广泛(Spring AMQP 等)较少(正在增强)成熟但活跃度下降
🚀 典型用途(Typical Use Cases)日志流处理、行为埋点、数据管道微服务通信、业务异步通知简单消息队列、实时监控、内网异步传统系统整合、企业集成

🧠 总结建议(按使用场景):

应用场景推荐消息系统理由说明
🔁 微服务之间异步通信RabbitMQ / Redis简洁、轻量、低延迟
📊 实时日志收集、用户行为分析Kafka / Pulsar高吞吐、大数据能力
📈 数据 ETL 流、CDC 同步Kafka / RocketMQ可靠、支持重放
📦 电商下单、支付等交易系统RocketMQ / Kafka支持事务、顺序投递
📡 IoT、设备状态流Pulsar / Kafka分布式、高扩展性
📥 简单消息处理(中小型任务队列)Redis Stream / SQS快速部署、无需复杂维护
☁️ 云原生、无需部署AWS SQS / Kafka Cloud低运维,服务托管

📌 总结一句话:

  • 💨 想要低延迟、简单部署 → Redis Streams / RabbitMQ

  • 📈 想要高吞吐、可重放 → Kafka / Pulsar

  • 🏦 想要事务与可靠性 → RocketMQ

  • 🌩 想要上云、省事 → SQS / Kafka Cloud

http://www.lryc.cn/news/573584.html

相关文章:

  • Vite 原理深入剖析
  • 【PyTorch革命】机器学习系统编程模型的演进之路
  • 从C++编程入手设计模式——命令模式
  • 【机器学习四大核心任务类型详解】分类、回归、聚类、降维智能决策指南
  • 8.特征提取与直方图
  • MATLAB GUI界面设计 第二章——APP Designer操作正式入门
  • Linux 下的 socket
  • Node.js爬虫 CheerioJS ‌轻量级解析、操作和渲染HTML及XML文档
  • 【机器学习的五大核心步骤】从零构建一个智能系统
  • STM32-GPIO-推挽输出详解
  • 深入解析Flink Local模式启动流程源码:揭开作业初始化的神秘面纱
  • Ubuntu20 搭建 Java、Redis、Nginx
  • GO 语言学习 之 helloWorld
  • 2025年SVN学习价值分析
  • react day.js使用及经典场景
  • 【RocketMQ 生产者和消费者】- 消费者重平衡(3)- 消费者 ID 对负载均衡的影响
  • 微前端MFE: 通过共享模块通信(模块联邦Module Federation)
  • 【机器学习四大核心任务类型详解】分类、回归、聚类、降维都是什么?
  • 【论文阅读笔记】TransparentGS:当高斯溅射学会“看穿”玻璃,如何攻克透明物体重建难题?
  • 【Nature Communications】超高介电常数材料 Hf0.5Zr0.5O2(HZO)
  • Oracle 11G RAC修改public ip vip private ip
  • 【数据治理】要点整理-《数据管理能力成熟度评估模型》国家标准(GB/T 36073—2018)
  • Linux的文件权限
  • 16_设备树中的remote-endpoint演示基于视频字符设备Linux内核模块
  • python源码:执行pdf合并/分页/图片管理功能
  • 计算机网络课程设计--基于TCP协议的文件传输系统
  • 案例练习二
  • rom定制系列------红米note11 5G版 MTK芯片强解bl锁修复bug 官方系统 面具root批量线刷版
  • 魂斗罗ost 游戏全合集8GB
  • 微服务网关/nacos/feign总结