当前位置: 首页 > news >正文

怎么理解使用MQ解决分布式事务 -- 以kafka为例

利用 Apache Kafka 实现分布式事务的完整指南

本文聚焦 Kafka 原生能力,从「事务语义 → 代码 → 运维 → 故障场景」逐层展开,给出可在生产环境直接落地的全套方案。


一、Kafka 分布式事务的 3 个核心语义

语义实现机制配置/代码标志
幂等性Broker 端去重 + Sequence Numberenable.idempotence=true
事务两阶段提交 + Transaction Coordinatortransactional.id
读已提交消费者过滤未提交事务消息isolation.level=read_committed

二、架构全景图

┌─────────────────────────────────────────────────────────────┐
│  Producer (订单服务)                                         │
│  1. beginTransaction()                                       │
│  2. insert into order_tbl …                                  │
│  3. send("stock-deduct", orderId)                            │
│  4. commitTransaction()   ─┐                                 │
└────────────────────────────┼─────────────────────────────┐   ││ 两阶段提交                   │   │
┌────────────────────────────┼─────────────────────────────┘   │
│  Broker                                                    │   │
│  • Transaction Coordinator (TC)                            │   │
│  • __transaction_state 日志 (3 副本)                       │   │
│  • 写入分区队列                                           │   │
└────────────────────────────┼─────────────────────────────┐   ││ 仅投递 committed 消息        │   │
┌────────────────────────────┼─────────────────────────────┘   │
│  Consumer (库存服务)                                       │
│  5. poll() → read_committed                               │
│  6. update stock_tbl set qty = qty - ? where id = ?        │
│  7. ack()                                                  │
└─────────────────────────────────────────────────────────────┘

三、Producer 端完整配置与代码

1. 通用 Producer 参数

bootstrap.servers=kafka:9092
enable.idempotence=true               # 幂等发送
transactional.id=order-service-tx-1   # 全局唯一
acks=all
max.in.flight.requests.per.connection=5
transaction.timeout.ms=30000          # 小于 broker 的 max.transaction.timeout.ms

2. Spring Boot 双事务(Kafka + JDBC)

@Configuration
public class KafkaChainedTxConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-tx");DefaultKafkaProducerFactory<String, String> pf =new DefaultKafkaProducerFactory<>(props);pf.setTransactionIdPrefix("order-tx-");          // 支持并发事务return pf;}@Beanpublic KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> pf) {return new KafkaTransactionManager<>(pf);}@Bean("chainedTxManager")public ChainedTransactionManager chainedTxManager(KafkaTransactionManager<?, ?> ktm,DataSourceTransactionManager dstm) {return new ChainedTransactionManager(ktm, dstm);}
}

3. Service 层

@Service
public class OrderService {private final OrderRepository repo;private final KafkaTemplate<String, OrderEvent> kafka;@Transactional("chainedTxManager")public void createOrder(CreateOrderCommand cmd) {// 1. 本地事务Order order = repo.save(new Order(cmd));// 2. 发送事务消息OrderEvent event = new OrderEvent(order.getId(), cmd.getSkuId(), cmd.getQty());kafka.send("stock-deduct", order.getId().toString(), event);// 3. 若 DB 回滚,Kafka 事务也回滚;反之亦然}
}

四、Consumer 端:幂等 + 重试 + 死信队列

1. 消费者配置

bootstrap.servers=kafka:9092
group.id=stock-service
isolation.level=read_committed
enable.auto.commit=false
max.poll.records=100

2. 监听器(批量 + 幂等)

@Component
public class StockConsumer {private final StockRepository stockRepo;@KafkaListener(topics = "stock-deduct",containerFactory = "batchFactory")public void listen(List<ConsumerRecord<String, OrderEvent>> records,Acknowledgment ack) {for (var r : records) {try {consumeOne(r.value());} catch (DuplicateKeyException ex) {// 幂等冲突,跳过} catch (DataIntegrityViolationException ex) {// 库存不足,记录告警并手动 ack,不再重试} catch (Exception ex) {// 其他异常:抛出让 SeekToCurrentErrorHandler 重试throw ex;}}ack.acknowledge();}@Transactionalpublic void consumeOne(OrderEvent e) {int affected = stockRepo.deductQty(e.getSkuId(), e.getQty(), e.getOrderId());if (affected == 0) {throw new IllegalStateException("库存扣减失败");}}
}

3. 重试与死信队列(Spring Kafka)

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> batchFactory(ConsumerFactory<String, OrderEvent> cf) {ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(cf);factory.setBatchListener(true);// 最多重试 3 次后发送到 DLQDefaultErrorHandler handler =new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate(), (r, e) -> new TopicPartition("stock-deduct.DLT", r.partition())),new FixedBackOff(1000L, 2));factory.setCommonErrorHandler(handler);return factory;
}

五、事务超时 & 死锁排查

指标触发场景解决
transaction.timeout.ms 超期Broker 未收到 commit/abort调大或优化业务耗时
producer.send 阻塞网络抖动、ISR < min.insync.replicas监控 kafka.server:RequestQueueTimeMs
消费者 lag 持续增大下游消费慢 / 重试风暴扩容消费者、减少 batch size

六、完整监控体系

  1. JMX 指标

    • Producer:record-send-rate, transaction-duration-avg
    • Broker:transaction-coordinator-metricstransactional-id-count
    • Consumer:records-lag-max, commit-latency-avg
  2. Prometheus + Grafana

    - pattern: kafka.producer<type=producer-metrics, client-id=(.+)><>(transaction-duration-avg)name: kafka_producer_transaction_duration_avglabels:client_id: "$1"
    
  3. 告警规则示例

    - alert: KafkaTransactionStuckexpr: kafka_producer_transaction_duration_avg > 20for: 1mannotations:summary: "事务长时间未完成"
    

七、故障演练清单

场景操作预期行为
Broker 重启docker kill kafka-1事务协调器 failover,事务仍可完成
Producer 进程崩溃kill -9事务超时后 Broker 自动 abort
消费者消费异常业务抛异常重试 3 次 → DLQ → 人工处理

八、小结

维度结论
一致性本地事务 + Kafka 事务 API → 原子提交
可用性异步投递,高吞吐,支持水平扩容
复杂度仅需幂等消费与重试策略,2PC 网络阻塞消失
性能实测 TPS 下降 < 10%,远低于数据库 2PC

至此,从配置、代码到监控、故障演练 的 Kafka 分布式事务闭环已完整落地。

http://www.lryc.cn/news/603553.html

相关文章:

  • 【EDA】Calma--早期版图绘制工具商
  • Kafka运维实战 16 - kafka 分区重新分配【实战】
  • Javaweb————揭秘404 not found(HTTP常用响应码)
  • 【数据结构】真题 2016
  • STM32--DHT11(标准库)驱动开发
  • JVM 崩溃(Fatal Error)解决方法
  • 26考研11408数据结构
  • 【Docker】 Docker镜像瘦身终极指南:多阶段构建+Alpine优化+分层策略深度解析
  • 飞机大战小游戏
  • 第十六章 Java基础-拼图小游戏
  • 【Unity编辑器扩展】Unity 笔记编辑器开发详解(支持多页面、重命名、持久化保存)
  • 项目历程—生命数组游戏(两版本)
  • Unity 编辑器开发 之 Excel导表工具
  • 游戏盾从哪些方面保护网站业务?
  • c语言-数据结构-二叉树OJ之子树与二叉树的构建
  • QT项目 -仿QQ音乐的音乐播放器(第三节)
  • 电脑没有声音了怎么恢复 快速解决音频故障
  • 预装Windows 11系统的新电脑怎么跳过联网验机
  • Wndows Docker Desktop-Unexpected WSL error
  • Docker初学者需要了解的几个知识点(三)
  • docker 重新安裝
  • 小杰数据结构(one day)——心若安,便是晴天;心若乱,便是阴天。
  • 数据结构 排序(2)---选择排序
  • RK3568下的进程间广播通信:用C语言构建简单的中心服务器
  • 【WRF工具】服务器中安装编译GrADS
  • 信创国产Linux操作系统汇总:从桌面到服务器,百花齐放
  • 聚铭安全管家平台2.0实战解码 | 安服篇(三):配置保障 自动核查
  • mapbox进阶,mapbox-gl-draw绘图插件扩展,编辑模式支持点、线、面的捕捉
  • Android系统开发 在Android10版本的Framework中添加系统服务
  • Kafka——Kafka控制器