当前位置: 首页 > news >正文

Flink 状态管理设计详解:StateBackend、State、RocksDB和Namespace

为什么需要 StateBackend?—— 职责分离原则

我们可以用一个银行的例子来类比:

  • State (如 ValueStateListState) 就像是你的银行卡
  • AbstractKeyedStateBackend 就像是银行的整个后台系统(包括总服务器、数据库、风控系统、会计系统等)。

你不能直接用一张塑料卡片去操作你的钱,你需要把卡片插入 ATM 机或交给柜员,由他们背后的银行系统来完成真正的存取款、转账等操作。

AbstractKeyedStateBackend 的存在正是为了实现这种职责分离

State 接口的职责(银行卡):

  • 定义用户交互的契约:提供一组简单、清晰的 API 给用户使用,比如 value()update()add()clear()。它只关心“做什么”,不关心“怎么做”。

AbstractKeyedStateBackend 的职责(银行系统):

它是一个庞大而复杂的“状态引擎”,负责所有底层的、与具体实现相关的脏活累活。

  • 生命周期管理:负责所有状态的创建、初始化和销毁 (dispose)。
  • 持久化与容错(核心):实现快照 (snapshot) 和恢复逻辑。这是 Flink 实现 Exactly-Once 的基石。单个 State 对象自身无法完成复杂的分布式快照。
  • 物理存储交互:它才是真正与 RocksDB、堆内存(Heap)等物理存储打交道的组件。它管理着数据库连接、Column Family、读写选项等。
  • Key/Namespace 管理:管理 keySerializer,计算当前 key 属于哪个 Key Group (KeyGroupRangeAssignment),处理不同 namespace 下的状态隔离。
  • 中央缓存与优化:如您所见,它内部有 lastName 和 lastState 这样的缓存机制,用于优化对同一状态的连续访问。
  • 应用横切关注点(AOP):它是一个中心化的工厂,可以在创建 State 时,统一应用 TTL、Metrics 监控等功能。

看 getOrCreateKeyedState 这段代码,它完美地展示了 StateBackend 作为“工厂”和“管理者”的角色:

// ... existing code ...@Override@SuppressWarnings("unchecked")public <N, S extends State, V> S getOrCreateKeyedState(final TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)throws Exception {
// ... existing code ...InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());if (kvState == null) {if (!stateDescriptor.isSerializerInitialized()) {stateDescriptor.initializeSerializerUnlessSet(executionConfig);}// 这里是关键:一个装饰器链条// Backend作为工厂,负责创建原始State,并用TTL、Metrics等功能进行包装kvState =MetricsTrackingStateFactory.createStateAndWrapWithMetricsTrackingIfEnabled(TtlStateFactory.createStateAndWrapWithTtlIfEnabled(namespaceSerializer, stateDescriptor, this, ttlTimeProvider),this,stateDescriptor,latencyTrackingStateConfig,sizeTrackingStateConfig);keyValueStatesByName.put(stateDescriptor.getName(), kvState);publishQueryableStateIfEnabled(stateDescriptor, kvState);}return (S) kvState;}
// ... existing code ...

结论:如果直接使用 State,就意味着每一个 State 对象都需要自己实现一套完整的快照、恢复、缓存、物理存储交互逻辑。这将导致代码极度冗余、混乱且难以维护。AbstractKeyedStateBackend 将这些公共的、复杂的底层逻辑全部收敛,使得 State 对象可以保持为一个轻量级的、只关注业务逻辑的句柄。

State 反过来引用 Backend,这并非传统意义上需要避免的耦合,而是一种委托(Delegation)。我们来梳理一下这个流程:

  1. 创建Backend 创建了一个具体的 State 实现类(比如 RocksDBValueState)。
  2. 持有引用:在创建 RocksDBValueState 时,Backend 会把自身的引用 (this) 传递给 RocksDBValueState 的构造函数。因此,这个 State 实例从诞生起就知道“是谁创造了我”、“我应该向谁汇报”。

比如update

    public void update(V value) throws IOException {if (value == null) {clear();return;}try {backend.db.put(columnFamily,writeOptions,serializeCurrentKeyWithGroupAndNamespace(),serializeValue(value));} catch (RocksDBException e) {throw new IOException("Error while adding data to RocksDB", e);}}

虽然直接调用了 backend.db.put(...),但我们仔细分析一下它所需要的所有参数,就会发现委托模式的本质依然存在:

  • columnFamily: 这个 ColumnFamilyHandle 是从哪里来的?它是在 RocksDBValueState 被创建时,由 backend 传入的。State 自己不管理 Column Family 的生命周期。
  • writeOptions: 这个 WriteOptions 对象同样是 backend 的成员变量,由 backend 统一配置和管理。
  • serializeCurrentKeyWithGroupAndNamespace(): 这是最关键的一步。这个方法内部需要:
    • backend.getCurrentKey(): 获取当前正在处理的 Key。
    • backend.getCurrentKeyGroupIndex(): 计算 Key Group。
    • getNamespaceSerializer(): 获取 Namespace 序列化器。
    • backend.getKeySerializer(): 获取 Key 序列化器。 这些核心的上下文信息和组件(序列化器),全部是由 backend 提供的。State 对象本身是无状态的(stateless in terms of context),它不知道当前在为哪个 key 工作,必须向 backend 查询。
  • serializeValue(value): 这个方法内部需要 getValueSerializer(),而这个序列化器也是在创建时由 backend 提供的。

所以,即使 State 执行了最后那一下 put 操作,它也像一个“一线工人”,虽然亲手把螺丝拧上去了,但这个螺丝(value)、螺丝刀(writeOptions)、图纸(columnFamily)以及拧在哪个位置(key 和 namespace),全部是由 backend 这个“车间主任”提供的。

这是一种更细粒度的委托:State 被委托了“如何将序列化好的 key 和 value 放入指定的 Column Family”这个具体的执行逻辑,但它依然将“获取所有执行前提条件(上下文、资源、配置)”这项更重要的职责委托给了 backend。

Subtask、RocksDB 实例、窗口和 Namespace 的关系

Operator 的一个 Subtask 实例 对应一个独立的 RocksDB 实例。

让我们把这个关系链梳理清楚:

  • 一个 Flink Job:可以包含多个 Operator(mapfilterkeyBy 等)。
  • 一个 Operator:可以有多个并行的 Subtask 实例(并行度决定)。
  • 一个 Subtask 实例:运行在一个 TaskManager 的一个 Slot 中。
  • 一个 RocksDBKeyedStateBackend 实例:每个有状态的 Subtask 实例都会创建一个自己的 RocksDBKeyedStateBackend 对象。
  • 一个 RocksDB 数据库实例:每个 RocksDBKeyedStateBackend 都会在 TaskManager 的本地磁盘上创建一个独立的 RocksDB 数据库目录和实例(db 对象)。

所以,如果你的一个 window 操作的并行度是 10,那么就会有 10 个 Subtask,对应 10 个 RocksDBKeyedStateBackend 实例,进而在不同的 TaskManager 上创建 10 个独立的 RocksDB 数据库。它们之间物理隔离,互不干扰。

那么窗口和 Namespace 是什么关系?

在一个 Subtask 内部(也就是在一个 RocksDB 实例内部),Namespace 是用来在逻辑上区分不同窗口的状态的

mergeNamespaces 就是最好的例子。当会话窗口需要合并时:

  • source namespaces 就是旧的、待合并的窗口的标识符。
  • target namespace 就是合并后的新窗口的标识符。

这些 namespace 和用户的 key 组合在一起,构成了 RocksDB 中真正的 key。

总结

  • 物理隔离:不同的 Subtask 通过拥有各自独立的 RocksDB 实例来实现物理隔离。
  • 逻辑隔离:在同一个 Subtask(同一个 RocksDB 实例)内部,不同的窗口(或其它需要隔离的场景,如 ProcessFunction 中的不同 Timer)通过 namespace 来实现逻辑隔离。

所有 State 如何共享 DB 并互相区分?—— Column Family

在一个 RocksDBKeyedStateBackend 内部,所有不同名称的 State(比如你在一个 ProcessFunction 中定义了 ValueState<Integer>ListState<String> 和 MapState<Long, Double>)是共享同一个 RocksDB 实例的。

那它们的数据是如何区分,不会混在一起的呢?答案是:列族(Column Family)

Column Family 是 RocksDB 中用于隔离数据的逻辑命名空间,可以把它想象成关系型数据库中的一张张独立的表。

我们来看 RocksDBKeyedStateBackend.java 中的关键实现:

当一个 State 首次被创建时,RocksDBKeyedStateBackend 会为它做两件事:

  1. 创建一个新的 Column Family:每个 StateDescriptor 的唯一名称(stateDesc.getName())会被用来命名一个新的 Column Family。
  2. 注册元信息:将这个 State 的名称、序列化器信息以及它对应的 ColumnFamilyHandle 存储在一个 Map 中,也就是 kvStateInformation
// ... existing code .../*** Information about the k/v states, maintained in the order as we create them. This is used to* retrieve the column family that is used for a state and also for sanity checks when* restoring.*/private final LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation;
// ... existing code ...

当具体的 State 对象(如 RocksDBValueState)执行读写操作时,它会从 backend 获取自己专属的 ColumnFamilyHandle,并将其作为参数传递给 db.get()db.put() 或 db.merge() 等方法。

// RocksDBValueState.java 中的 value() 方法
byte[] valueBytes = backend.db.get(columnFamily, serializeCurrentKeyWithGroupAndNamespace());

这样一来,虽然所有的 State 都在同一个 db 对象上操作,但由于它们使用了不同的 columnFamily,数据就被天然地隔离在了不同的“表”里,绝不会互相干扰。

这种设计的优势是什么?

  • 资源共享:所有 Column Family 共享同一个 MemTable、Write-Ahead-Log (WAL)、Block Cache 等核心 RocksDB 资源。这大大减少了内存开销和管理成本,而不是为每个 State 都启动一个完整的 DB 实例。
  • 原子写入:可以通过 WriteBatch 实现跨多个 Column Family 的原子写入,这对于保证 Flink 复杂操作的原子性至关重要。
  • 统一快照:可以对整个 RocksDB 实例(包含所有 Column Family)进行一次统一的、物理一致性的快照,极大地简化了 Checkpoint 的实现。

ColumnFamilyDescriptor

ColumnFamilyDescriptor 是 RocksDB Java API 的一部分,它本质上是一个列族(Column Family)的描述符,包含了创建列族所需的名称配置选项 (ColumnFamilyOptions)。

在 Flink 中,ColumnFamilyDescriptor 的构建主要通过 RocksDBOperationUtils.createColumnFamilyDescriptor 这个静态方法来完成。

我们来看一下这个方法的实现:

RocksDBOperationUtils.java

// ... existing code ...public static ColumnFamilyDescriptor createColumnFamilyDescriptor(RegisteredStateMetaInfoBase metaInfoBase,Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,@Nullable Long writeBufferManagerCapacity) {byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),"The chosen state name 'default' collides with the name of the default column family!");ColumnFamilyOptions options =createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());if (ttlCompactFiltersManager != null) {ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options);}if (writeBufferManagerCapacity != null) {// It'd be great to perform the check earlier, e.g. when creating write buffer manager.// Unfortunately the check needs write buffer size that was just calculated.sanityCheckArenaBlockSize(options.writeBufferSize(),options.arenaBlockSize(),writeBufferManagerCapacity);}return new ColumnFamilyDescriptor(nameBytes, options);}
// ... existing code ...

从代码中我们可以清晰地看到构建 ColumnFamilyDescriptor 的步骤:

  1. 获取列族名称:

    • 从传入的 RegisteredStateMetaInfoBase 对象中获取 State 的名称 (metaInfoBase.getName())。
    • 将这个名称转换为字节数组 (byte[] nameBytes)。这是因为 RocksDB 的原生 API 使用字节数组来标识列族。
    • 这里有一个检查,确保 State 的名称不是 "default",以避免与 RocksDB 的默认列族冲突。
  2. 创建列族配置 (ColumnFamilyOptions):

    • 调用 createColumnFamilyOptions 方法,这个方法会使用 columnFamilyOptionsFactory 来生成一个 ColumnFamilyOptions 实例。
    • 这个 columnFamilyOptionsFactory 正是我们在 EmbeddedRocksDBStateBackend 中看到的那个函数:stateName -> resourceContainer.getColumnOptions()。它为每个 State 提供了基础的列族配置。
  3. (可选)配置 TTL 压缩过滤器:

    • 如果 ttlCompactFiltersManager 不为 null,会检查当前 State 是否配置了 TTL(Time-to-Live,生存时间)。
    • 如果配置了 TTL,它会为这个列族的 ColumnFamilyOptions 设置一个特定的压缩过滤器(Compaction Filter),这个过滤器会在 RocksDB 的后台压缩过程中自动清理过期的数据。
  4. (可选)内存检查:

    • 如果传入了 writeBufferManagerCapacity,会进行一个健全性检查,确保 arenaBlockSize 的配置是合理的。
  5. 实例化 ColumnFamilyDescriptor:

    • 最后,使用前面准备好的列族名称字节数组配置好的 ColumnFamilyOptions 对象,通过 new ColumnFamilyDescriptor(nameBytes, options) 来创建一个新的 ColumnFamilyDescriptor 实例并返回。

综上所述,一个 ColumnFamilyDescriptor 对象主要包含以下两个核心信息:

  1. 列族名称 (Column Family Name):

    • 以 byte[] 数组的形式存储。
    • 这个名称直接来源于 Flink State 的 StateDescriptor 中定义的名字。例如,new ValueStateDescriptor<>("my-state", String.class) 中的 "my-state"
  2. 列族选项 (Column Family Options):

    • 一个 org.rocksdb.ColumnFamilyOptions 对象。
    • 这个对象包含了该列族所有详细的配置参数,例如:
      • Merge Operator: 用于处理 ListState、AggregatingState 等需要合并操作的状态。
      • Write Buffer Size: 写缓冲区大小。
      • Compression Type: 压缩算法(如 Snappy, LZ4)。
      • Compaction Filter: 压缩过滤器,如用于实现 TTL 的过滤器。
      • 以及其他大量控制 RocksDB 行为的底层参数。

这个 ColumnFamilyDescriptor 对象随后会被传递给 db.createColumnFamily() 方法,RocksDB 会根据其中的名称和配置信息,在数据库实例中创建一个新的、隔离的列族。

Namespace 可以自己随意指定类型和值吗?

答案是:是的,几乎可以。

  1. 任意的类型 (Type): 在 Flink 的状态接口定义中,命名空间(Namespace)是一个泛型参数 N,例如 InternalKvState<K, N, V>。这意味着理论上你可以使用任何 Java/Scala 类型作为 Namespace 的类型,比如 StringLongInteger,甚至是自定义的 POJO 对象。

    唯一的硬性要求是:Flink 必须知道如何序列化和反序列化你的 Namespace 类型。你需要为该类型提供一个 TypeSerializer<N>。对于 LongString 等基础类型,Flink 会自动推断并使用内置的序列化器。对于自定义的 POJO,你需要确保它符合 Flink 的 POJO 规范,或者手动创建一个 TypeSerializer

  2. 任意的值 (Value): 一旦确定了 Namespace 的类型,你就可以在代码中通过调用 state.setCurrentNamespace(namespace) 来传入该类型的任意实例作为当前操作的命名空间。

    这正是你之前实现的 NamespacedStateListView 的核心思想:将用户的 UID(无论是 Long 还是 String 类型)作为 namespace 的值,从而为每个 UID 创建了一个逻辑上独立的 ListState

    AbstractRocksDBState.java 文件中的 setCurrentNamespace 方法就证明了这一点:

    // ... existing code ...
    /** The current namespace, which the next value methods will refer to. */
    private N currentNamespace;
    // ... existing code ...
    @Override
    public void setCurrentNamespace(N namespace) {this.currentNamespace = namespace;
    }
    // ... existing code ...
    

    这个方法简单地将传入的 namespace 对象赋值给内部的 currentNamespace 字段,后续的状态操作(如 getaddclear)都会使用这个字段来构建最终存储到 RocksDB 的 key。

和 KeyedStream,Window 的关系

这里需要分开来看:

与 KeyedStream 的关系:强依赖关系
  • 必须在 KeyedStream 上使用:所有带 Namespace 的状态(InternalKvState)都属于Keyed State。你必须先通过 dataStream.keyBy(...) 将数据流转换成 KeyedStream,然后才能在下游的算子(如 ProcessFunction)中使用这些状态。
  • 原因:Flink 的状态是根据 keyBy 指定的 Key (K) 来进行分区和管理的。Namespace (N) 只是在某个特定 Key (K) 的状态内部做的进一步划分。可以理解为一种二级索引或子分区。没有 keyBy 提供的一级分区,Namespace 就无从谈起。
与 Window 的关系:没有必然关系,窗口是 Namespace 的一种应用场景
  • 窗口是 Namespace 的使用者,而非前提:可以把窗口(Window)看作是 Flink 框架自身对 Namespace 机制的一种自动化应用。当你使用窗口操作时(例如 .window(TumblingEventTimeWindows.of(...))),Flink 会自动地:

    1. 为每一个窗口实例(比如 [00:00:05, 00:00:10) 这个时间窗口)创建一个 TimeWindow 对象。
    2. 在处理属于该窗口的数据时,自动调用 setCurrentNamespace(),并将这个 TimeWindow 对象作为 namespace 传入。 这样,窗口内的所有状态计算就被天然地隔离在了这个 TimeWindow 命名空间下。
  • 可以完全脱离窗口使用 Namespace:在一个普通的 KeyedProcessFunction 中,完全没有使用任何窗口操作,而是通过手动调用 setCurrentNamespace(uid) 来实现了自定义的状态划分。这赋予了超越窗口框架的、更细粒度的状态管理能力。

概念

与 Namespace 的关系

解释

​KeyedStream​

​强依赖​

必须先 keyBy得到 KeyedStreamNamespace是在 Key内部的二级分区。

​Window​

​无直接依赖​

窗口是 Namespace机制的一个高级应用。你可以用窗口,也可以不用窗口,直接在 ProcessFunction中手动控制 Namespace

所以,可以自定义 Namespace 的类型和值,并且这个机制可以独立于 Flink 的窗口(Window)功能来使用,只要你的操作是建立在 KeyedStream 之上即可。

window设置NameSpace

负责调用 setCurrentNamespace 的是窗口算子(WindowOperatorWindowOperator 的工作流程是:

  1. 接收到一条数据。
  2. 调用 WindowAssigner 的 assignWindows 方法,获取这条数据所属的窗口列表。
  3. 遍历这个窗口列表。
  4. 对于列表中的每一个窗口,先调用 state.setCurrentNamespace(window) 将当前状态的上下文切换到这个窗口。
  5. 然后,再对该窗口的状态进行更新(比如累加、添加元素等)。

我们可以从工程代码中找到清晰的证据:

在 WindowOperator.java 中,你可以看到这个完整的逻辑:

// ... existing code ...public void processElement(StreamRecord<RowData> record) throws Exception {
// ... existing code ...timestamp = TimeWindowUtil.toUtcTimestampMills(timestamp, shiftTimeZone);// 1. 调用 assigner 获取窗口列表// the windows which the input row should be placed intoCollection<W> affectedWindows = windowFunction.assignStateNamespace(inputRow, timestamp);boolean isElementDropped = true;// 2. 遍历窗口列表for (W window : affectedWindows) {isElementDropped = false;// 3. 为每个窗口设置 NamespacewindowState.setCurrentNamespace(window);// 4. 更新状态RowData acc = windowState.value();if (acc == null) {acc = windowAggregator.createAccumulators();}windowAggregator.setAccumulators(window, acc);if (RowDataUtil.isAccumulateMsg(inputRow)) {windowAggregator.accumulate(inputRow);} else {windowAggregator.retract(inputRow);}acc = windowAggregator.getAccumulators();windowState.update(acc);}
// ... existing code ...

另一个例子在处理 Python UDAF 的算子中也可以看到同样的设计模式:

StreamArrowPythonGroupWindowAggregateFunctionOperator.java

// ... existing code ...public void bufferInput(RowData input) throws Exception {
// ... existing code ...// 1. 调用 assigner 获取窗口列表// Given the timestamp and element, returns the set of windows into which it// should be placed.elementWindows = windowAssigner.assignWindows(input, timestamp);// 2. 遍历窗口列表for (W window : elementWindows) {if (RowDataUtil.isAccumulateMsg(input)) {// 3. 为每个窗口设置 NamespacewindowAccumulateData.setCurrentNamespace(window);// 4. 更新状态windowAccumulateData.add(input);} else {windowRetractData.setCurrentNamespace(window);windowRetractData.add(input);}}}
// ... existing code ...

http://www.lryc.cn/news/596803.html

相关文章:

  • 【笔记】Handy Multi-Agent Tutorial 第三章: CAMEL框架简介及实践(实践部分)
  • Redis原理之分布式锁
  • PowerShell自动化核对AD与HR系统账户信息实战指南
  • IDEA202403 超好用设置【持续更新】
  • ZooKeeper在Hadoop中的协同应用:从NameNode选主到分布式锁实现
  • 天津大学陈亚楠教授团队 ACS AEM:焦耳热超快合成非平衡态能源材料——毫秒级制备与跨体系性能突破
  • 昨天去看了电科金仓的发布会,有点东西!
  • 从 Linux 将文件下载到 Windows 的几种实用方法
  • 【AI智能体】Dify 开发与集成MCP服务实战操作详解
  • 嵌入式学习之路
  • Python笔记之跨文件实例化、跨文件调用、导入库
  • 为什么本地ip记录成0.0.0.1
  • 基于Python flask的常用AI工具功能数据分析与可视化系统设计与实现,技术包括LSTM、SVM、朴素贝叶斯三种算法,echart可视化
  • 慢 SQL接口性能优化实战
  • Fast Frequency Estimation Algorithm by Least Squares Phase Unwrapping
  • USB4.0:开启高速数据传输的新时代
  • 当if else比较多时候应该怎么避免?
  • MCP与企业数据集成:ERP、CRM、数据仓库的统一接入
  • #Linux权限管理:从“Permission denied“到系统安全大师
  • uniapp自定义圆形勾选框和全选框
  • iOS 抓包工具有哪些?2025实用指南与场景推荐
  • 重磅发布:Oracle ADG 一键自动化搭建脚本
  • 离线快速处理PDF格式转化的方案
  • 揭秘ThreadLocal核心原理与应用
  • Linux文件系统理解1
  • NLP自然语言处理的一些疑点整理
  • vue2的scoped 原理
  • 基于SpringBoot+MyBatis+MySQL+VUE实现的实习管理系统(附源码+数据库+毕业论文+项目部署视频教程+项目所需软件工具)
  • Python通关秘籍(五)数据结构——元组
  • linux 驱动 - v4l2 驱动框架