当前位置: 首页 > news >正文

Flink State 状态后端分析

flink状态实现分析

state

 *             State*               |*               +-------------------InternalKvState*               |                         |*          MergingState                   |*               |                         |*               +-----------------InternalMergingState*               |                         |*      +--------+------+                  |*      |               |                  |* ReducingState    ListState        +-----+-----------------+*      |               |            |                       |*      +-----------+   +-----------   -----------------InternalListState*                  |                |*                  +---------InternalReducingState

MemoryState

AbstractHeapState
HeapMapState
InternalMapState
InternalKvState
State
AbstractHeapMergingState
HeapListState
InternalListState
AbstractHeapAppendingState
InternalMergingState
InternalAppendingState
HeapValueState
InternalValueState

RocksDBState

State
InternalKvState
AbstractRocksDBState
RocksDBMapState
RocksDBListState
RocksDBValueState
RocksDBReducingState
RocksDBAggregatingState
class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, UV>> {private TypeSerializer<UK> userKeySerializer;private TypeSerializer<UV> userValueSerializer;private RocksDBMapState(ColumnFamilyHandle columnFamily,TypeSerializer<N> namespaceSerializer,TypeSerializer<Map<UK, UV>> valueSerializer,Map<UK, UV> defaultValue,RocksDBKeyedStateBackend<K> backend);public TypeSerializer<K> getKeySerializer();public TypeSerializer<N> getNamespaceSerializer();public TypeSerializer<Map<UK, UV>> getValueSerializer();public UV get(UK userKey){ //直接读rocksdbbyte[] rawKeyBytes =serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);return (rawValueBytes == null? null: deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));}public void put(UK userKey, UV userValue){ //直接写rocksdbbyte[] rawKeyBytes =serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes); //backend.db是RocksDBKeyedStateBackend}public void putAll(Map<UK, UV> map);public void remove(UK userKey);public boolean contains(UK userKey);public Iterable<Map.Entry<UK, UV>> entries();public Iterable<UK> keys();public Iterable<UV> values();public boolean isEmpty();public void clear();static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(StateDescriptor<S, SV> stateDesc,Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>>registerResult,RocksDBKeyedStateBackend<K> backend) { //backend在这里传入return (IS)new RocksDBMapState<>(registerResult.f0,registerResult.f1.getNamespaceSerializer(),(TypeSerializer<Map<UK, UV>>) registerResult.f1.getStateSerializer(),(Map<UK, UV>) stateDesc.getDefaultValue(),backend);}
}

backend与checkpoint

AbstractKeyedStateBackend
RocksDBKeyedStateBackend
CheckpointableKeyedStateBackend
KeyedStateBackend
Snapshotable
HeapKeyedStateBackend
OperatorStateBackend
DefaultOperatorStateBackend
OperatorStateStore
public interface Snapshotable<S extends StateObject> {RunnableFuture<S> snapshot(long checkpointId,long timestamp,@Nonnull CheckpointStreamFactory streamFactory,@Nonnull CheckpointOptions checkpointOptions)throws Exception;
}

FSBackend

  • FsStateBackend中createKeyedStateBackend是创建了HeapKeyedStateBackend
  • FsStateBackend中createOperatorStateBackend是创建了DefaultOperatorStateBackend
  • DefaultOperatorStateBackend创建了PartitionableListState, 是State的子类
AbstractFileStateBackend
FsStateBackend
AbstractStateBackend
CheckpointStorage
StateBackend
ConfigurableStateBackend
public interface StateBackend extends java.io.Serializable {default String getName() {return this.getClass().getSimpleName();}<K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws Exception;OperatorStateBackend createOperatorStateBackend(Environment env,String operatorIdentifier,@Nonnull Collection<OperatorStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws Exception;/** Whether the state backend uses Flink's managed memory. */default boolean useManagedMemory() {return false;}}
public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {checkNotNull(jobId, "jobId");return new FsCheckpointStorageAccess(getCheckpointPath(),getSavepointPath(),jobId,getMinFileSizeThreshold(),getWriteBufferSize());}public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws BackendBuildingException {TaskStateManager taskStateManager = env.getTaskStateManager();LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();HeapPriorityQueueSetFactory priorityQueueSetFactory =new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);LatencyTrackingStateConfig latencyTrackingStateConfig =latencyTrackingConfigBuilder.setMetricGroup(metricGroup).build();return new HeapKeyedStateBackendBuilder<>( //这里是HeapKeyedStateBackendBuilderkvStateRegistry,keySerializer,env.getUserCodeClassLoader().asClassLoader(),numberOfKeyGroups,keyGroupRange,env.getExecutionConfig(),ttlTimeProvider,latencyTrackingStateConfig,stateHandles,AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),localRecoveryConfig,priorityQueueSetFactory,isUsingAsynchronousSnapshots(),cancelStreamRegistry).build();}@Overridepublic OperatorStateBackend createOperatorStateBackend(Environment env,String operatorIdentifier,@Nonnull Collection<OperatorStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry)throws BackendBuildingException {return new DefaultOperatorStateBackendBuilder(  //这里是DefaultOperatorStateBackendBuilderenv.getUserCodeClassLoader().asClassLoader(),env.getExecutionConfig(),isUsingAsynchronousSnapshots(),stateHandles,cancelStreamRegistry).build();}
}

memory backend

  • MemoryStateBackend中createOperatorStateBackend是创建了DefaultOperatorStateBackend
  • MemoryStateBackend中createKeyedStateBackend是创建了HeapKeyedStateBackendBackend
  • 最终调用了HeapMapState::Create创建state
AbstractFileStateBackend
MemoryStateBackend
ConfigurableStateBackend
AbstractStateBackend
CheckpointStorage
StateBackend

flink checkpoint

CheckpointStorage
+resolveCheckpoint(String externalPointer)
+createCheckpointStorage(JobID jobId)
RocksDBStateBackend
+checkpointStreamBackend : StateBackend
CheckpointStorageAccess
AbstractFsCheckpointStorageAccess
FsCheckpointStorageAccess
MemoryBackendCheckpointStorageAccess
RestoreOperation
RocksDBRestoreOperation
RocksDBFullRestoreOperation
RocksDBHeapTimersFullRestoreOperation
RocksDBIncrementalRestoreOperation
RocksDBSnapshotOperation
RocksDBIncrementalSnapshotOperation
RocksDBNativeFullSnapshotOperation

参考资料

https://www.jianshu.com/p/569a7e67c1b3
https://blog.csdn.net/u010942041/article/details/114944767
https://cloud.tencent.com/developer/article/1792720
https://blog.51cto.com/dataclub/5351042
https://www.cnblogs.com/lighten/p/13234350.html
https://cloud.tencent.com/developer/article/1765572
https://blog.csdn.net/m0_63475429/article/details/127417649
https://blog.csdn.net/Direction_Wind/article/details/125646616

http://www.lryc.cn/news/26449.html

相关文章:

  • 和年薪30W的阿里测开工程师聊过后,才知道我的工作就是打杂的...
  • C#开发的OpenRA的界面布局数据加载
  • 并查集结构
  • 全国CSM敏捷教练认证将于2023年3月25-26开班,报名从速!
  • JavaEE进阶第六课:SpringBoot ⽇志⽂件
  • 外置MOS管平均电流型LED降压恒流驱动器
  • python+pytest接口自动化(6)-请求参数格式的确定
  • 开发手册——一、编程规约_3.代码格式
  • 十七、Django-restframework之序列化器(二)
  • python GUI图形化编程-----wxpython
  • 【Python 】yyyy-MM-dd HH:mm:ss 时间格式 时间戳 全面解读超详细
  • 【C++】C++11 异常
  • 关于Thread.start()后的困惑、imap
  • qml学习之qwidget与qml结合使用并调用信号槽交互
  • 【 华为OD机试 2023】 组装新的数组(C++ Java JavaScript Python)
  • 【洛谷 P2089】烤鸡(循环枚举)
  • windows10安装ubantu双系统
  • 【华为OD机试 2023】 人数最多的站点/小火车最多人时所在园区站点(C++ Java JavaScript Python)
  • 2024届暑期实习实录(阿里云大数据研发平台)
  • 空口协议probe req和probe rsp 、auth req和auth rsp 、assoc req和assoc rsp讲解
  • vscode ssh一直卡在wget的解决方案
  • 【Python学习笔记】第二十五节 Python MySQL
  • 折叠屏手机自带的屏幕表面层为什么不能自己撕?
  • 20.hadoop系列之Yarn资源调度器
  • 206页16万字城市运行“一网统管”体系建设项目需求报告
  • 【JS】数组Array的使用
  • 2023年,软件测试怎么样?
  • 【学习笔记】NOMURA Programming Competition 2020
  • iis下常用程序的伪静态规则列表(包括wordpress、thinkphp)
  • 【Python语言基础】——Python Select From