当前位置: 首页 > news >正文

【flink状态管理(四)】MemoryStateBackend的实现

文章目录

  • 1.基于MemoryStateBackend创建KeyedStateBackend
    • 1.1. 状态初始化
    • 1.2. 创建状态
  • 2. 基于MemoryStateBackend创建OperatorStateBackend
  • 3.基于MemoryStateBackend创建CheckpointStorage

在Flink中,默认的StateBackend实现为MemoryStateBackend,本文以MemoryStateBackend为例说明StateBackend的设计与实现。

 
本文介绍MemoryStateBackend中如下三个主要组件的创建过程:

  • HeapKeyedStateBackend
  • OperatorStateBackend
  • MemoryBackendCheckpointStorage

FsStateBackend和RocksDBStateBackend这两种状态后端存储的实现,功能和MemoryStateBackend类似,区别在于内部创建的KeyedStateBackend和CheckpointStorage。

 

1.基于MemoryStateBackend创建KeyedStateBackend

1.1. 状态初始化

AbstractStreamOperator.keyedStatedBackend()方法定义了创建和初始化KeyedStatedBackend的逻辑,具体如下。

protected <K> AbstractKeyedStateBackend<K> keyedStateBackend(TypeSerializer<K> keySerializer,String operatorIdentifierText,PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,CloseableRegistry backendCloseableRegistry,MetricGroup metricGroup) throws Exception {if (keySerializer == null) {return null;}String logDescription = "keyed state backend for " + operatorIdentifierText;//1. TaskInfo taskInfo = environment.getTaskInfo();final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(taskInfo.getMaxNumberOfParallelSubtasks(),taskInfo.getNumberOfParallelSubtasks(),taskInfo.getIndexOfThisSubtask());// 确保恢复状态过程中构建的数据流被关闭CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);// 创建BackendRestorerProcedureBackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> backendRestorer =new BackendRestorerProcedure<>((stateHandles) -> stateBackend.createKeyedStateBackend(environment,environment.getJobID(),operatorIdentifierText,keySerializer,taskInfo.getMaxNumberOfParallelSubtasks(),keyGroupRange,environment.getTaskKvStateRegistry(),TtlTimeProvider.DEFAULT,metricGroup,stateHandles,cancelStreamRegistryForRestore),backendCloseableRegistry,logDescription);try {return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());} finally {if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {IOUtils.closeQuietly(cancelStreamRegistryForRestore);}}
}
  1. 获取当前Task的TaskInfo,并基于TaskInfo的参数创建KeyGroupRange,表示当前Task实例中存储的Key分组区间
  2. 创建CloseableRegistry并注册到backendCloseableRegistry中,用于确保在任务取消的情况下关闭在恢复状态过程中构造的数据流。
  3. 创建BackendRestorerProcedure,提供了stateBackend.createKeyedStateBackend()方法,也包含恢复历史状态数据的方法。
  4. 创建KeyedStateBackend,同时对状态数据进行恢复。prioritizedOperatorSubtaskStates是从TaskStateManager中根据OperatorID获取的算子历史状态,通过prioritizedOperatorSubtaskStates获取当前算子的PrioritizedManagedKeyedState,并基于这些状态数据恢复算子的状态。

 

1.2. 创建状态

接下来我们看MemoryStateBackend.createKeyedStateBackend()方法的具体实现。

public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {// 获取TaskStateManager实例TaskStateManager taskStateManager = env.getTaskStateManager();// 创建HeapPriorityQueueSetFactory实例HeapPriorityQueueSetFactory priorityQueueSetFactory =new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);// 创建HeapKeyedStateBackendBuilder实例HeapKeyedStateBackendreturn new HeapKeyedStateBackendBuilder<>(kvStateRegistry,keySerializer,env.getUserClassLoader(),numberOfKeyGroups,keyGroupRange,env.getExecutionConfig(),ttlTimeProvider,stateHandles,AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),taskStateManager.createLocalRecoveryConfig(),priorityQueueSetFactory,isUsingAsynchronousSnapshots(),cancelStreamRegistry).build();
}
  1. 从environment参数中获取TaskStateManager实例
  2. 创建HeapPriorityQueueSetFactory实例,用于生成HeapPriorityQueueSet优先级队列,存储TimerHeapInternalTimer等数据。
  3. 调用HeapKeyedStateBackendBuilder.build()方法创建HeapKeyedStateBackend。

 

2. 基于MemoryStateBackend创建OperatorStateBackend

和创建KeyedStateBackend的过程相似,AbstractStreamOperator.operatorStateBackend()方法实现了创建OperatorStateBackend的方法。

protected OperatorStateBackend operatorStateBackend(String operatorIdentifierText,PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,CloseableRegistry backendCloseableRegistry) throws Exception {String logDescription = "operator state backend for " + operatorIdentifierText;CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> backendRestorer =new BackendRestorerProcedure<>((stateHandles) -> stateBackend.createOperatorStateBackend(environment,operatorIdentifierText,stateHandles,cancelStreamRegistryForRestore),backendCloseableRegistry,logDescription);try {return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());} finally {if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {IOUtils.closeQuietly(cancelStreamRegistryForRestore);}}
}
  1. 创建CloseableRegisty,确保在任务取消的情况下能够关闭在恢复状态时构造的数据流。
  2. 创建BackendRestorerProcedure,封装了stateBackend.createOperatorStateBackend()方法,并包含恢复历史状态数据的操作。
  3. 创建OperatorStateBackend,并恢复状态数据。

其中prioritizedOperatorSubtaskStates是从TaskStateManager中根据OperatorID获取的算子专有历史状态,可以通过prioritizedOperatorSubtaskStates获取当前算子中的PrioritizedManagedOperatorState,并基于这些状态数据恢复OperatorStateBackend中算子的状态。

 

3.基于MemoryStateBackend创建CheckpointStorage

在createCheckpointStorage()方法中,直接创建MemoryBackendCheckpointStorage实例并返回,没有涉及太多的流程

public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {return new MemoryBackendCheckpointStorage(jobId, getCheckpointPath(), getSavepointPath(), maxStateSize);
}

 

参考:《Flink设计与实现:核心原理与源码解析》–张利兵

http://www.lryc.cn/news/296705.html

相关文章:

  • 前端架构: 脚手架在前端研发流程中的意义
  • Qt网络编程-QTcpServer的封装
  • 【MySQL】_JDBC编程
  • 微信小程序编译出现 project.config.json 文件内容错误
  • 一周学会Django5 Python Web开发-Django5创建项目(用命令方式)
  • DockerUI如何部署结合内网穿透实现公网环境管理本地docker容器
  • UML之在Markdown中使用Mermaid绘制类图
  • Spring Boot + 七牛OSS: 简化云存储集成
  • C++:二叉搜索树模拟实现(KV模型)
  • npm淘宝镜像源换新地址
  • 十大排序算法之线性时间非比较类排序
  • 容器基础:Docker 镜像如何保证部署的一致性?
  • 爪哇部落算法组2024新生赛热身赛题解
  • 1123. 铲雪车(欧拉回路)
  • 网络协议与攻击模拟_15FTP协议
  • 「效果图渲染」效果图与3D影视动画渲染平台
  • Blender_查看版本
  • node.js 读目录.txt文件,用 xml2js 转换为json数据,生成jstree所需的文件
  • 【Docker】02 镜像管理
  • 了解海外云手机的多种功能
  • 白酒:自动化生产线的优势与实践
  • 用HTML5实现灯笼效果
  • Postgresql源码(120)事务XID分配与主备XID同步
  • B2077 角谷猜想(洛谷)
  • 排序算法---归并排序
  • [WUSTCTF2020]朴实无华(特详解)
  • 下载已编译的 OpenCV 包在 Visual Studio 下实现快速配置
  • 【Linux系统学习】3.Linux用户和权限
  • 视频美颜SDK开发指南:从入门到精通的技术实践
  • Electron基本介绍