双流join 、 Paimon Partial Update 和 动态schema
背景
Paimon 通过其独特的 partial-update
合并引擎和底层的 LSM 存储结构,巧妙地将传统双流 Join 中对 Flink State 的高频随机读/写,转换为了对 Paimon 表的顺序写和后台的高效合并,从而一站式地解决了 Flink 作业状态过大、依赖外部 KV 系统等一系列痛点。
传统方案中,Flink 作业需要维护一个巨大的 State(可能达到 TB 级)来存储其中一个流的数据。当另一个流的数据到达时,需要去这个巨大的 State 中查找(Join)对应的记录。这个“查找”操作,在数据量巨大、内存无法完全容纳时,就会频繁触发对磁盘的随机读。机械硬盘和固态硬盘的随机读性能远低于顺序读,这成为了整个作业的性能瓶瓶颈,并导致了高昂的资源开销和不稳定性。
使用 Paimon 的 partial-update
模式后,整个数据处理的范式发生了改变:
- 不再需要 Flink State 来做 Join:两个数据流不再需要在 Flink 算子内部进行 Join。它们各自独立地、源源不断地将自己的数据写入(
INSERT INTO
)到同一个 Paimon 表中。 - 写入是高效的顺序操作:Paimon 底层采用 LSM-Tree 结构。新写入的数据会先进入内存缓冲区,然后刷写成新的、有序的小文件。这个过程主要是顺序写,效率非常高。
这样一来,原来 Flink 作业中最消耗性能的“状态查找”(随机读)环节,被彻底消除了。
现在,两个流的数据都以部分列的形式写入了 Paimon 表。那么,数据是在哪里“打宽”合并的呢?答案是在 Paimon 的Compaction过程中。
-
Partial-Update 合并引擎:当将表的合并引擎设置为
partial-update
时,Paimon 就知道了它的合并策略。 正如文档/docs/content/primary-key-table/merge-engine/partial-update.md
中描述的,对于相同主键的多条记录,它会取每个字段最新的非空值,合并成一条完整的记录。假设 Paimon 收到三条记录:
<1, 23.0, 10, NULL>
<1, NULL, NULL, 'This is a book'>
<1, 25.2, NULL, NULL>
假设第一列是主键,最终合并的结果将是
<1, 25.2, 10, 'This is a book'>
。 -
LSM-Tree 与顺序读合并:Paimon 的 Compaction 任务会定期将小的、分层的文件合并成更大的文件。这个合并过程是读取多个有序的文件,然后进行多路归并排序,这基本上是顺序读操作,效率远高于随机读。
PartialUpdateMergeFunction
这个类就是实现该合并逻辑的核心。 -
Paimon Compaction策略见:Paimon LSM Tree Compaction 策略
// ... existing code ... public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {// ... existing code ...private InternalRow currentKey;private long latestSequenceNumber;private GenericRow row;private KeyValue reused;private boolean currentDeleteRow;// ... existing code ...@Overridepublic void add(KeyValue kv) {// refresh key object to avoid reference overwrittencurrentKey = kv.key();currentDeleteRow = false; // ... existing code ...
-
专用 Compaction 作业:为了不影响数据写入的实时性,最佳实践是启动一个独立的、专用的 Compaction 作业。这样,数据写入和数据合并就可以完全解耦,互不干扰。
如文档
/docs/content/maintenance/dedicated-compaction.md
所述,当有多个流式作业写入一个partial-update
表时,推荐使用专用的 Compaction 作业。<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-{{< version >}}.jar \compact \--warehouse <warehouse-path> \--database <database-name> \ --table <table-name> \...
总结:Paimon 的核心优势
通过上述分析,我们可以清晰地看到 Paimon 在这个场景下的优势:
- 性能革命:将 Flink State 的随机读瓶颈,转变为 Paimon 的顺序写 + 后台顺序读合并,大幅提升了整体吞吐量和性能。
- 架构简化与成本降低:不再需要维护外部的 HBase/Pegasus 等 KV 系统,所有数据统一存储在 Paimon 中,降低了系统复杂度和运维、存储成本。
- 稳定性提升:Flink 作业本身变成了无状态或轻状态的写入任务,彻底告别了 TB 级的 State,使得作业的稳定性和恢复速度大大增强。
- 开发简化:原来需要手写复杂
DataStream
API 和Timer
才能实现的逻辑,现在只需要两个简单的INSERT INTO
SQL 语句即可完成,开发效率和代码可维护性显著提高。
PartialUpdateMergeFunction
这是在 Paimon 中实现 partial-update
(部分列更新) 合并引擎的核心类。它的主要职责是在 Compaction 过程中,将具有相同主键的多条记录(KeyValue
)合并成最终的一条记录。
PartialUpdateMergeFunction
实现了 MergeFunction<KeyValue>
接口。在 Paimon 的 LSM-Tree 存储模型中,当执行 Compaction 操作需要合并多个数据文件时,Paimon 会读取具有相同主键的一组 KeyValue
数据,然后交由一个 MergeFunction
实例来处理,计算出最终的结果。
PartialUpdateMergeFunction
的合并逻辑是:对于相同主键的记录,不断地用新的非空字段值去覆盖旧的字段值,最终得到一个“打宽”后的完整记录。 它还支持更复杂的场景,如基于序列号的更新、字段聚合和多种删除策略。
// ... existing code ...
import org.apache.paimon.mergetree.compact.MergeFunction;
// ... existing code ...
/*** A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update* non-null fields on merge.*/
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
// ... existing code ...
核心成员变量
这些变量定义了 PartialUpdateMergeFunction
的状态和配置,决定了其合并行为。
// ... existing code ...
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {public static final String SEQUENCE_GROUP = "sequence-group";private final InternalRow.FieldGetter[] getters; // 用于从 InternalRow 中获取字段值private final boolean ignoreDelete; // 是否忽略删除记录private final Map<Integer, FieldsComparator> fieldSeqComparators; // 字段序列号比较器,用于 sequence-groupprivate final boolean fieldSequenceEnabled; // 是否启用了 sequence-groupprivate final Map<Integer, FieldAggregator> fieldAggregators; // 字段聚合器private final boolean removeRecordOnDelete; // 收到 DELETE 记录时是否删除整行private final Set<Integer> sequenceGroupPartialDelete; // 收到 DELETE 记录时,根据 sequence-group 删除部分列private final boolean[] nullables; // 记录每个字段是否可为 nullprivate InternalRow currentKey; // 当前处理的主键private long latestSequenceNumber; // 见过的最新序列号private GenericRow row; // 合并过程中的结果行private KeyValue reused; // 用于复用的 KeyValue 对象,避免重复创建private boolean currentDeleteRow; // 标记当前行最终是否应被删除private boolean notNullColumnFilled;/*** If the first value is retract, and no insert record is received, the row kind should be* RowKind.DELETE. (Partial update sequence group may not correctly set currentDeleteRow if no* RowKind.INSERT value is received)*/private boolean meetInsert; // 是否遇到过 INSERT 类型的记录// ... existing code ...
- 配置类变量 (
ignoreDelete
,fieldSeqComparators
,fieldAggregators
等) 通常在Factory
中被初始化,它们在整个合并过程中保持不变。 - 状态类变量 (
currentKey
,row
,latestSequenceNumber
等) 会在每次reset()
时被重置,用于处理新的一组具有相同主键的记录。
add(KeyValue kv)
:合并逻辑的核心
这是最重要的方法,定义了单条 KeyValue
是如何被合并到当前结果 row
中的。
// ... existing code ...@Overridepublic void add(KeyValue kv) {// refresh key object to avoid reference overwrittencurrentKey = kv.key();currentDeleteRow = false;if (kv.valueKind().isRetract()) {if (!notNullColumnFilled) {initRow(row, kv.value());notNullColumnFilled = true;}// ... 删除逻辑处理 ...// ... existing code ...String msg =String.join("\n","By default, Partial update can not accept delete records,"+ " you can choose one of the following solutions:","1. Configure 'ignore-delete' to ignore delete records.","2. Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.","3. Configure 'sequence-group's to retract partial columns.");throw new IllegalArgumentException(msg);}latestSequenceNumber = kv.sequenceNumber();if (fieldSeqComparators.isEmpty()) {updateNonNullFields(kv);} else {updateWithSequenceGroup(kv);}meetInsert = true;notNullColumnFilled = true;}
// ... existing code ...
它的逻辑可以分为两大块:
A. 处理 retract
消息 (RowKind 为 DELETE
或 UPDATE_BEFORE
)
partial-update
默认不接受删除记录。如果收到了,行为由配置决定:
ignoreDelete = true
: 直接忽略这条删除记录,返回。removeRecordOnDelete = true
: 当收到DELETE
类型的记录时,将currentDeleteRow
标记为true
,并清空当前row
。这意味着最终这条主键对应的记录将被删除。fieldSequenceEnabled = true
: 启用了sequence-group
。这是最复杂的逻辑,它会调用retractWithSequenceGroup(kv)
。这个方法会根据序列号比较结果,来决定是否要“撤销”某些字段的更新(通常是将其设置为null
或调用聚合器的retract
方法)。- 默认行为: 如果以上配置都没有,则直接抛出
IllegalArgumentException
异常,提示用户如何正确配置。
B. 处理 add
消息 (RowKind 为 INSERT
或 UPDATE_AFTER
)
这是主要的更新逻辑:
-
简单更新 (
updateNonNullFields
): 如果没有配置sequence-group
(fieldSeqComparators
为空),则执行最简单的部分列更新。遍历新纪录kv
的所有字段,只要字段值不为null
,就用它来更新row
中对应位置的值。// ... existing code ... private void updateNonNullFields(KeyValue kv) {for (int i = 0; i < getters.length; i++) {Object field = getters[i].getFieldOrNull(kv.value());if (field != null) {row.setField(i, field);} else { // ... existing code ...
-
带序列号的更新 (
updateWithSequenceGroup
): 如果配置了sequence-group
,逻辑会更复杂。对于每个字段:- 如果该字段不属于任何
sequence-group
,则行为和简单更新类似(但会考虑聚合)。 - 如果该字段属于某个
sequence-group
,则会使用FieldsComparator
比较新记录kv
和当前结果row
的序列号字段。只有当新记录的序列号 大于或等于 当前结果的序列号时,才会用新记录的字段值去更新row
中由该sequence-group
控制的所有字段。这保证了数据的更新顺序。
- 如果该字段不属于任何
updateWithSequenceGroup
这个方法是 partial-update
合并引擎处理带有 sequence-group
配置时的核心逻辑。当用户在表属性中定义了 fields.<seq_field>.sequence-group = <data_field1>,<data_field2>
这样的规则时,数据合并就不再是简单的“非空值覆盖”,而是需要根据 seq_field
的值来判断是否应该更新 data_field1
和 data_field2
。这解决了多流更新时可能出现的数据乱序覆盖问题。
updateWithSequenceGroup
方法通过引入FieldsComparator
,将简单的字段更新升级为基于序列号的条件更新。它精确地控制了哪些字段在何时可以被更新,从而保证了在多流并发写入场景下,即使数据存在一定程度的乱序,最终也能合并成正确的结果。这是 Paimonpartial-update
模式能够处理复杂更新场景的关键所在。
// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {
// ... existing code ...
- 输入:
KeyValue kv
,代表一条新到达的、具有相同主键的记录。 - 目标: 遍历这条新记录
kv
的所有字段,并根据sequence-group
的规则,决定是否用kv
中的字段值来更新当前正在合并的结果行this.row
。
该方法的核心是一个 for
循环,它遍历了表中的每一个字段。
// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {for (int i = 0; i < getters.length; i++) {
// ... existing code ...
在循环内部,对每个字段的处理逻辑可以分为两种情况:
- 该字段不属于任何
sequence-group
。 - 该字段属于某个
sequence-group
。
让我们来详细看这两种情况。
1. 字段不属于任何 sequence-group
// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {for (int i = 0; i < getters.length; i++) {Object field = getters[i].getFieldOrNull(kv.value());FieldsComparator seqComparator = fieldSeqComparators.get(i);FieldAggregator aggregator = fieldAggregators.get(i);Object accumulator = getters[i].getFieldOrNull(row);if (seqComparator == null) {if (aggregator != null) {row.setField(i, aggregator.agg(accumulator, field));} else if (field != null) {row.setField(i, field);}} else {
// ... existing code ...
- 判断条件:
seqComparator == null
。fieldSeqComparators
是一个Map<Integer, FieldsComparator>
,如果在里面找不到当前字段索引i
,就说明这个字段不受任何sequence-group
控制。 - 处理逻辑:
- 带聚合函数: 如果为该字段配置了聚合函数(
aggregator != null
),例如sum
、max
等,则调用aggregator.agg()
方法,将当前累加值accumulator
和新值field
进行聚合,并将结果写回row
。 - 不带聚合函数: 这是最简单的情况。如果新来的字段值
field
不为null
,就直接用它覆盖row
中的旧值。这和updateNonNullFields
的行为是一致的。
- 带聚合函数: 如果为该字段配置了聚合函数(
2. 字段属于某个 sequence-group
这是该方法最核心和复杂的部分。
// ... existing code ...} else {if (isEmptySequenceGroup(kv, seqComparator)) {// skip null sequence groupcontinue;}if (seqComparator.compare(kv.value(), row) >= 0) {int index = i;// Multiple sequence fields should be updated at once.if (Arrays.stream(seqComparator.compareFields()).anyMatch(seqIndex -> seqIndex == index)) {for (int fieldIndex : seqComparator.compareFields()) {row.setField(fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value()));}continue;}row.setField(i, aggregator == null ? field : aggregator.agg(accumulator, field));} else if (aggregator != null) {row.setField(i, aggregator.aggReversed(accumulator, field));}}}}
// ... existing code ...
- 判断条件:
seqComparator != null
。 - 处理逻辑:
- 空序列组检查:
isEmptySequenceGroup(kv, seqComparator)
会检查这条新纪录kv
中,其对应的序列号字段是否都为null
。如果是,意味着这条记录无法判断新旧,因此直接跳过,不进行任何更新。 - 序列号比较:
seqComparator.compare(kv.value(), row) >= 0
是关键。它会比较新记录kv
和当前结果row
中,由seqComparator
定义的序列号字段。- 如果新记录的序列号 >= 当前结果的序列号: 这意味着新记录
kv
是“更新”的或者“同样新”的,此时应该用kv
的值去更新row
。- 更新序列号字段本身: 如果当前字段
i
就是序列号字段之一,那么需要把这个sequence-group
定义的所有序列号字段都一次性更新掉,然后用continue
跳出本次循环。这是为了保证序列号字段之间的一致性。 - 更新数据字段: 如果当前字段
i
是被序列号控制的数据字段,则执行更新。如果有聚合器,则调用aggregator.agg()
;如果没有,则直接用新值field
覆盖。
- 更新序列号字段本身: 如果当前字段
- 如果新记录的序列号 < 当前结果的序列号: 这意味着
kv
是一条“旧”数据。在大部分情况下,这条旧数据会被忽略。但有一个例外:如果为该字段配置了支持乱序聚合的聚合器(例如sum
),则会调用aggregator.aggReversed()
。这个方法通常和agg()
的逻辑是一样的,它允许旧数据也能被正确地聚合进来。对于不支持乱序的聚合器(如max
),aggReversed
可能就是一个空操作。
- 如果新记录的序列号 >= 当前结果的序列号: 这意味着新记录
- 空序列组检查:
getResult()
方法:产出最终结果
当处理完具有相同主键的所有 KeyValue
后,调用此方法来获取最终的合并结果。
// ... existing code ...@Overridepublic KeyValue getResult() {if (reused == null) {reused = new KeyValue();}RowKind rowKind = currentDeleteRow || !meetInsert ? RowKind.DELETE : RowKind.INSERT;return reused.replace(currentKey, latestSequenceNumber, rowKind, row);}
// ... existing code ...
它会根据 currentDeleteRow
和 meetInsert
标志位来决定最终的 RowKind
。如果 currentDeleteRow
为 true
,或者整个合并过程从未见过 INSERT
类型的记录,那么最终结果就是一条 DELETE
记录。否则,就是一条 INSERT
记录。然后将主键、最新的序列号、最终的 RowKind
和合并后的 row
数据打包成一个 KeyValue
返回。
Factory
内部类:配置的入口
PartialUpdateMergeFunction.Factory
是一个非常重要的内部类,它负责解析用户在表上设置的 OPTIONS
,并据此创建出一个配置好的 PartialUpdateMergeFunction
实例。
// ... existing code ...public static MergeFunctionFactory<KeyValue> factory(Options options, RowType rowType, List<String> primaryKeys) {return new Factory(options, rowType, primaryKeys);}private static class Factory implements MergeFunctionFactory<KeyValue> {// ... 成员变量,用于存储从 Options 解析出的配置 ...private Factory(Options options, RowType rowType, List<String> primaryKeys) {this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);// ... existing code ...this.removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);// ... 解析 sequence-group 配置 ...for (Map.Entry<String, String> entry : options.toMap().entrySet()) {String k = entry.getKey();String v = entry.getValue();if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {// ... 解析出序列号字段和被控制的字段,构建 fieldSeqComparators ...}}// ... 解析聚合函数配置,构建 fieldAggregators ...this.fieldAggregators =createFieldAggregators(rowType, primaryKeys, allSequenceFields, new CoreOptions(options));// ... 配置校验,确保冲突的配置不会同时开启 ...Preconditions.checkState(!(removeRecordOnDelete && ignoreDelete),// ...);// ...}
// ... existing code ...
在构造函数中,它会:
- 读取
ignore-delete
,partial-update.remove-record-on-delete
等简单配置。 - 遍历所有
OPTIONS
,查找以fields.
开头、以.sequence-group
结尾的配置项,例如fields.order_time.sequence-group=order_id,price
。它会解析这些配置,构建出fieldSeqComparators
这个 Map,其中 key 是被控制字段的索引,value 是一个能够比较order_time
字段的比较器。 - 调用
createFieldAggregators
方法,解析fields.*.aggregate-function
等配置,构建出fieldAggregators
这个 Map。 - 执行一系列
Preconditions.checkState
,对用户的配置进行合法性校验,防止出现逻辑冲突。
总结
PartialUpdateMergeFunction
是 Paimon 实现高性能数据打宽(部分列更新)能力的技术基石。它通过一个设计精巧的合并流程,将简单的非空字段覆盖、基于序列号的有序更新、字段聚合以及多种删除策略融为一体。其 Factory
类则充当了连接用户配置和底层实现的桥梁。理解了这个类的工作原理,就能深刻地理解 Paimon partial-update
模式的强大之处。
双流拼接 怎么处理schema
Paimon 允许在写入数据时自动合并和演进表结构。这对于像双流 Join 结果写入等 schema 可能变化的场景至关重要。这个功能主要通过 write.merge-schema
选项来开启。
当将数据写入 Paimon 表时:
- 如果
write.merge-schema
设置为true
,Paimon 会比较写入数据(Source)的 schema 和目标表(Sink)当前的 schema。 - 如果发现写入数据中包含了表中不存在的新列,Paimon 会自动将这些新列添加到表结构中,生成一个新的、版本更高的 schema。
- 对于数据中缺失但在表 schema 中存在的列,Paimon 会自动填充
null
值。
这个过程是原子性的,并记录在表的元数据中。Paimon 会为每一次 schema 变更创建一个新的版本化的 schema 文件。
代码参考:
在 Spark 中,写入逻辑由 WriteIntoPaimonTable.scala
处理。可以看到,当 mergeSchema
为 true
时,它会调用 mergeAndCommitSchema
来合并 schema,并处理列不匹配的情况。
WriteIntoPaimonTable.scala
// ... existing code ...override def run(sparkSession: SparkSession): Seq[Row] = {var data = _dataif (mergeSchema) {val dataSchema = SparkSystemColumns.filterSparkSystemColumns(data.schema)val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)mergeAndCommitSchema(dataSchema, allowExplicitCast)// For case that some columns is absent in data, we still allow to write once write.merge-schema is true.val newTableSchema = SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())if (!PaimonUtils.sameType(newTableSchema, dataSchema)) {val resolve = sparkSession.sessionState.conf.resolverval cols = newTableSchema.map {field =>dataSchema.find(f => resolve(f.name, field.name)) match {case Some(f) => col(f.name)case _ => lit(null).as(field.name)}}data = data.select(cols: _*)}}
// ... existing code ...
一个具体的测试用例也展示了这一点,一个原先只有 a
和 b
列的表,成功写入了包含 c
和 d
列的新数据。
DataFrameWriteTest.scala
// ... existing code ...// Case 1: two additional fields: DoubleType and TimestampTypeval ts = java.sql.Timestamp.valueOf("2023-08-01 10:00:00.0")val df2 = Seq((1, "2023-08-01", 12.3d, ts), (3, "2023-08-03", 34.5d, ts)).toDF("a", "b", "c", "d")df2.write.format("paimon").mode("append").option("write.merge-schema", "true").save(location)
// ... existing code ...
在 Flink 或 Spark 中进行双流 Join 时,Paimon 通常作为 Sink 端。Join 操作本身由计算引擎完成。应用需要做的就是:
- 执行双流 Join。
- 将 Join 后的
DataStream
或DataFrame
写入 Paimon 表。 - 在写入时,设置
write.merge-schema
为true
。
这样,无论 Join 结果的 schema 如何(比如因为上游流增加了字段导致 Join 结果也增加了字段),Paimon 表都可以自动适应,动态地添加新列。
SchemaMergingUtils
SchemaMergingUtils
是 Paimon schema 演进(Schema Evolution)功能的核心工具类。它的主要职责是比较两个 schema(通常是数据表的现有 schema 和新写入数据的 schema),并根据预设的规则将它们合并成一个新的、统一的 schema。这个过程支持添加新列、安全地转换现有列的数据类型,从而实现动态 schema 的能力。
当配置 Paimon 表允许 schema 合并(例如通过 write.merge-schema=true
)时,写入流程就会调用这个工具类。它会:
- 比较字段:找出新旧 schema 中同名和新增的字段。
- 合并类型:对于同名字段,尝试合并其数据类型(例如,
INT
可以演进为BIGINT
)。 - 添加字段:将新 schema 中独有的字段添加到最终的 schema 中,并为其分配新的唯一 ID。
- 生成新版 Schema:如果发生了任何变更,它会创建一个版本号加一的新的
TableSchema
对象。
下面我们结合代码,从顶层方法到底层实现,一步步进行分析。
mergeSchemas
这是最顶层的入口方法,用于合并一个完整的表 schema 和一个新的行类型(通常来自要写入的数据)。
- 参数:
currentTableSchema
: Paimon 表当前的TableSchema
对象。它包含了字段、分区键、主键、表配置等所有元数据。targetType
: 目标RowType
,即新数据的 schema。allowExplicitCast
: 一个布尔标志,决定是否允许显式(可能存在精度损失)的类型转换,比如STRING
转INT
。
- 逻辑:
- 首先,它会检查
targetType
和currentTableSchema
的RowType
是否完全相同。如果相同,则无需合并,直接返回当前的TableSchema
。 - 如果不同,它会初始化一个
AtomicInteger
类型的highestFieldId
,记录当前 schema 中所有字段(包括嵌套字段)的最大 ID。这个 ID 对于为新字段分配唯一标识至关重要。 - 调用重载的
mergeSchemas
方法(最终调用核心的merge
方法)来递归地合并两个RowType
。 - 如果合并后的
newRowType
与原始的currentType
相同(例如,只是可空性变化,而合并逻辑会保留原始的可空性),则也认为没有发生实质性变化,返回原始的TableSchema
。 - 如果 schema 确实发生了变化,它会创建一个新的
TableSchema
实例。这个新 schema 的 ID 会在旧 ID 的基础上加 1,字段列表和highestFieldId
会更新,而分区键、主键、表配置和注释等信息则会从旧 schema 中继承。
- 首先,它会检查
// ... existing code ...public static TableSchema mergeSchemas(TableSchema currentTableSchema, RowType targetType, boolean allowExplicitCast) {RowType currentType = currentTableSchema.logicalRowType();if (currentType.equals(targetType)) {return currentTableSchema;}AtomicInteger highestFieldId = new AtomicInteger(currentTableSchema.highestFieldId());RowType newRowType =mergeSchemas(currentType, targetType, highestFieldId, allowExplicitCast);if (newRowType.equals(currentType)) {// It happens if the `targetType` only changes `nullability` but we always respect the// current's.return currentTableSchema;}return new TableSchema(currentTableSchema.id() + 1,newRowType.getFields(),highestFieldId.get(),currentTableSchema.partitionKeys(),currentTableSchema.primaryKeys(),currentTableSchema.options(),currentTableSchema.comment());}
// ... existing code ...
merge
这是所有合并逻辑的核心。它被递归调用以处理各种数据类型。
可空性处理 (Nullability Handling)
在方法的一开始,它将 base0
和 update0
的可空性都设置为 true
来进行比较。最终返回的类型的可空性将以 base0
(原始表 schema 中的类型)为准。这意味着 schema 合并不会改变现有列的可空性。
// ... existing code ...public static DataType merge(DataType base0,DataType update0,AtomicInteger highestFieldId,boolean allowExplicitCast) {// Here we try to merge the base0 and update0 without regard to the nullability,// and set the base0's nullability to the return's.DataType base = base0.copy(true);DataType update = update0.copy(true);if (base.equals(update)) {return base0;} else if (base instanceof RowType && update instanceof RowType) {
// ... existing code ...
递归合并复杂类型
RowType
(行类型): 这是最复杂的部分。- 合并现有字段: 遍历
base
(旧 schema) 的所有字段。对于每个字段,检查update
(新 schema) 中是否存在同名字段。如果存在,就递归调用merge
方法来合并这两个字段的类型。如果不存在,则保留base
中的原始字段。 - 添加新字段: 遍历
update
的所有字段,找出在base
中不存在的字段。这些就是需要新增的列。对于每个新字段,调用assignIdForNewField
为其分配一个新的、唯一的字段 ID,然后将其添加到最终的字段列表中。 - 最后,用更新后的字段列表创建一个新的
RowType
。
- 合并现有字段: 遍历
// ... existing code ...} else if (base instanceof RowType && update instanceof RowType) {List<DataField> baseFields = ((RowType) base).getFields();List<DataField> updateFields = ((RowType) update).getFields();Map<String, DataField> updateFieldMap =updateFields.stream().collect(Collectors.toMap(DataField::name, Function.identity()));List<DataField> updatedFields =baseFields.stream().map(baseField -> {if (updateFieldMap.containsKey(baseField.name())) {DataField updateField =updateFieldMap.get(baseField.name());DataType updatedDataType =merge(baseField.type(),updateField.type(),highestFieldId,allowExplicitCast);return new DataField(baseField.id(),baseField.name(),updatedDataType,baseField.description());} else {return baseField;}}).collect(Collectors.toList());Map<String, DataField> baseFieldMap =baseFields.stream().collect(Collectors.toMap(DataField::name, Function.identity()));List<DataField> newFields =updateFields.stream().filter(field -> !baseFieldMap.containsKey(field.name())).map(field -> assignIdForNewField(field, highestFieldId)).map(field -> field.copy(true)).collect(Collectors.toList());updatedFields.addAll(newFields);return new RowType(base0.isNullable(), updatedFields);} else if (base instanceof MapType && update instanceof MapType) {
// ... existing code ...
MapType
,ArrayType
,MultisetType
: 对于这些集合类型,合并逻辑很简单:递归地调用merge
方法来合并它们的内部元素类型(MapType
的键和值类型,ArrayType
和MultisetType
的元素类型)。
合并基础类型
-
DecimalType
: 这是一个特例。只有当两个DecimalType
的scale
(小数位数) 相同时,才能合并。合并后的precision
(总位数) 取两者中的最大值。如果scale
不同,会直接抛出UnsupportedOperationException
。 -
其他可转换类型: 对于其他基础类型,通过
supportsDataTypesCast
方法判断是否可以转换。- 隐式转换 (Implicit Cast): 当
allowExplicitCast
为false
时,只允许安全的类型提升,例如INT
->BIGINT
,FLOAT
->DOUBLE
。 - 显式转换 (Explicit Cast): 当
allowExplicitCast
为true
时,允许更多可能损失精度的转换。 - 对于带有长度(如
VARCHAR
)或精度(如TIMESTAMP
)的类型,通常要求新类型的长度/精度不能小于旧类型,除非开启了显式转换。 - 如果可以转换,则直接采用
update
的类型,但保留base0
的可空性。
- 隐式转换 (Implicit Cast): 当
// ... existing code ...} else if (supportsDataTypesCast(base, update, allowExplicitCast)) {if (DataTypes.getLength(base).isPresent() && DataTypes.getLength(update).isPresent()) {// this will check and merge types which has a `length` attribute, like BinaryType,// CharType, VarBinaryType, VarCharType.if (allowExplicitCast|| DataTypes.getLength(base).getAsInt()<= DataTypes.getLength(update).getAsInt()) {return update.copy(base0.isNullable());} else {throw new UnsupportedOperationException(String.format("Failed to merge the target type that has a smaller length: %s and %s",base, update));}} else if (DataTypes.getPrecision(base).isPresent()&& DataTypes.getPrecision(update).isPresent()) {// this will check and merge types which has a `precision` attribute, like// LocalZonedTimestampType, TimeType, TimestampType.if (allowExplicitCast|| DataTypes.getPrecision(base).getAsInt()<= DataTypes.getPrecision(update).getAsInt()) {return update.copy(base0.isNullable());} else {throw new UnsupportedOperationException(String.format("Failed to merge the target type that has a lower precision: %s and %s",base, update));}} else {return update.copy(base0.isNullable());}} else {throw new UnsupportedOperationException(String.format("Failed to merge data types %s and %s", base, update));}}
// ... existing code ...
assignIdForNewField
这个方法非常重要。当向 RowType
中添加一个新字段时,它负责为这个新字段及其所有嵌套字段(如果是复杂类型)分配唯一的 ID。它通过传入的 AtomicInteger highestFieldId
来实现 ID 的原子性递增,确保了在并发场景下 ID 的唯一性,这对于 Paimon 正确地按 ID 映射和读取列数据至关重要。
// ... existing code ...private static DataField assignIdForNewField(DataField field, AtomicInteger highestFieldId) {DataType dataType = ReassignFieldId.reassign(field.type(), highestFieldId);return new DataField(highestFieldId.incrementAndGet(), field.name(), dataType, field.description());}
}
总结
SchemaMergingUtils
通过一套定义明确且可递归的规则,实现了 Paimon 强大而灵活的 Schema 演进能力。它能够智能地处理字段的增加和类型变化,同时通过严格的 ID 分配和管理,保证了数据读写的正确性。这个类是 Paimon 能够适应动态数据源、支持平滑表结构变更的关键所在。
SchemaManager
SchemaManager
是 Paimon 中负责管理表 schema(模式)的核心组件。它处理所有与 schema 相关的持久化操作,包括创建、读取、更新和版本管理。可以把它看作是 Paimon 表 schema 在文件系统中的“数据库管理员”。
SchemaManager
的主要职责可以归纳为以下几点:
- Schema 持久化:将
TableSchema
对象序列化为 JSON 文件,并存储在表的schema
目录下。每个 schema 文件代表一个版本。 - 版本管理:每个 schema 文件名都以
schema-
开头,后跟一个从 0 开始递增的版本号(ID),例如schema-0
,schema-1
等。这使得 Paimon 可以追踪 schema 的所有历史变更。 - Schema 读取:提供方法来读取最新版本的 schema、特定版本的 schema 或所有版本的 schema。
- Schema 创建:在创建新表时,负责初始化并提交第一个 schema 版本(
schema-0
)。 - Schema 变更:通过应用一系列
SchemaChange
(如添加列、删除列、修改表选项等)来原子性地更新 schema,并生成一个新的、版本号加一的 schema 文件。 - 多分支支持:能够为不同的数据分支(branch)管理各自独立的 schema 演进路径。
结构和关键属性
// ... existing code ...
@ThreadSafe
public class SchemaManager implements Serializable {private static final String SCHEMA_PREFIX = "schema-";private final FileIO fileIO;private final Path tableRoot;private final String branch;public SchemaManager(FileIO fileIO, Path tableRoot) {
// ... existing code ...
@ThreadSafe
: 这个注解表明该类的设计是线程安全的,允许多个线程同时访问一个SchemaManager
实例。SCHEMA_PREFIX
: 常量"schema-"
,定义了 schema 文件名的前缀。fileIO
:FileIO
接口的实例,用于与底层文件系统(如 HDFS, S3, 本地文件系统)进行交互。tableRoot
:Path
对象,指向表的根目录。SchemaManager
会在这个目录下的schema
子目录中工作。branch
: 字符串,表示当前SchemaManager
实例操作的数据分支名称。Paimon 支持类似 Git 的分支功能,main
是默认的主分支。不同的分支可以有独立的快照和 schema 演进。
构造函数和分支管理
// ... existing code ...public SchemaManager(FileIO fileIO, Path tableRoot) {this(fileIO, tableRoot, DEFAULT_MAIN_BRANCH);}/** Specify the default branch for data writing. */public SchemaManager(FileIO fileIO, Path tableRoot, String branch) {this.fileIO = fileIO;this.tableRoot = tableRoot;this.branch = BranchManager.normalizeBranch(branch);}public SchemaManager copyWithBranch(String branchName) {return new SchemaManager(fileIO, tableRoot, branchName);}
// ... existing code ...
- 构造函数初始化了
fileIO
、tableRoot
和branch
。默认使用主分支DEFAULT_MAIN_BRANCH
。 copyWithBranch(String branchName)
: 这是一个工厂方法,用于创建一个新的SchemaManager
实例来操作指定的分支。这体现了 Paimon 对多分支的支持。
Schema 读取方法
// ... existing code ...public Optional<TableSchema> latest() {try {return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX).reduce(Math::max).map(this::schema);} catch (IOException e) {throw new UncheckedIOException(e);}}
// ... existing code ...public List<TableSchema> listAll() {return listAllIds().stream().map(this::schema).collect(Collectors.toList());}public List<Long> listAllIds() {try {return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX).collect(Collectors.toList());} catch (IOException e) {throw new UncheckedIOException(e);}}
// ... existing code ...
latest()
: 获取最新版本的TableSchema
。它通过listVersionedFiles
工具方法列出schema
目录下所有符合schema-*
格式的文件,提取出版本号,找到最大的版本号,然后调用schema(long id)
方法读取并反序列化对应的 schema 文件。listAll()
: 获取所有版本的TableSchema
列表。listAllIds()
: 仅获取所有 schema 版本的 ID 列表。schema(long id)
(未在片段中完全展示,但被latest()
调用): 这是一个内部方法,根据给定的 ID 构建 schema 文件路径(如.../schema/schema-5
),然后使用fileIO
读取文件内容,并通过TableSchema.fromJSON(String json)
将其反序列化为TableSchema
对象。
表创建 createTable(...)
// ... existing code ...public TableSchema createTable(Schema schema, boolean externalTable) throws Exception {while (true) {Optional<TableSchema> latest = latest();if (latest.isPresent()) {TableSchema latestSchema = latest.get();if (externalTable) {checkSchemaForExternalTable(latestSchema.toSchema(), schema);return latestSchema;} else {throw new IllegalStateException("Schema in filesystem exists, creation is not allowed.");}}TableSchema newSchema = TableSchema.create(0, schema);// validate table from creating tableFileStoreTableFactory.create(fileIO, tableRoot, newSchema).store();boolean success = commit(newSchema);if (success) {return newSchema;}}}
// ... existing code ...
- 这是一个原子性操作,通过
while(true)
循环和文件系统的原子性创建来保证。 - 检查存在性: 首先调用
latest()
检查是否已有 schema 文件存在。如果存在且不是创建外部表,则抛出异常,防止覆盖现有表。 - 创建新 Schema: 如果不存在,则使用
TableSchema.create(0, schema)
创建一个 ID 为 0 的新TableSchema
。 - 验证: 调用
FileStoreTableFactory.create(...)
来验证 schema 的有效性(例如,检查主键、分区键等配置是否合法)。 - 提交: 调用
commit(newSchema)
方法,该方法会尝试原子性地创建schema-0
文件。如果创建成功,循环结束并返回新的TableSchema
。如果因为并发冲突导致创建失败,循环会继续,重新尝试整个过程。
Schema 变更 commitChanges(...)
这是执行 ALTER TABLE
操作的核心逻辑。
// ... existing code ...public TableSchema commitChanges(List<SchemaChange> changes)throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,Catalog.ColumnNotExistException {SnapshotManager snapshotManager =new SnapshotManager(fileIO, tableRoot, branch, null, null);LazyField<Boolean> hasSnapshots =new LazyField<>(() -> snapshotManager.latestSnapshot() != null);while (true) {TableSchema oldTableSchema =latest().orElseThrow(() ->new Catalog.TableNotExistException(identifierFromPath(tableRoot.toString(), true, branch)));TableSchema newTableSchema = generateTableSchema(oldTableSchema, changes, hasSnapshots);try {boolean success = commit(newTableSchema);if (success) {return newTableSchema;}} catch (Exception e) {throw new RuntimeException(e);}}}public boolean commit(TableSchema newSchema) throws Exception {SchemaValidation.validateTableSchema(newSchema);SchemaValidation.validateFallbackBranch(this, newSchema);Path schemaPath = toSchemaPath(newSchema.id());return fileIO.tryToWriteAtomic(schemaPath, newSchema.toString());}
// ... existing code ...
- 同样使用
while(true)
循环来保证原子性。 - 获取旧 Schema: 首先获取当前的最新 schema (
oldTableSchema
)。 - 生成新 Schema: 调用
generateTableSchema
方法,该方法是变更逻辑的核心。它接收旧 schema 和一个SchemaChange
列表,然后逐个应用这些变更(如AddColumn
,DropColumn
,SetOption
等),生成一个新的TableSchema
对象。这个新对象的 ID 是旧 ID 加 1。 - 提交新 Schema: 调用
commit(newTableSchema)
尝试原子性地创建新的 schema 文件(如schema-5
->schema-6
)。如果成功,则返回新 schema。如果失败,则重试。
generateTableSchema(...)
这个方法是应用 SchemaChange
的具体实现。它像一个状态机,基于 oldTableSchema
,根据 changes
列表中的每个变更项,逐步构建出 newTableSchema
的各个部分。
// ... existing code ...public TableSchema generateTableSchema(TableSchema oldTableSchema, List<SchemaChange> changes, LazyField<Boolean> hasSnapshots)throws Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException {Map<String, String> oldOptions = new HashMap<>(oldTableSchema.options());Map<String, String> newOptions = new HashMap<>(oldTableSchema.options());List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId());String newComment = oldTableSchema.comment();for (SchemaChange change : changes) {if (change instanceof SetOption) {
// ... existing code ...} else if (change instanceof RemoveOption) {
// ... existing code ...} else if (change instanceof AddColumn) {
// ... existing code ...} else if (change instanceof RenameColumn) {
// ... existing code ...} else if (change instanceof DropColumn) {
// ... existing code ...} else if (change instanceof UpdateColumnType) {
// ... existing code ...} else if (change instanceof UpdateColumnNullability) {
// ... existing code ...} else if (change instanceof UpdateColumnPosition) {
// ... existing code ...} else if (change instanceof UpdateColumnComment) {
// ... existing code ...}}
// ... existing code ...
它通过 instanceof
判断 SchemaChange
的具体类型,并执行相应的逻辑:
SetOption
/RemoveOption
: 修改newOptions
这个 Map。AddColumn
: 向newFields
列表中添加新字段,并使用highestFieldId
分配新 ID。RenameColumn
: 修改newFields
中某个字段的名称。DropColumn
: 从newFields
中移除字段。UpdateColumnType
/UpdateColumnNullability
: 更新字段的类型或可空性。- ...等等。
总结
SchemaManager
是 Paimon 表结构管理的基石。它通过将 schema 版本化并持久化到文件系统中,实现了 schema 的可靠追踪和演进。其原子性的提交操作(无论是创建还是变更)确保了在并发环境下的元数据一致性。它与 SchemaMergingUtils
(负责逻辑合并)和 SchemaChange
(负责定义变更操作)等类紧密协作,共同构成了 Paimon 强大而灵活的 Schema Evolution 机制。