当前位置: 首页 > news >正文

从Flink的Kafka消费者看算子联合列表状态的使用

背景

算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态

算子联合列表状态

首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况
在这里插入图片描述
算子联合列表状态主要由这两个方法处理:
1初始化方法

public final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore();// 在初始化方法中获取联合列表状态this.unionOffsetStates =stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,createStateSerializer(getRuntimeContext().getExecutionConfig())));if (context.isRestored()) {restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// 把联合列表状态的数据都恢复成类的本地变量中// populate actual holder for restored statefor (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {restoredState.put(kafkaOffset.f0, kafkaOffset.f1);}LOG.info("Consumer subtask {} restored state: {}.",getRuntimeContext().getIndexOfThisSubtask(),restoredState);} else {LOG.info("Consumer subtask {} has no restore state.",getRuntimeContext().getIndexOfThisSubtask());}}

2.开始通知检查点开始的方法:

public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug("snapshotState() called on closed source");} else {unionOffsetStates.clear();final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {// the fetcher has not yet been initialized, which means we need to return the// originally restored offsets or the assigned partitionsfor (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :subscribedPartitionsToStartOffsets.entrySet()) {// 进行checkpoint时,把数据保存到联合列表状态中进行保存unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// truncate the map of pending offsets to commit, to prevent infinite growthwhile (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}}
http://www.lryc.cn/news/197378.html

相关文章:

  • CSS3 按钮
  • STM32 BootLoader设置
  • django REST framework-使用与不使用的区别?
  • 获取URL中的参数
  • 一起学数据结构(9)——二叉树的链式存储及相关功能实现
  • vue 后端返回二进制流-前端通过blob对象下载文件-图片
  • vue el-dialog封装成子组件(组件化)
  • 爬虫教程 一 requests包的使用
  • Aria2NG连接aria2-pro提示认证失败的处理办法
  • MYSQL 连接
  • SeaTunnel 换maven源,解决插件下载慢
  • 安卓14通过“冻结”缓存应用程序腾出CPU,提高性能和内存效率
  • jupyter崩溃OOM,out of memory,jupyter代码写不进去,保存不了。
  • 一文带你快速掌握爬虫开发中的一些高级调试技巧
  • 6.(vue3.x+vite)路由传参query与params区别
  • C++string的使用
  • 闲着也是闲着,自己写歌东西玩一玩,碰碰脑子,简单快乐一点,双人出数的小游戏,后续还带补充
  • 牛客网 -- WY28 跳石板
  • [正式学习java③]——字符串在内存中的存储方式、为什么字符串不可变、字符串的拼接原理,键盘录入的小细节。
  • 行情分析——加密货币市场大盘走势(10.18)
  • 高并发场景下常见的限流算法及方案介绍
  • 虹科分享 | 选择SAS还是NVMe?虹科网络基础带您一探究竟!
  • 在ERP管理系统中,库存管理的基本流程是什么?
  • Ruby 之 csv 文件读写
  • Android AMS——进程LRU列表更新(十七)
  • 【数据可视化】—大屏数据可视化展示
  • 计算机算法分析与设计(12)---贪心算法(最优装载问题和哈夫曼编码问题)
  • 打造属于自己的vue图标库
  • C++11线程池
  • 企业打造VR虚拟展厅,开启商务洽谈新时代!