当前位置: 首页 > news >正文

Flink的基于两阶段提交协议的事务数据汇实现

背景

在flink中可以通过使用事务性数据汇实现精准一次的保证,本文基于Kakfa的事务处理来看一下在Flink 内部如何实现基于两阶段提交协议的事务性数据汇.

flink kafka事务性数据汇的实现

1。首先在开始进行快照的时候也就是收到checkpoint通知的时候,在snapshot方法中会开启一个新的事务,代码如下:

   public void snapshotState(FunctionSnapshotContext context) throws Exception {// this is like the pre-commit of a 2-phase-commit transaction// we are ready to commit and remember the transactioncheckState(currentTransactionHolder != null,"bug: no transaction object when performing state snapshot");long checkpointId = context.getCheckpointId();LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'",name(),context.getCheckpointId(),currentTransactionHolder);preCommit(currentTransactionHolder.handle);// 调用kafkaProducer.flush();清理上一个事务的状态(注意不是提交),只是确保前一个事务的所有资源清理完毕pendingCommitTransactions.put(checkpointId, currentTransactionHolder);LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
// 调用producer.beginTransaction();方法开启一个新的kafka事务currentTransactionHolder = beginTransactionInternal();LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);state.clear();state.add(new State<>(this.currentTransactionHolder,new ArrayList<>(pendingCommitTransactions.values()),userContext));}

2.其次在JobManager通知检查点完成的通知方法,也就是notifyCheckpointComplete方法中提交事务

Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =pendingCommitTransactions.entrySet().iterator();Throwable firstError = null;while (pendingTransactionIterator.hasNext()) {Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();Long pendingTransactionCheckpointId = entry.getKey();TransactionHolder<TXN> pendingTransaction = entry.getValue();if (pendingTransactionCheckpointId > checkpointId) {continue;}LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",name(),checkpointId,pendingTransaction,pendingTransactionCheckpointId);logWarningIfTimeoutAlmostReached(pendingTransaction);try {//调用producer.commitTransaction()方法提交事务commit(pendingTransaction.handle);} catch (Throwable t) {if (firstError == null) {firstError = t;}}LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);pendingTransactionIterator.remove();}if (firstError != null) {throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",firstError);}

至此,一个两阶段提交的flink事务性数据汇完成了,这个事务性数据汇可以构成端到端一致性的一部分

http://www.lryc.cn/news/195367.html

相关文章:

  • 树模型(三)决策树
  • vueday01——使用属性绑定+ref属性定位获取id
  • LeetCode 260. 只出现一次的数字 III:异或
  • 使用PyTorch解决多分类问题:构建、训练和评估深度学习模型
  • 基于nodejs+vue网课学习平台
  • 读书笔记:Effective C++ 2.0 版,条款13(初始化顺序==声明顺序)、条款14(基类有虚析构)
  • flutter开发实战-下拉刷新与上拉加载更多实现
  • 旧手机热点机改造成服务器方案
  • 网工实验笔记:策略路由PBR的应用场景
  • webrtc快速入门——使用 WebRTC 拍摄静止的照片
  • 预约按摩app软件开发定制足浴SPA上们服务小程序
  • jenkins出错与恢复
  • ssh免密登录的原理RSA非对称加密的理解
  • 【监督学习】基于合取子句进化算法(CCEA)和析取范式进化算法(DNFEA)解决分类问题(Matlab代码实现)
  • 力扣每日一题41:缺失的第一个正数
  • OpenCV与mediapipe实践
  • 【css拾遗】粘性布局实现有滚动条的情况下,按钮固定在页面底部展示
  • git 创建并配置 GitHub 连接密钥
  • 使用Premiere、PhotoShop和Audition做视频特效
  • vueday01——动态参数
  • 双向链表C语言版本
  • visual studio安装时候修改共享组件、工具和SDK路径方法
  • Motorola IPMC761 使用边缘TPU加速神经网络
  • EM@直线的参数方程
  • day08-注册功能、前端登录注册页面复制、前端登录功能、前端注册功能
  • rust: function
  • 零代码编程:用ChatGPT批量下载谷歌podcast上的播客音频
  • nginx.4——正向代理和反向代理(七层代理和四层代理)
  • 基于RuoYi-Flowable-Plus的若依ruoyi-nbcio支持自定义业务表单流程(三)
  • Spring-事务源码解析2