当前位置: 首页 > news >正文

Kafka生产者事务机制原理

【博客】
Kafka生产者事务机制原理


一、为什么要引入事务?

在使用 Kafka 的早期版本时,开发者经常会遇到两种场景:

  1. 跨会话重复消息
    Producer 重启后,之前的重试逻辑会导致同一条消息被再次发送,消费者需要做幂等处理。

  2. 跨分区原子性缺失
    一批消息要同时写入多个 Topic / 多个 Partition,如果某一条失败,前面成功的消息无法回滚,业务数据出现“中间状态”。

Kafka 在 0.11.0.0 引入的事务(Transactions)正是为了解决“恰好一次(Exactly-Once)语义”的痛点,同时兼顾跨会话幂等性、跨分区原子性和 consume-process-produce 模式的一致性。


二、事务的四大核心目标

目标说明
原子性一组消息要么全部成功,要么全部失败。
跨会话幂等Producer 重启后,仍能识别并去重“上一次未完成的事务”。
一致性consume-process-produce 模式下,消费位点与下游发送结果保持一致。
隔离性事务未提交的消息对消费者不可见,防止脏读。

三、事务 API 速查表

Kafka Producer 端只提供了 5 个与事务相关的方法,掌握它们就能完成 90% 的编程需求:

方法作用
initTransactions()向 Coordinator 注册全局唯一 transactional.id,做初始化。
beginTransaction()显式开启一个事务。
sendOffsetsToTransaction()把消费者 offset 作为事务的一部分提交,用于 consume-process-produce 模式。
commitTransaction()全部成功,两阶段提交中的“真正提交”。
abortTransaction()出现异常,回滚当前事务。

Spring Boot 用户可以用 @TransactionalkafkaTemplate.executeInTransaction() 进行声明式/编程式事务,原理一致。


四、事务运行流程(两阶段提交 2PC)

Kafka 没有照搬传统 XA 的复杂协议,而是基于内部 Topic 实现了一个轻量级 2PC。

1. 组件角色

角色说明
Producer业务进程,负责发送消息。
Transaction Coordinator一个 Broker 内的模块,充当 2PC 的协调者。
__transaction_state内部 Topic,持久化事务状态(Ongoing → Prepare → Commit/Abort)。
目标 Topic-Partition最终存放业务数据。

2. Kafka事务机制原理

源码位置:org.apache.kafka.clients.producer.internals.TransactionManager,画出Kafka 事务 2PC 全景

┌────────────────────────────────────────────────────────────┐
│                     Kafka 事务 2PC 全景                    │
├───────────────┐   ┌──────────────────┐   ┌──────────────────┐
│   Producer    │   │Transaction       │   │   Brokers        │
│(transaction.id│──▶│Coordinator(TC)   │◀──┤(Data partitions)│
└───────────────┘   └──────────────────┘   └──────────────────┘│                      │                     ││ 1. initTransactions  │                     ││--------------------->2. 写 __transaction_state│                      │   topic 记录 BEGIN│ 3. send()            │                     ││-----------------------------4. 写消息到目标分区│                      │                     ││ 5. commitTransaction │                     ││--------------------->6. 写 PREPARE_COMMIT│                      │                     ││                      │ 7. 给各分区写 COMMIT 标记│                      │◀────────────────────┘│                      │ 8. 写 __transaction_state│                      │   记录 COMMITTED
  1. 初始化阶段
    initTransactions() → 找到 Coordinator → 注册 transactional.id,幂等 Producer 自动开启(enable.idempotence=true)。

  2. 开始事务
    beginTransaction() 仅在客户端打一个标记,不会立即与 Broker 交互。

  3. 发送消息
    调用 producer.send(),消息并未直接写入目标分区,而是暂存客户端的 RecordAccumulator,并标记为事务消息。

  4. 预提交(Prepare)
    客户端 flush 或 commitTransaction() 时,Coordinator 收到 EndTxn(Prepare),把事务状态写入 __transaction_state,并向所有涉及的 Topic-Partition 写入 事务控制消息(Control Batch)。

  5. 正式提交(Commit)
    Coordinator 收到所有 Partition 的 ACK 后,写入 __transaction_state 的 Commit 标记,并向各 Partition Leader 发送 COMMIT Marker
    消费者只有在看到 COMMIT Marker 后,才能看到这批消息。至此事务对外可见。

  6. 异常回滚(Abort)
    任何一步失败,Producer 捕获异常后调用 abortTransaction(),流程同上,只是把标记改成 ABORT,消息对消费者永久不可见。

__transaction_state 状态有ongoingprepare committed,和对应操作的具体图示:

Producer            Transaction Coordinator            日志 & 分区|                       |                               ||--- init(t.id) ------>|--- 记录事务ID ---------------->||                       |                               ||--- begin() --------->|--- 状态=ongoing -------------->||                       |                               ||--- send 消息 -------->|--- 写入未提交数据 ------------>||                       |                               ||--- commit() -------->|--- 状态=prepare -------------->||                       |--- 写 commit marker ---------->||                       |--- 状态=committed ------------>|

生产者、Transactions Coordinator的相互作用图示:
在这里插入图片描述
A:生产者通过initTransactions API向Coordinator 注册事务ID
B:Transactions Coordinator 记录事务日志
C:生产者把消息写入分区
D:分区和Coordinator的交互。(当事务完成以后,消息的状态应该是已提交,消费者才可以消费)


五、代码实现

原生 API

// 1. 配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
props.put("transactional.id", "order-tx-" + UUID.randomUUID());
props.put("enable.idempotence", "true");        // 自动开启幂等
props.put("isolation.level", "read_committed"); // 消费者只读已提交KafkaProducer<String, String> producer = new KafkaProducer<>(props);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);// 2. 初始化
producer.initTransactions();while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));if (records.isEmpty()) continue;producer.beginTransaction();try {for (ConsumerRecord<String, String> r : records) {String newVal = transform(r.value());   // 业务逻辑producer.send(new ProducerRecord<>("target-topic", r.key(), newVal));}// 把消费位点也放进事务producer.sendOffsetsToTransaction(offsets(records), new ConsumerGroupMetadata("myGroup"));producer.commitTransaction();} catch (Exception e) {producer.abortTransaction();}
}

Spring Boot(声明式)

@Component
public class OrderListener {@Autowiredprivate KafkaTemplate<String, String> template;@KafkaListener(topics = "order-in")public void listen(ConsumerRecord<String, String> record,Acknowledgment ack) {template.executeInTransaction(t -> {try {// 1. 业务String newVal = processOrder(record.value());// 2. 写下游t.send("order-out", record.key(), newVal);// 3. 提交 offsett.sendOffsetsToTransaction(Map.of(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1)),new ConsumerGroupMetadata("order-group"));return null;} catch (Exception e) {throw new KafkaException("事务失败", e); // 触发回滚}});}private String processOrder(String json) {// 业务逻辑return json.toUpperCase();}
}

六、小结

Kafka 事务 = 幂等 Producer + 两阶段提交 + 内部 Topic 日志。
掌握 init → begin → commit/abort 三步曲,即可获得消息层面的 ACID。
🚀 下一步:把本地数据库事务与 Kafka 事务组合,实现真正的 端到端 Exactly-Once。用思维导图总结本博客内容:

在这里插入图片描述

http://www.lryc.cn/news/613717.html

相关文章:

  • 前端单元测试最佳实践(一)
  • 前端开发(HTML,CSS,VUE,JS)从入门到精通!第八天(Vue框架及其安装)(完结篇) 重点 ! ! !
  • 基于Web的交互式坐标系变换矩阵计算工具
  • 【Linux】Linux增删改查命令大全(附频率评级)
  • vue3 map和filter功能 用法
  • Odoo 18 → Odoo 19 功能改动对比表
  • Vue3 基本语法
  • day21|学习前端vue3框架和ts语言
  • pdf文件转word免费使用几个工具
  • CSS BFC
  • webrtc弱网-EncodeUsageResource类源码分析及算法原理
  • Spring Security自动处理/login请求,后端控制层没有 @PostMapping(“/login“) 这样的 Controller 方法
  • 设计模式(二)——策略模式
  • 冠雅新品 | 以“无形之光”守护双眸,以“无声之智”浸润生活
  • 基于R语言,“上百种机器学习模型”学习教程 | Mime包
  • 【昇腾】Atlas 500 A2 智能小站制卡从M.2 SATA盘启动Ubuntu22.04系统,重新上电卡死没进系统问题处理_20250808
  • 主播生活模拟器2|主播人生模拟器2 (Streamer Life Simulator 2)免安装中文版
  • 31-数据仓库与Apache Hive-Insert插入数据
  • Pinterest视觉营销自动化:亚矩阵云手机实例与多分辨率适配技术
  • 远期(Forward)交易系统全球金融市场解决方案报告
  • 32-Hive SQL DML语法之查询数据
  • 《Hive、HBase、StarRocks、MySQL、OceanBase 全面对比:架构、优缺点与使用场景详解》
  • 安装部署K8S集群环境(实测有效版本)
  • K8s 常见故障案例分析
  • ArgoCD 与 GitOps:K8S 原生持续部署的实操指南
  • hive-日期拆分为多行
  • 二、k8s 1.29 之 网络
  • 2025年城市建设与智慧交通国际会议(ICUCIT 2025)
  • Vue复习
  • 暴力解决MySQL连接失败