Flink 状态管理设计详解:StateBackend、State、RocksDB和Namespace
为什么需要 StateBackend
?—— 职责分离原则
我们可以用一个银行的例子来类比:
State
(如ValueState
,ListState
) 就像是你的银行卡。AbstractKeyedStateBackend
就像是银行的整个后台系统(包括总服务器、数据库、风控系统、会计系统等)。
你不能直接用一张塑料卡片去操作你的钱,你需要把卡片插入 ATM 机或交给柜员,由他们背后的银行系统来完成真正的存取款、转账等操作。
AbstractKeyedStateBackend
的存在正是为了实现这种职责分离:
State
接口的职责(银行卡):
- 定义用户交互的契约:提供一组简单、清晰的 API 给用户使用,比如
value()
,update()
,add()
,clear()
。它只关心“做什么”,不关心“怎么做”。
AbstractKeyedStateBackend
的职责(银行系统):
它是一个庞大而复杂的“状态引擎”,负责所有底层的、与具体实现相关的脏活累活。
- 生命周期管理:负责所有状态的创建、初始化和销毁 (
dispose
)。 - 持久化与容错(核心):实现快照 (
snapshot
) 和恢复逻辑。这是 Flink 实现 Exactly-Once 的基石。单个State
对象自身无法完成复杂的分布式快照。 - 物理存储交互:它才是真正与 RocksDB、堆内存(Heap)等物理存储打交道的组件。它管理着数据库连接、Column Family、读写选项等。
- Key/Namespace 管理:管理
keySerializer
,计算当前 key 属于哪个 Key Group (KeyGroupRangeAssignment
),处理不同namespace
下的状态隔离。 - 中央缓存与优化:如您所见,它内部有
lastName
和lastState
这样的缓存机制,用于优化对同一状态的连续访问。 - 应用横切关注点(AOP):它是一个中心化的工厂,可以在创建
State
时,统一应用 TTL、Metrics 监控等功能。
看 getOrCreateKeyedState
这段代码,它完美地展示了 StateBackend
作为“工厂”和“管理者”的角色:
// ... existing code ...@Override@SuppressWarnings("unchecked")public <N, S extends State, V> S getOrCreateKeyedState(final TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)throws Exception {
// ... existing code ...InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());if (kvState == null) {if (!stateDescriptor.isSerializerInitialized()) {stateDescriptor.initializeSerializerUnlessSet(executionConfig);}// 这里是关键:一个装饰器链条// Backend作为工厂,负责创建原始State,并用TTL、Metrics等功能进行包装kvState =MetricsTrackingStateFactory.createStateAndWrapWithMetricsTrackingIfEnabled(TtlStateFactory.createStateAndWrapWithTtlIfEnabled(namespaceSerializer, stateDescriptor, this, ttlTimeProvider),this,stateDescriptor,latencyTrackingStateConfig,sizeTrackingStateConfig);keyValueStatesByName.put(stateDescriptor.getName(), kvState);publishQueryableStateIfEnabled(stateDescriptor, kvState);}return (S) kvState;}
// ... existing code ...
结论:如果直接使用 State
,就意味着每一个 State
对象都需要自己实现一套完整的快照、恢复、缓存、物理存储交互逻辑。这将导致代码极度冗余、混乱且难以维护。AbstractKeyedStateBackend
将这些公共的、复杂的底层逻辑全部收敛,使得 State
对象可以保持为一个轻量级的、只关注业务逻辑的句柄。
State
反过来引用 Backend
,这并非传统意义上需要避免的耦合,而是一种委托(Delegation)。我们来梳理一下这个流程:
- 创建:
Backend
创建了一个具体的State
实现类(比如RocksDBValueState
)。 - 持有引用:在创建
RocksDBValueState
时,Backend
会把自身的引用 (this
) 传递给RocksDBValueState
的构造函数。因此,这个State
实例从诞生起就知道“是谁创造了我”、“我应该向谁汇报”。
比如update
public void update(V value) throws IOException {if (value == null) {clear();return;}try {backend.db.put(columnFamily,writeOptions,serializeCurrentKeyWithGroupAndNamespace(),serializeValue(value));} catch (RocksDBException e) {throw new IOException("Error while adding data to RocksDB", e);}}
虽然直接调用了 backend.db.put(...),但我们仔细分析一下它所需要的所有参数,就会发现委托模式的本质依然存在:
- columnFamily: 这个 ColumnFamilyHandle 是从哪里来的?它是在 RocksDBValueState 被创建时,由 backend 传入的。State 自己不管理 Column Family 的生命周期。
- writeOptions: 这个 WriteOptions 对象同样是 backend 的成员变量,由 backend 统一配置和管理。
- serializeCurrentKeyWithGroupAndNamespace(): 这是最关键的一步。这个方法内部需要:
- backend.getCurrentKey(): 获取当前正在处理的 Key。
- backend.getCurrentKeyGroupIndex(): 计算 Key Group。
- getNamespaceSerializer(): 获取 Namespace 序列化器。
- backend.getKeySerializer(): 获取 Key 序列化器。 这些核心的上下文信息和组件(序列化器),全部是由 backend 提供的。State 对象本身是无状态的(stateless in terms of context),它不知道当前在为哪个 key 工作,必须向 backend 查询。
- serializeValue(value): 这个方法内部需要 getValueSerializer(),而这个序列化器也是在创建时由 backend 提供的。
所以,即使 State 执行了最后那一下 put 操作,它也像一个“一线工人”,虽然亲手把螺丝拧上去了,但这个螺丝(value)、螺丝刀(writeOptions)、图纸(columnFamily)以及拧在哪个位置(key 和 namespace),全部是由 backend 这个“车间主任”提供的。
这是一种更细粒度的委托:State 被委托了“如何将序列化好的 key 和 value 放入指定的 Column Family”这个具体的执行逻辑,但它依然将“获取所有执行前提条件(上下文、资源、配置)”这项更重要的职责委托给了 backend。
Subtask、RocksDB 实例、窗口和 Namespace 的关系
Operator 的一个 Subtask 实例 对应一个独立的 RocksDB 实例。
让我们把这个关系链梳理清楚:
- 一个 Flink Job:可以包含多个 Operator(
map
,filter
,keyBy
等)。 - 一个 Operator:可以有多个并行的 Subtask 实例(并行度决定)。
- 一个 Subtask 实例:运行在一个 TaskManager 的一个 Slot 中。
- 一个
RocksDBKeyedStateBackend
实例:每个有状态的 Subtask 实例都会创建一个自己的RocksDBKeyedStateBackend
对象。 - 一个 RocksDB 数据库实例:每个
RocksDBKeyedStateBackend
都会在 TaskManager 的本地磁盘上创建一个独立的 RocksDB 数据库目录和实例(db
对象)。
所以,如果你的一个 window
操作的并行度是 10,那么就会有 10 个 Subtask,对应 10 个 RocksDBKeyedStateBackend
实例,进而在不同的 TaskManager 上创建 10 个独立的 RocksDB 数据库。它们之间物理隔离,互不干扰。
那么窗口和 Namespace 是什么关系?
在一个 Subtask 内部(也就是在一个 RocksDB 实例内部),Namespace 是用来在逻辑上区分不同窗口的状态的。
mergeNamespaces
就是最好的例子。当会话窗口需要合并时:
source
namespaces 就是旧的、待合并的窗口的标识符。target
namespace 就是合并后的新窗口的标识符。
这些 namespace
和用户的 key
组合在一起,构成了 RocksDB 中真正的 key。
总结:
- 物理隔离:不同的 Subtask 通过拥有各自独立的 RocksDB 实例来实现物理隔离。
- 逻辑隔离:在同一个 Subtask(同一个 RocksDB 实例)内部,不同的窗口(或其它需要隔离的场景,如
ProcessFunction
中的不同 Timer)通过namespace
来实现逻辑隔离。
所有 State 如何共享 DB 并互相区分?—— Column Family
在一个 RocksDBKeyedStateBackend
内部,所有不同名称的 State
(比如你在一个 ProcessFunction
中定义了 ValueState<Integer>
、ListState<String>
和 MapState<Long, Double>
)是共享同一个 RocksDB
实例的。
那它们的数据是如何区分,不会混在一起的呢?答案是:列族(Column Family)。
Column Family 是 RocksDB 中用于隔离数据的逻辑命名空间,可以把它想象成关系型数据库中的一张张独立的表。
我们来看 RocksDBKeyedStateBackend.java
中的关键实现:
当一个 State
首次被创建时,RocksDBKeyedStateBackend
会为它做两件事:
- 创建一个新的 Column Family:每个
StateDescriptor
的唯一名称(stateDesc.getName()
)会被用来命名一个新的 Column Family。 - 注册元信息:将这个 State 的名称、序列化器信息以及它对应的
ColumnFamilyHandle
存储在一个Map
中,也就是kvStateInformation
。
// ... existing code .../*** Information about the k/v states, maintained in the order as we create them. This is used to* retrieve the column family that is used for a state and also for sanity checks when* restoring.*/private final LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation;
// ... existing code ...
当具体的 State
对象(如 RocksDBValueState
)执行读写操作时,它会从 backend
获取自己专属的 ColumnFamilyHandle
,并将其作为参数传递给 db.get()
、db.put()
或 db.merge()
等方法。
// RocksDBValueState.java 中的 value() 方法
byte[] valueBytes = backend.db.get(columnFamily, serializeCurrentKeyWithGroupAndNamespace());
这样一来,虽然所有的 State
都在同一个 db
对象上操作,但由于它们使用了不同的 columnFamily
,数据就被天然地隔离在了不同的“表”里,绝不会互相干扰。
这种设计的优势是什么?
- 资源共享:所有 Column Family 共享同一个 MemTable、Write-Ahead-Log (WAL)、Block Cache 等核心 RocksDB 资源。这大大减少了内存开销和管理成本,而不是为每个 State 都启动一个完整的 DB 实例。
- 原子写入:可以通过
WriteBatch
实现跨多个 Column Family 的原子写入,这对于保证 Flink 复杂操作的原子性至关重要。 - 统一快照:可以对整个 RocksDB 实例(包含所有 Column Family)进行一次统一的、物理一致性的快照,极大地简化了 Checkpoint 的实现。
ColumnFamilyDescriptor
ColumnFamilyDescriptor
是 RocksDB Java API 的一部分,它本质上是一个列族(Column Family)的描述符,包含了创建列族所需的名称和配置选项 (ColumnFamilyOptions
)。
在 Flink 中,ColumnFamilyDescriptor
的构建主要通过 RocksDBOperationUtils.createColumnFamilyDescriptor
这个静态方法来完成。
我们来看一下这个方法的实现:
RocksDBOperationUtils.java
// ... existing code ...public static ColumnFamilyDescriptor createColumnFamilyDescriptor(RegisteredStateMetaInfoBase metaInfoBase,Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,@Nullable Long writeBufferManagerCapacity) {byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),"The chosen state name 'default' collides with the name of the default column family!");ColumnFamilyOptions options =createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());if (ttlCompactFiltersManager != null) {ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options);}if (writeBufferManagerCapacity != null) {// It'd be great to perform the check earlier, e.g. when creating write buffer manager.// Unfortunately the check needs write buffer size that was just calculated.sanityCheckArenaBlockSize(options.writeBufferSize(),options.arenaBlockSize(),writeBufferManagerCapacity);}return new ColumnFamilyDescriptor(nameBytes, options);}
// ... existing code ...
从代码中我们可以清晰地看到构建 ColumnFamilyDescriptor
的步骤:
获取列族名称:
- 从传入的
RegisteredStateMetaInfoBase
对象中获取 State 的名称 (metaInfoBase.getName()
)。 - 将这个名称转换为字节数组 (
byte[] nameBytes
)。这是因为 RocksDB 的原生 API 使用字节数组来标识列族。 - 这里有一个检查,确保 State 的名称不是 "default",以避免与 RocksDB 的默认列族冲突。
- 从传入的
创建列族配置 (
ColumnFamilyOptions
):- 调用
createColumnFamilyOptions
方法,这个方法会使用columnFamilyOptionsFactory
来生成一个ColumnFamilyOptions
实例。 - 这个
columnFamilyOptionsFactory
正是我们在EmbeddedRocksDBStateBackend
中看到的那个函数:stateName -> resourceContainer.getColumnOptions()
。它为每个 State 提供了基础的列族配置。
- 调用
(可选)配置 TTL 压缩过滤器:
- 如果
ttlCompactFiltersManager
不为 null,会检查当前 State 是否配置了 TTL(Time-to-Live,生存时间)。 - 如果配置了 TTL,它会为这个列族的
ColumnFamilyOptions
设置一个特定的压缩过滤器(Compaction Filter),这个过滤器会在 RocksDB 的后台压缩过程中自动清理过期的数据。
- 如果
(可选)内存检查:
- 如果传入了
writeBufferManagerCapacity
,会进行一个健全性检查,确保arenaBlockSize
的配置是合理的。
- 如果传入了
实例化
ColumnFamilyDescriptor
:- 最后,使用前面准备好的列族名称字节数组和配置好的
ColumnFamilyOptions
对象,通过new ColumnFamilyDescriptor(nameBytes, options)
来创建一个新的ColumnFamilyDescriptor
实例并返回。
- 最后,使用前面准备好的列族名称字节数组和配置好的
综上所述,一个 ColumnFamilyDescriptor
对象主要包含以下两个核心信息:
列族名称 (Column Family Name):
- 以
byte[]
数组的形式存储。 - 这个名称直接来源于 Flink State 的
StateDescriptor
中定义的名字。例如,new ValueStateDescriptor<>("my-state", String.class)
中的"my-state"
。
- 以
列族选项 (Column Family Options):
- 一个
org.rocksdb.ColumnFamilyOptions
对象。 - 这个对象包含了该列族所有详细的配置参数,例如:
- Merge Operator: 用于处理 ListState、AggregatingState 等需要合并操作的状态。
- Write Buffer Size: 写缓冲区大小。
- Compression Type: 压缩算法(如 Snappy, LZ4)。
- Compaction Filter: 压缩过滤器,如用于实现 TTL 的过滤器。
- 以及其他大量控制 RocksDB 行为的底层参数。
- 一个
这个 ColumnFamilyDescriptor
对象随后会被传递给 db.createColumnFamily()
方法,RocksDB 会根据其中的名称和配置信息,在数据库实例中创建一个新的、隔离的列族。
Namespace 可以自己随意指定类型和值吗?
答案是:是的,几乎可以。
任意的类型 (Type): 在 Flink 的状态接口定义中,命名空间(Namespace)是一个泛型参数
N
,例如InternalKvState<K, N, V>
。这意味着理论上你可以使用任何 Java/Scala 类型作为Namespace
的类型,比如String
,Long
,Integer
,甚至是自定义的 POJO 对象。唯一的硬性要求是:Flink 必须知道如何序列化和反序列化你的 Namespace 类型。你需要为该类型提供一个
TypeSerializer<N>
。对于Long
,String
等基础类型,Flink 会自动推断并使用内置的序列化器。对于自定义的 POJO,你需要确保它符合 Flink 的 POJO 规范,或者手动创建一个TypeSerializer
。任意的值 (Value): 一旦确定了
Namespace
的类型,你就可以在代码中通过调用state.setCurrentNamespace(namespace)
来传入该类型的任意实例作为当前操作的命名空间。这正是你之前实现的
NamespacedStateListView
的核心思想:将用户的UID
(无论是Long
还是String
类型)作为namespace
的值,从而为每个UID
创建了一个逻辑上独立的ListState
。AbstractRocksDBState.java
文件中的setCurrentNamespace
方法就证明了这一点:// ... existing code ... /** The current namespace, which the next value methods will refer to. */ private N currentNamespace; // ... existing code ... @Override public void setCurrentNamespace(N namespace) {this.currentNamespace = namespace; } // ... existing code ...
这个方法简单地将传入的
namespace
对象赋值给内部的currentNamespace
字段,后续的状态操作(如get
,add
,clear
)都会使用这个字段来构建最终存储到 RocksDB 的 key。
和 KeyedStream,Window 的关系
这里需要分开来看:
与 KeyedStream
的关系:强依赖关系
- 必须在
KeyedStream
上使用:所有带Namespace
的状态(InternalKvState
)都属于Keyed State。你必须先通过dataStream.keyBy(...)
将数据流转换成KeyedStream
,然后才能在下游的算子(如ProcessFunction
)中使用这些状态。 - 原因:Flink 的状态是根据
keyBy
指定的 Key (K
) 来进行分区和管理的。Namespace
(N
) 只是在某个特定 Key (K
) 的状态内部做的进一步划分。可以理解为一种二级索引或子分区。没有keyBy
提供的一级分区,Namespace
就无从谈起。
与 Window
的关系:没有必然关系,窗口是 Namespace 的一种应用场景
窗口是 Namespace 的使用者,而非前提:可以把窗口(Window)看作是 Flink 框架自身对
Namespace
机制的一种自动化应用。当你使用窗口操作时(例如.window(TumblingEventTimeWindows.of(...))
),Flink 会自动地:- 为每一个窗口实例(比如
[00:00:05, 00:00:10)
这个时间窗口)创建一个TimeWindow
对象。 - 在处理属于该窗口的数据时,自动调用
setCurrentNamespace()
,并将这个TimeWindow
对象作为namespace
传入。 这样,窗口内的所有状态计算就被天然地隔离在了这个TimeWindow
命名空间下。
- 为每一个窗口实例(比如
可以完全脱离窗口使用 Namespace:在一个普通的
KeyedProcessFunction
中,完全没有使用任何窗口操作,而是通过手动调用setCurrentNamespace(uid)
来实现了自定义的状态划分。这赋予了超越窗口框架的、更细粒度的状态管理能力。
概念 | 与 Namespace 的关系 | 解释 |
---|---|---|
KeyedStream | 强依赖 | 必须先 |
Window | 无直接依赖 | 窗口是 |
所以,可以自定义 Namespace
的类型和值,并且这个机制可以独立于 Flink 的窗口(Window)功能来使用,只要你的操作是建立在 KeyedStream
之上即可。
window设置NameSpace
负责调用 setCurrentNamespace
的是窗口算子(WindowOperator
)。WindowOperator
的工作流程是:
- 接收到一条数据。
- 调用
WindowAssigner
的assignWindows
方法,获取这条数据所属的窗口列表。 - 遍历这个窗口列表。
- 对于列表中的每一个窗口,先调用
state.setCurrentNamespace(window)
将当前状态的上下文切换到这个窗口。 - 然后,再对该窗口的状态进行更新(比如累加、添加元素等)。
我们可以从工程代码中找到清晰的证据:
在 WindowOperator.java
中,你可以看到这个完整的逻辑:
// ... existing code ...public void processElement(StreamRecord<RowData> record) throws Exception {
// ... existing code ...timestamp = TimeWindowUtil.toUtcTimestampMills(timestamp, shiftTimeZone);// 1. 调用 assigner 获取窗口列表// the windows which the input row should be placed intoCollection<W> affectedWindows = windowFunction.assignStateNamespace(inputRow, timestamp);boolean isElementDropped = true;// 2. 遍历窗口列表for (W window : affectedWindows) {isElementDropped = false;// 3. 为每个窗口设置 NamespacewindowState.setCurrentNamespace(window);// 4. 更新状态RowData acc = windowState.value();if (acc == null) {acc = windowAggregator.createAccumulators();}windowAggregator.setAccumulators(window, acc);if (RowDataUtil.isAccumulateMsg(inputRow)) {windowAggregator.accumulate(inputRow);} else {windowAggregator.retract(inputRow);}acc = windowAggregator.getAccumulators();windowState.update(acc);}
// ... existing code ...
另一个例子在处理 Python UDAF 的算子中也可以看到同样的设计模式:
StreamArrowPythonGroupWindowAggregateFunctionOperator.java
// ... existing code ...public void bufferInput(RowData input) throws Exception {
// ... existing code ...// 1. 调用 assigner 获取窗口列表// Given the timestamp and element, returns the set of windows into which it// should be placed.elementWindows = windowAssigner.assignWindows(input, timestamp);// 2. 遍历窗口列表for (W window : elementWindows) {if (RowDataUtil.isAccumulateMsg(input)) {// 3. 为每个窗口设置 NamespacewindowAccumulateData.setCurrentNamespace(window);// 4. 更新状态windowAccumulateData.add(input);} else {windowRetractData.setCurrentNamespace(window);windowRetractData.add(input);}}}
// ... existing code ...