当前位置: 首页 > news >正文

双流join 、 Paimon Partial Update 和 动态schema

背景

Paimon 通过其独特的 partial-update 合并引擎和底层的 LSM 存储结构,巧妙地将传统双流 Join 中对 Flink State 的高频随机读/写,转换为了对 Paimon 表的顺序写和后台的高效合并,从而一站式地解决了 Flink 作业状态过大、依赖外部 KV 系统等一系列痛点。

传统方案中,Flink 作业需要维护一个巨大的 State(可能达到 TB 级)来存储其中一个流的数据。当另一个流的数据到达时,需要去这个巨大的 State 中查找(Join)对应的记录。这个“查找”操作,在数据量巨大、内存无法完全容纳时,就会频繁触发对磁盘的随机读。机械硬盘和固态硬盘的随机读性能远低于顺序读,这成为了整个作业的性能瓶瓶颈,并导致了高昂的资源开销和不稳定性。

使用 Paimon 的 partial-update 模式后,整个数据处理的范式发生了改变:

  • 不再需要 Flink State 来做 Join:两个数据流不再需要在 Flink 算子内部进行 Join。它们各自独立地、源源不断地将自己的数据写入(INSERT INTO)到同一个 Paimon 表中。
  • 写入是高效的顺序操作:Paimon 底层采用 LSM-Tree 结构。新写入的数据会先进入内存缓冲区,然后刷写成新的、有序的小文件。这个过程主要是顺序写,效率非常高。

这样一来,原来 Flink 作业中最消耗性能的“状态查找”(随机读)环节,被彻底消除了。

现在,两个流的数据都以部分列的形式写入了 Paimon 表。那么,数据是在哪里“打宽”合并的呢?答案是在 Paimon 的Compaction过程中。

  • Partial-Update 合并引擎:当将表的合并引擎设置为 partial-update 时,Paimon 就知道了它的合并策略。 正如文档 /docs/content/primary-key-table/merge-engine/partial-update.md 中描述的,对于相同主键的多条记录,它会取每个字段最新的非空值,合并成一条完整的记录。

    假设 Paimon 收到三条记录:

    • <1, 23.0, 10, NULL>
    • <1, NULL, NULL, 'This is a book'>
    • <1, 25.2, NULL, NULL>

    假设第一列是主键,最终合并的结果将是 <1, 25.2, 10, 'This is a book'>

  • LSM-Tree 与顺序读合并:Paimon 的 Compaction 任务会定期将小的、分层的文件合并成更大的文件。这个合并过程是读取多个有序的文件,然后进行多路归并排序,这基本上是顺序读操作,效率远高于随机读。PartialUpdateMergeFunction 这个类就是实现该合并逻辑的核心。

  • Paimon Compaction策略见:Paimon LSM Tree Compaction 策略

    // ... existing code ...
    public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {// ... existing code ...private InternalRow currentKey;private long latestSequenceNumber;private GenericRow row;private KeyValue reused;private boolean currentDeleteRow;// ... existing code ...@Overridepublic void add(KeyValue kv) {// refresh key object to avoid reference overwrittencurrentKey = kv.key();currentDeleteRow = false;
    // ... existing code ...
    
  • 专用 Compaction 作业:为了不影响数据写入的实时性,最佳实践是启动一个独立的、专用的 Compaction 作业。这样,数据写入和数据合并就可以完全解耦,互不干扰。

    如文档 /docs/content/maintenance/dedicated-compaction.md 所述,当有多个流式作业写入一个 partial-update 表时,推荐使用专用的 Compaction 作业。

    <FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-{{< version >}}.jar \compact \--warehouse <warehouse-path> \--database <database-name> \ --table <table-name> \...
    

总结:Paimon 的核心优势

通过上述分析,我们可以清晰地看到 Paimon 在这个场景下的优势:

  1. 性能革命:将 Flink State 的随机读瓶颈,转变为 Paimon 的顺序写 + 后台顺序读合并,大幅提升了整体吞吐量和性能。
  2. 架构简化与成本降低:不再需要维护外部的 HBase/Pegasus 等 KV 系统,所有数据统一存储在 Paimon 中,降低了系统复杂度和运维、存储成本。
  3. 稳定性提升:Flink 作业本身变成了无状态或轻状态的写入任务,彻底告别了 TB 级的 State,使得作业的稳定性和恢复速度大大增强。
  4. 开发简化:原来需要手写复杂 DataStream API 和 Timer 才能实现的逻辑,现在只需要两个简单的 INSERT INTO SQL 语句即可完成,开发效率和代码可维护性显著提高。

PartialUpdateMergeFunction

这是在 Paimon 中实现 partial-update (部分列更新) 合并引擎的核心类。它的主要职责是在 Compaction 过程中,将具有相同主键的多条记录(KeyValue)合并成最终的一条记录。

PartialUpdateMergeFunction 实现了 MergeFunction<KeyValue> 接口。在 Paimon 的 LSM-Tree 存储模型中,当执行 Compaction 操作需要合并多个数据文件时,Paimon 会读取具有相同主键的一组 KeyValue 数据,然后交由一个 MergeFunction 实例来处理,计算出最终的结果。

PartialUpdateMergeFunction 的合并逻辑是:对于相同主键的记录,不断地用新的非空字段值去覆盖旧的字段值,最终得到一个“打宽”后的完整记录。 它还支持更复杂的场景,如基于序列号的更新、字段聚合和多种删除策略。

// ... existing code ...
import org.apache.paimon.mergetree.compact.MergeFunction;
// ... existing code ...
/*** A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update* non-null fields on merge.*/
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
// ... existing code ...

核心成员变量

这些变量定义了 PartialUpdateMergeFunction 的状态和配置,决定了其合并行为。

// ... existing code ...
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {public static final String SEQUENCE_GROUP = "sequence-group";private final InternalRow.FieldGetter[] getters; // 用于从 InternalRow 中获取字段值private final boolean ignoreDelete; // 是否忽略删除记录private final Map<Integer, FieldsComparator> fieldSeqComparators; // 字段序列号比较器,用于 sequence-groupprivate final boolean fieldSequenceEnabled; // 是否启用了 sequence-groupprivate final Map<Integer, FieldAggregator> fieldAggregators; // 字段聚合器private final boolean removeRecordOnDelete; // 收到 DELETE 记录时是否删除整行private final Set<Integer> sequenceGroupPartialDelete; // 收到 DELETE 记录时,根据 sequence-group 删除部分列private final boolean[] nullables; // 记录每个字段是否可为 nullprivate InternalRow currentKey; // 当前处理的主键private long latestSequenceNumber; // 见过的最新序列号private GenericRow row; // 合并过程中的结果行private KeyValue reused; // 用于复用的 KeyValue 对象,避免重复创建private boolean currentDeleteRow; // 标记当前行最终是否应被删除private boolean notNullColumnFilled;/*** If the first value is retract, and no insert record is received, the row kind should be* RowKind.DELETE. (Partial update sequence group may not correctly set currentDeleteRow if no* RowKind.INSERT value is received)*/private boolean meetInsert; // 是否遇到过 INSERT 类型的记录// ... existing code ...
  • 配置类变量 (ignoreDeletefieldSeqComparatorsfieldAggregators 等) 通常在 Factory 中被初始化,它们在整个合并过程中保持不变。
  • 状态类变量 (currentKeyrowlatestSequenceNumber 等) 会在每次 reset() 时被重置,用于处理新的一组具有相同主键的记录。

add(KeyValue kv) :合并逻辑的核心

这是最重要的方法,定义了单条 KeyValue 是如何被合并到当前结果 row 中的。

// ... existing code ...@Overridepublic void add(KeyValue kv) {// refresh key object to avoid reference overwrittencurrentKey = kv.key();currentDeleteRow = false;if (kv.valueKind().isRetract()) {if (!notNullColumnFilled) {initRow(row, kv.value());notNullColumnFilled = true;}// ... 删除逻辑处理 ...// ... existing code ...String msg =String.join("\n","By default, Partial update can not accept delete records,"+ " you can choose one of the following solutions:","1. Configure 'ignore-delete' to ignore delete records.","2. Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.","3. Configure 'sequence-group's to retract partial columns.");throw new IllegalArgumentException(msg);}latestSequenceNumber = kv.sequenceNumber();if (fieldSeqComparators.isEmpty()) {updateNonNullFields(kv);} else {updateWithSequenceGroup(kv);}meetInsert = true;notNullColumnFilled = true;}
// ... existing code ...

它的逻辑可以分为两大块:

A. 处理 retract 消息 (RowKind 为 DELETE 或 UPDATE_BEFORE)

partial-update 默认不接受删除记录。如果收到了,行为由配置决定:

  1. ignoreDelete = true: 直接忽略这条删除记录,返回。
  2. removeRecordOnDelete = true: 当收到 DELETE 类型的记录时,将 currentDeleteRow 标记为 true,并清空当前 row。这意味着最终这条主键对应的记录将被删除。
  3. fieldSequenceEnabled = true: 启用了 sequence-group。这是最复杂的逻辑,它会调用 retractWithSequenceGroup(kv)。这个方法会根据序列号比较结果,来决定是否要“撤销”某些字段的更新(通常是将其设置为 null 或调用聚合器的 retract 方法)。
  4. 默认行为: 如果以上配置都没有,则直接抛出 IllegalArgumentException 异常,提示用户如何正确配置。

B. 处理 add 消息 (RowKind 为 INSERT 或 UPDATE_AFTER)

这是主要的更新逻辑:

  1. 简单更新 (updateNonNullFields): 如果没有配置 sequence-group (fieldSeqComparators 为空),则执行最简单的部分列更新。遍历新纪录 kv 的所有字段,只要字段值不为 null,就用它来更新 row 中对应位置的值。

    // ... existing code ...
    private void updateNonNullFields(KeyValue kv) {for (int i = 0; i < getters.length; i++) {Object field = getters[i].getFieldOrNull(kv.value());if (field != null) {row.setField(i, field);} else {
    // ... existing code ...
    
  2. 带序列号的更新 (updateWithSequenceGroup): 如果配置了 sequence-group,逻辑会更复杂。对于每个字段:

    • 如果该字段不属于任何 sequence-group,则行为和简单更新类似(但会考虑聚合)。
    • 如果该字段属于某个 sequence-group,则会使用 FieldsComparator 比较新记录 kv 和当前结果 row 的序列号字段。只有当新记录的序列号 大于或等于 当前结果的序列号时,才会用新记录的字段值去更新 row 中由该 sequence-group 控制的所有字段。这保证了数据的更新顺序。

updateWithSequenceGroup

这个方法是 partial-update 合并引擎处理带有 sequence-group 配置时的核心逻辑。当用户在表属性中定义了 fields.<seq_field>.sequence-group = <data_field1>,<data_field2> 这样的规则时,数据合并就不再是简单的“非空值覆盖”,而是需要根据 seq_field 的值来判断是否应该更新 data_field1 和 data_field2。这解决了多流更新时可能出现的数据乱序覆盖问题。

updateWithSequenceGroup 方法通过引入 FieldsComparator,将简单的字段更新升级为基于序列号的条件更新。它精确地控制了哪些字段在何时可以被更新,从而保证了在多流并发写入场景下,即使数据存在一定程度的乱序,最终也能合并成正确的结果。这是 Paimon partial-update 模式能够处理复杂更新场景的关键所在。

// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {
// ... existing code ...
  • 输入KeyValue kv,代表一条新到达的、具有相同主键的记录。
  • 目标: 遍历这条新记录 kv 的所有字段,并根据 sequence-group 的规则,决定是否用 kv 中的字段值来更新当前正在合并的结果行 this.row

该方法的核心是一个 for 循环,它遍历了表中的每一个字段。

// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {for (int i = 0; i < getters.length; i++) {
// ... existing code ...

在循环内部,对每个字段的处理逻辑可以分为两种情况:

  1. 该字段不属于任何 sequence-group
  2. 该字段属于某个 sequence-group

让我们来详细看这两种情况。

1. 字段不属于任何 sequence-group

// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {for (int i = 0; i < getters.length; i++) {Object field = getters[i].getFieldOrNull(kv.value());FieldsComparator seqComparator = fieldSeqComparators.get(i);FieldAggregator aggregator = fieldAggregators.get(i);Object accumulator = getters[i].getFieldOrNull(row);if (seqComparator == null) {if (aggregator != null) {row.setField(i, aggregator.agg(accumulator, field));} else if (field != null) {row.setField(i, field);}} else {
// ... existing code ...
  • 判断条件seqComparator == nullfieldSeqComparators 是一个 Map<Integer, FieldsComparator>,如果在里面找不到当前字段索引 i,就说明这个字段不受任何 sequence-group 控制。
  • 处理逻辑:
    • 带聚合函数: 如果为该字段配置了聚合函数(aggregator != null),例如 summax 等,则调用 aggregator.agg() 方法,将当前累加值 accumulator 和新值 field 进行聚合,并将结果写回 row
    • 不带聚合函数: 这是最简单的情况。如果新来的字段值 field 不为 null,就直接用它覆盖 row 中的旧值。这和 updateNonNullFields 的行为是一致的。

2. 字段属于某个 sequence-group

这是该方法最核心和复杂的部分。

// ... existing code ...} else {if (isEmptySequenceGroup(kv, seqComparator)) {// skip null sequence groupcontinue;}if (seqComparator.compare(kv.value(), row) >= 0) {int index = i;// Multiple sequence fields should be updated at once.if (Arrays.stream(seqComparator.compareFields()).anyMatch(seqIndex -> seqIndex == index)) {for (int fieldIndex : seqComparator.compareFields()) {row.setField(fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value()));}continue;}row.setField(i, aggregator == null ? field : aggregator.agg(accumulator, field));} else if (aggregator != null) {row.setField(i, aggregator.aggReversed(accumulator, field));}}}}
// ... existing code ...
  • 判断条件seqComparator != null
  • 处理逻辑:
    1. 空序列组检查isEmptySequenceGroup(kv, seqComparator) 会检查这条新纪录 kv 中,其对应的序列号字段是否都为 null。如果是,意味着这条记录无法判断新旧,因此直接跳过,不进行任何更新。
    2. 序列号比较seqComparator.compare(kv.value(), row) >= 0 是关键。它会比较新记录 kv 和当前结果 row 中,由 seqComparator 定义的序列号字段。
      • 如果新记录的序列号 >= 当前结果的序列号: 这意味着新记录 kv 是“更新”的或者“同样新”的,此时应该用 kv 的值去更新 row
        • 更新序列号字段本身: 如果当前字段 i 就是序列号字段之一,那么需要把这个 sequence-group 定义的所有序列号字段都一次性更新掉,然后用 continue 跳出本次循环。这是为了保证序列号字段之间的一致性。
        • 更新数据字段: 如果当前字段 i 是被序列号控制的数据字段,则执行更新。如果有聚合器,则调用 aggregator.agg();如果没有,则直接用新值 field 覆盖。
      • 如果新记录的序列号 < 当前结果的序列号: 这意味着 kv 是一条“旧”数据。在大部分情况下,这条旧数据会被忽略。但有一个例外:如果为该字段配置了支持乱序聚合的聚合器(例如 sum),则会调用 aggregator.aggReversed()。这个方法通常和 agg() 的逻辑是一样的,它允许旧数据也能被正确地聚合进来。对于不支持乱序的聚合器(如 max),aggReversed 可能就是一个空操作。

getResult() 方法:产出最终结果

当处理完具有相同主键的所有 KeyValue 后,调用此方法来获取最终的合并结果。

// ... existing code ...@Overridepublic KeyValue getResult() {if (reused == null) {reused = new KeyValue();}RowKind rowKind = currentDeleteRow || !meetInsert ? RowKind.DELETE : RowKind.INSERT;return reused.replace(currentKey, latestSequenceNumber, rowKind, row);}
// ... existing code ...

它会根据 currentDeleteRow 和 meetInsert 标志位来决定最终的 RowKind。如果 currentDeleteRow 为 true,或者整个合并过程从未见过 INSERT 类型的记录,那么最终结果就是一条 DELETE 记录。否则,就是一条 INSERT 记录。然后将主键、最新的序列号、最终的 RowKind 和合并后的 row 数据打包成一个 KeyValue 返回。

Factory 内部类:配置的入口

PartialUpdateMergeFunction.Factory 是一个非常重要的内部类,它负责解析用户在表上设置的 OPTIONS,并据此创建出一个配置好的 PartialUpdateMergeFunction 实例。

// ... existing code ...public static MergeFunctionFactory<KeyValue> factory(Options options, RowType rowType, List<String> primaryKeys) {return new Factory(options, rowType, primaryKeys);}private static class Factory implements MergeFunctionFactory<KeyValue> {// ... 成员变量,用于存储从 Options 解析出的配置 ...private Factory(Options options, RowType rowType, List<String> primaryKeys) {this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);// ... existing code ...this.removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);// ... 解析 sequence-group 配置 ...for (Map.Entry<String, String> entry : options.toMap().entrySet()) {String k = entry.getKey();String v = entry.getValue();if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {// ... 解析出序列号字段和被控制的字段,构建 fieldSeqComparators ...}}// ... 解析聚合函数配置,构建 fieldAggregators ...this.fieldAggregators =createFieldAggregators(rowType, primaryKeys, allSequenceFields, new CoreOptions(options));// ... 配置校验,确保冲突的配置不会同时开启 ...Preconditions.checkState(!(removeRecordOnDelete && ignoreDelete),// ...);// ...}
// ... existing code ...

在构造函数中,它会:

  1. 读取 ignore-deletepartial-update.remove-record-on-delete 等简单配置。
  2. 遍历所有 OPTIONS,查找以 fields. 开头、以 .sequence-group 结尾的配置项,例如 fields.order_time.sequence-group=order_id,price。它会解析这些配置,构建出 fieldSeqComparators 这个 Map,其中 key 是被控制字段的索引,value 是一个能够比较 order_time 字段的比较器。
  3. 调用 createFieldAggregators 方法,解析 fields.*.aggregate-function 等配置,构建出 fieldAggregators 这个 Map。
  4. 执行一系列 Preconditions.checkState,对用户的配置进行合法性校验,防止出现逻辑冲突。

总结

PartialUpdateMergeFunction 是 Paimon 实现高性能数据打宽(部分列更新)能力的技术基石。它通过一个设计精巧的合并流程,将简单的非空字段覆盖、基于序列号的有序更新、字段聚合以及多种删除策略融为一体。其 Factory 类则充当了连接用户配置和底层实现的桥梁。理解了这个类的工作原理,就能深刻地理解 Paimon partial-update 模式的强大之处。

双流拼接 怎么处理schema

Paimon 允许在写入数据时自动合并和演进表结构。这对于像双流 Join 结果写入等 schema 可能变化的场景至关重要。这个功能主要通过 write.merge-schema 选项来开启。

当将数据写入 Paimon 表时:

  1. 如果 write.merge-schema 设置为 true,Paimon 会比较写入数据(Source)的 schema 和目标表(Sink)当前的 schema。
  2. 如果发现写入数据中包含了表中不存在的新列,Paimon 会自动将这些新列添加到表结构中,生成一个新的、版本更高的 schema。
  3. 对于数据中缺失但在表 schema 中存在的列,Paimon 会自动填充 null 值。

这个过程是原子性的,并记录在表的元数据中。Paimon 会为每一次 schema 变更创建一个新的版本化的 schema 文件。

代码参考:

在 Spark 中,写入逻辑由 WriteIntoPaimonTable.scala 处理。可以看到,当 mergeSchema 为 true 时,它会调用 mergeAndCommitSchema 来合并 schema,并处理列不匹配的情况。

WriteIntoPaimonTable.scala

// ... existing code ...override def run(sparkSession: SparkSession): Seq[Row] = {var data = _dataif (mergeSchema) {val dataSchema = SparkSystemColumns.filterSparkSystemColumns(data.schema)val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)mergeAndCommitSchema(dataSchema, allowExplicitCast)// For case that some columns is absent in data, we still allow to write once write.merge-schema is true.val newTableSchema = SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())if (!PaimonUtils.sameType(newTableSchema, dataSchema)) {val resolve = sparkSession.sessionState.conf.resolverval cols = newTableSchema.map {field =>dataSchema.find(f => resolve(f.name, field.name)) match {case Some(f) => col(f.name)case _ => lit(null).as(field.name)}}data = data.select(cols: _*)}}
// ... existing code ...

一个具体的测试用例也展示了这一点,一个原先只有 a 和 b 列的表,成功写入了包含 c 和 d 列的新数据。

DataFrameWriteTest.scala

// ... existing code ...// Case 1: two additional fields: DoubleType and TimestampTypeval ts = java.sql.Timestamp.valueOf("2023-08-01 10:00:00.0")val df2 = Seq((1, "2023-08-01", 12.3d, ts), (3, "2023-08-03", 34.5d, ts)).toDF("a", "b", "c", "d")df2.write.format("paimon").mode("append").option("write.merge-schema", "true").save(location)
// ... existing code ...

在 Flink 或 Spark 中进行双流 Join 时,Paimon 通常作为 Sink 端。Join 操作本身由计算引擎完成。应用需要做的就是:

  1. 执行双流 Join。
  2. 将 Join 后的 DataStream 或 DataFrame 写入 Paimon 表。
  3. 在写入时,设置 write.merge-schema 为 true

这样,无论 Join 结果的 schema 如何(比如因为上游流增加了字段导致 Join 结果也增加了字段),Paimon 表都可以自动适应,动态地添加新列。

SchemaMergingUtils

SchemaMergingUtils 是 Paimon schema 演进(Schema Evolution)功能的核心工具类。它的主要职责是比较两个 schema(通常是数据表的现有 schema 和新写入数据的 schema),并根据预设的规则将它们合并成一个新的、统一的 schema。这个过程支持添加新列、安全地转换现有列的数据类型,从而实现动态 schema 的能力。

当配置 Paimon 表允许 schema 合并(例如通过 write.merge-schema=true)时,写入流程就会调用这个工具类。它会:

  1. 比较字段:找出新旧 schema 中同名和新增的字段。
  2. 合并类型:对于同名字段,尝试合并其数据类型(例如,INT 可以演进为 BIGINT)。
  3. 添加字段:将新 schema 中独有的字段添加到最终的 schema 中,并为其分配新的唯一 ID。
  4. 生成新版 Schema:如果发生了任何变更,它会创建一个版本号加一的新的 TableSchema 对象。

下面我们结合代码,从顶层方法到底层实现,一步步进行分析。

mergeSchemas

这是最顶层的入口方法,用于合并一个完整的表 schema 和一个新的行类型(通常来自要写入的数据)。

  • 参数:
    • currentTableSchema: Paimon 表当前的 TableSchema 对象。它包含了字段、分区键、主键、表配置等所有元数据。
    • targetType: 目标 RowType,即新数据的 schema。
    • allowExplicitCast: 一个布尔标志,决定是否允许显式(可能存在精度损失)的类型转换,比如 STRING 转 INT
  • 逻辑:
    1. 首先,它会检查 targetType 和 currentTableSchema 的 RowType 是否完全相同。如果相同,则无需合并,直接返回当前的 TableSchema
    2. 如果不同,它会初始化一个 AtomicInteger 类型的 highestFieldId,记录当前 schema 中所有字段(包括嵌套字段)的最大 ID。这个 ID 对于为新字段分配唯一标识至关重要。
    3. 调用重载的 mergeSchemas 方法(最终调用核心的 merge 方法)来递归地合并两个 RowType
    4. 如果合并后的 newRowType 与原始的 currentType 相同(例如,只是可空性变化,而合并逻辑会保留原始的可空性),则也认为没有发生实质性变化,返回原始的 TableSchema
    5. 如果 schema 确实发生了变化,它会创建一个新的 TableSchema 实例。这个新 schema 的 ID 会在旧 ID 的基础上加 1,字段列表和 highestFieldId 会更新,而分区键、主键、表配置和注释等信息则会从旧 schema 中继承。
 
// ... existing code ...public static TableSchema mergeSchemas(TableSchema currentTableSchema, RowType targetType, boolean allowExplicitCast) {RowType currentType = currentTableSchema.logicalRowType();if (currentType.equals(targetType)) {return currentTableSchema;}AtomicInteger highestFieldId = new AtomicInteger(currentTableSchema.highestFieldId());RowType newRowType =mergeSchemas(currentType, targetType, highestFieldId, allowExplicitCast);if (newRowType.equals(currentType)) {// It happens if the `targetType` only changes `nullability` but we always respect the// current's.return currentTableSchema;}return new TableSchema(currentTableSchema.id() + 1,newRowType.getFields(),highestFieldId.get(),currentTableSchema.partitionKeys(),currentTableSchema.primaryKeys(),currentTableSchema.options(),currentTableSchema.comment());}
// ... existing code ...

merge

这是所有合并逻辑的核心。它被递归调用以处理各种数据类型。

可空性处理 (Nullability Handling)

在方法的一开始,它将 base0 和 update0 的可空性都设置为 true 来进行比较。最终返回的类型的可空性将以 base0(原始表 schema 中的类型)为准。这意味着 schema 合并不会改变现有列的可空性。

// ... existing code ...public static DataType merge(DataType base0,DataType update0,AtomicInteger highestFieldId,boolean allowExplicitCast) {// Here we try to merge the base0 and update0 without regard to the nullability,// and set the base0's nullability to the return's.DataType base = base0.copy(true);DataType update = update0.copy(true);if (base.equals(update)) {return base0;} else if (base instanceof RowType && update instanceof RowType) {
// ... existing code ...

递归合并复杂类型

  • RowType (行类型): 这是最复杂的部分。
    1. 合并现有字段: 遍历 base (旧 schema) 的所有字段。对于每个字段,检查 update (新 schema) 中是否存在同名字段。如果存在,就递归调用 merge 方法来合并这两个字段的类型。如果不存在,则保留 base 中的原始字段。
    2. 添加新字段: 遍历 update 的所有字段,找出在 base 中不存在的字段。这些就是需要新增的列。对于每个新字段,调用 assignIdForNewField 为其分配一个新的、唯一的字段 ID,然后将其添加到最终的字段列表中。
    3. 最后,用更新后的字段列表创建一个新的 RowType
 
// ... existing code ...} else if (base instanceof RowType && update instanceof RowType) {List<DataField> baseFields = ((RowType) base).getFields();List<DataField> updateFields = ((RowType) update).getFields();Map<String, DataField> updateFieldMap =updateFields.stream().collect(Collectors.toMap(DataField::name, Function.identity()));List<DataField> updatedFields =baseFields.stream().map(baseField -> {if (updateFieldMap.containsKey(baseField.name())) {DataField updateField =updateFieldMap.get(baseField.name());DataType updatedDataType =merge(baseField.type(),updateField.type(),highestFieldId,allowExplicitCast);return new DataField(baseField.id(),baseField.name(),updatedDataType,baseField.description());} else {return baseField;}}).collect(Collectors.toList());Map<String, DataField> baseFieldMap =baseFields.stream().collect(Collectors.toMap(DataField::name, Function.identity()));List<DataField> newFields =updateFields.stream().filter(field -> !baseFieldMap.containsKey(field.name())).map(field -> assignIdForNewField(field, highestFieldId)).map(field -> field.copy(true)).collect(Collectors.toList());updatedFields.addAll(newFields);return new RowType(base0.isNullable(), updatedFields);} else if (base instanceof MapType && update instanceof MapType) {
// ... existing code ...
  • MapTypeArrayTypeMultisetType: 对于这些集合类型,合并逻辑很简单:递归地调用 merge 方法来合并它们的内部元素类型(MapType 的键和值类型,ArrayType 和 MultisetType 的元素类型)。

合并基础类型

  • DecimalType: 这是一个特例。只有当两个 DecimalType 的 scale (小数位数) 相同时,才能合并。合并后的 precision (总位数) 取两者中的最大值。如果 scale 不同,会直接抛出 UnsupportedOperationException

  • 其他可转换类型: 对于其他基础类型,通过 supportsDataTypesCast 方法判断是否可以转换。

    • 隐式转换 (Implicit Cast): 当 allowExplicitCast 为 false 时,只允许安全的类型提升,例如 INT -> BIGINTFLOAT -> DOUBLE
    • 显式转换 (Explicit Cast): 当 allowExplicitCast 为 true 时,允许更多可能损失精度的转换。
    • 对于带有长度(如 VARCHAR)或精度(如 TIMESTAMP)的类型,通常要求新类型的长度/精度不能小于旧类型,除非开启了显式转换。
    • 如果可以转换,则直接采用 update 的类型,但保留 base0 的可空性。
// ... existing code ...} else if (supportsDataTypesCast(base, update, allowExplicitCast)) {if (DataTypes.getLength(base).isPresent() && DataTypes.getLength(update).isPresent()) {// this will check and merge types which has a `length` attribute, like BinaryType,// CharType, VarBinaryType, VarCharType.if (allowExplicitCast|| DataTypes.getLength(base).getAsInt()<= DataTypes.getLength(update).getAsInt()) {return update.copy(base0.isNullable());} else {throw new UnsupportedOperationException(String.format("Failed to merge the target type that has a smaller length: %s and %s",base, update));}} else if (DataTypes.getPrecision(base).isPresent()&& DataTypes.getPrecision(update).isPresent()) {// this will check and merge types which has a `precision` attribute, like// LocalZonedTimestampType, TimeType, TimestampType.if (allowExplicitCast|| DataTypes.getPrecision(base).getAsInt()<= DataTypes.getPrecision(update).getAsInt()) {return update.copy(base0.isNullable());} else {throw new UnsupportedOperationException(String.format("Failed to merge the target type that has a lower precision: %s and %s",base, update));}} else {return update.copy(base0.isNullable());}} else {throw new UnsupportedOperationException(String.format("Failed to merge data types %s and %s", base, update));}}
// ... existing code ...

assignIdForNewField

这个方法非常重要。当向 RowType 中添加一个新字段时,它负责为这个新字段及其所有嵌套字段(如果是复杂类型)分配唯一的 ID。它通过传入的 AtomicInteger highestFieldId 来实现 ID 的原子性递增,确保了在并发场景下 ID 的唯一性,这对于 Paimon 正确地按 ID 映射和读取列数据至关重要。

// ... existing code ...private static DataField assignIdForNewField(DataField field, AtomicInteger highestFieldId) {DataType dataType = ReassignFieldId.reassign(field.type(), highestFieldId);return new DataField(highestFieldId.incrementAndGet(), field.name(), dataType, field.description());}
}

总结

SchemaMergingUtils 通过一套定义明确且可递归的规则,实现了 Paimon 强大而灵活的 Schema 演进能力。它能够智能地处理字段的增加和类型变化,同时通过严格的 ID 分配和管理,保证了数据读写的正确性。这个类是 Paimon 能够适应动态数据源、支持平滑表结构变更的关键所在。

SchemaManager

SchemaManager 是 Paimon 中负责管理表 schema(模式)的核心组件。它处理所有与 schema 相关的持久化操作,包括创建、读取、更新和版本管理。可以把它看作是 Paimon 表 schema 在文件系统中的“数据库管理员”。

SchemaManager 的主要职责可以归纳为以下几点:

  1. Schema 持久化:将 TableSchema 对象序列化为 JSON 文件,并存储在表的 schema 目录下。每个 schema 文件代表一个版本。
  2. 版本管理:每个 schema 文件名都以 schema- 开头,后跟一个从 0 开始递增的版本号(ID),例如 schema-0schema-1 等。这使得 Paimon 可以追踪 schema 的所有历史变更。
  3. Schema 读取:提供方法来读取最新版本的 schema、特定版本的 schema 或所有版本的 schema。
  4. Schema 创建:在创建新表时,负责初始化并提交第一个 schema 版本(schema-0)。
  5. Schema 变更:通过应用一系列 SchemaChange(如添加列、删除列、修改表选项等)来原子性地更新 schema,并生成一个新的、版本号加一的 schema 文件。
  6. 多分支支持:能够为不同的数据分支(branch)管理各自独立的 schema 演进路径。

结构和关键属性

// ... existing code ...
@ThreadSafe
public class SchemaManager implements Serializable {private static final String SCHEMA_PREFIX = "schema-";private final FileIO fileIO;private final Path tableRoot;private final String branch;public SchemaManager(FileIO fileIO, Path tableRoot) {
// ... existing code ...
  • @ThreadSafe: 这个注解表明该类的设计是线程安全的,允许多个线程同时访问一个 SchemaManager 实例。
  • SCHEMA_PREFIX: 常量 "schema-",定义了 schema 文件名的前缀。
  • fileIOFileIO 接口的实例,用于与底层文件系统(如 HDFS, S3, 本地文件系统)进行交互。
  • tableRootPath 对象,指向表的根目录。SchemaManager 会在这个目录下的 schema 子目录中工作。
  • branch: 字符串,表示当前 SchemaManager 实例操作的数据分支名称。Paimon 支持类似 Git 的分支功能,main 是默认的主分支。不同的分支可以有独立的快照和 schema 演进。

构造函数和分支管理

 
// ... existing code ...public SchemaManager(FileIO fileIO, Path tableRoot) {this(fileIO, tableRoot, DEFAULT_MAIN_BRANCH);}/** Specify the default branch for data writing. */public SchemaManager(FileIO fileIO, Path tableRoot, String branch) {this.fileIO = fileIO;this.tableRoot = tableRoot;this.branch = BranchManager.normalizeBranch(branch);}public SchemaManager copyWithBranch(String branchName) {return new SchemaManager(fileIO, tableRoot, branchName);}
// ... existing code ...
  • 构造函数初始化了 fileIOtableRoot 和 branch。默认使用主分支 DEFAULT_MAIN_BRANCH
  • copyWithBranch(String branchName): 这是一个工厂方法,用于创建一个新的 SchemaManager 实例来操作指定的分支。这体现了 Paimon 对多分支的支持。

Schema 读取方法

 
// ... existing code ...public Optional<TableSchema> latest() {try {return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX).reduce(Math::max).map(this::schema);} catch (IOException e) {throw new UncheckedIOException(e);}}
// ... existing code ...public List<TableSchema> listAll() {return listAllIds().stream().map(this::schema).collect(Collectors.toList());}public List<Long> listAllIds() {try {return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX).collect(Collectors.toList());} catch (IOException e) {throw new UncheckedIOException(e);}}
// ... existing code ...
  • latest(): 获取最新版本的 TableSchema。它通过 listVersionedFiles 工具方法列出 schema 目录下所有符合 schema-* 格式的文件,提取出版本号,找到最大的版本号,然后调用 schema(long id) 方法读取并反序列化对应的 schema 文件。
  • listAll(): 获取所有版本的 TableSchema 列表。
  • listAllIds(): 仅获取所有 schema 版本的 ID 列表。
  • schema(long id) (未在片段中完全展示,但被 latest() 调用): 这是一个内部方法,根据给定的 ID 构建 schema 文件路径(如 .../schema/schema-5),然后使用 fileIO 读取文件内容,并通过 TableSchema.fromJSON(String json) 将其反序列化为 TableSchema 对象。

表创建 createTable(...)

// ... existing code ...public TableSchema createTable(Schema schema, boolean externalTable) throws Exception {while (true) {Optional<TableSchema> latest = latest();if (latest.isPresent()) {TableSchema latestSchema = latest.get();if (externalTable) {checkSchemaForExternalTable(latestSchema.toSchema(), schema);return latestSchema;} else {throw new IllegalStateException("Schema in filesystem exists, creation is not allowed.");}}TableSchema newSchema = TableSchema.create(0, schema);// validate table from creating tableFileStoreTableFactory.create(fileIO, tableRoot, newSchema).store();boolean success = commit(newSchema);if (success) {return newSchema;}}}
// ... existing code ...
  • 这是一个原子性操作,通过 while(true) 循环和文件系统的原子性创建来保证。
  • 检查存在性: 首先调用 latest() 检查是否已有 schema 文件存在。如果存在且不是创建外部表,则抛出异常,防止覆盖现有表。
  • 创建新 Schema: 如果不存在,则使用 TableSchema.create(0, schema) 创建一个 ID 为 0 的新 TableSchema
  • 验证: 调用 FileStoreTableFactory.create(...) 来验证 schema 的有效性(例如,检查主键、分区键等配置是否合法)。
  • 提交: 调用 commit(newSchema) 方法,该方法会尝试原子性地创建 schema-0 文件。如果创建成功,循环结束并返回新的 TableSchema。如果因为并发冲突导致创建失败,循环会继续,重新尝试整个过程。

Schema 变更 commitChanges(...)

这是执行 ALTER TABLE 操作的核心逻辑。

// ... existing code ...public TableSchema commitChanges(List<SchemaChange> changes)throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,Catalog.ColumnNotExistException {SnapshotManager snapshotManager =new SnapshotManager(fileIO, tableRoot, branch, null, null);LazyField<Boolean> hasSnapshots =new LazyField<>(() -> snapshotManager.latestSnapshot() != null);while (true) {TableSchema oldTableSchema =latest().orElseThrow(() ->new Catalog.TableNotExistException(identifierFromPath(tableRoot.toString(), true, branch)));TableSchema newTableSchema = generateTableSchema(oldTableSchema, changes, hasSnapshots);try {boolean success = commit(newTableSchema);if (success) {return newTableSchema;}} catch (Exception e) {throw new RuntimeException(e);}}}public boolean commit(TableSchema newSchema) throws Exception {SchemaValidation.validateTableSchema(newSchema);SchemaValidation.validateFallbackBranch(this, newSchema);Path schemaPath = toSchemaPath(newSchema.id());return fileIO.tryToWriteAtomic(schemaPath, newSchema.toString());}
// ... existing code ...
  • 同样使用 while(true) 循环来保证原子性。
  • 获取旧 Schema: 首先获取当前的最新 schema (oldTableSchema)。
  • 生成新 Schema: 调用 generateTableSchema 方法,该方法是变更逻辑的核心。它接收旧 schema 和一个 SchemaChange 列表,然后逐个应用这些变更(如 AddColumnDropColumnSetOption 等),生成一个新的 TableSchema 对象。这个新对象的 ID 是旧 ID 加 1。
  • 提交新 Schema: 调用 commit(newTableSchema) 尝试原子性地创建新的 schema 文件(如 schema-5 -> schema-6)。如果成功,则返回新 schema。如果失败,则重试。

generateTableSchema(...)

这个方法是应用 SchemaChange 的具体实现。它像一个状态机,基于 oldTableSchema,根据 changes 列表中的每个变更项,逐步构建出 newTableSchema 的各个部分。

// ... existing code ...public TableSchema generateTableSchema(TableSchema oldTableSchema, List<SchemaChange> changes, LazyField<Boolean> hasSnapshots)throws Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException {Map<String, String> oldOptions = new HashMap<>(oldTableSchema.options());Map<String, String> newOptions = new HashMap<>(oldTableSchema.options());List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId());String newComment = oldTableSchema.comment();for (SchemaChange change : changes) {if (change instanceof SetOption) {
// ... existing code ...} else if (change instanceof RemoveOption) {
// ... existing code ...} else if (change instanceof AddColumn) {
// ... existing code ...} else if (change instanceof RenameColumn) {
// ... existing code ...} else if (change instanceof DropColumn) {
// ... existing code ...} else if (change instanceof UpdateColumnType) {
// ... existing code ...} else if (change instanceof UpdateColumnNullability) {
// ... existing code ...} else if (change instanceof UpdateColumnPosition) {
// ... existing code ...} else if (change instanceof UpdateColumnComment) {
// ... existing code ...}}
// ... existing code ...

它通过 instanceof 判断 SchemaChange 的具体类型,并执行相应的逻辑:

  • SetOption/RemoveOption: 修改 newOptions 这个 Map。
  • AddColumn: 向 newFields 列表中添加新字段,并使用 highestFieldId 分配新 ID。
  • RenameColumn: 修改 newFields 中某个字段的名称。
  • DropColumn: 从 newFields 中移除字段。
  • UpdateColumnType/UpdateColumnNullability: 更新字段的类型或可空性。
  • ...等等。

总结

SchemaManager 是 Paimon 表结构管理的基石。它通过将 schema 版本化并持久化到文件系统中,实现了 schema 的可靠追踪和演进。其原子性的提交操作(无论是创建还是变更)确保了在并发环境下的元数据一致性。它与 SchemaMergingUtils(负责逻辑合并)和 SchemaChange(负责定义变更操作)等类紧密协作,共同构成了 Paimon 强大而灵活的 Schema Evolution 机制。

http://www.lryc.cn/news/597292.html

相关文章:

  • 【硬件-笔试面试题】硬件/电子工程师,笔试面试题-2,(电路分析/MOS管)
  • OpenLayers 快速入门(四)View 对象
  • PyTorch中nn.Module详解和综合代码示例
  • 大模型提示词漏洞攻防实战:从注入攻击到智能免疫系统的进化之路
  • mac电脑搭载c、c++环境(基于vs code)
  • 在mac 上zsh 如何安装最新的 go 工具
  • GRE实验
  • 微软Fabric重塑数据管理:Forrester报告揭示高ROI
  • 「iOS」——KVC
  • linxu CentOS 配置nginx
  • 【音视频学习】四、深入解析视频技术中的YUV数据存储方式:从原理到实践
  • 开源UI生态掘金:从Ant Design二次开发到行业专属组件的技术变现
  • 7月23日华为机考真题第二题-200分
  • 7月23日华为机考真题第一题100分
  • 关于原车一键启动升级手机控车的核心信息及注意事项
  • 将AI协作编程从“碰运气”的提示工程(Prompt Engineering)提升到“可预期”的上下文工程(Context Engineering)
  • 驯服AI的“魔法咒语”:Prompt提示词工程使用教程
  • [特殊字符] 从数据库无法访问到成功修复崩溃表:一次 MySQL 故障排查实录
  • 显微科研中的关键选择:不同显微镜相机技术特性与应用适配性全面解析
  • SpringBoot Stream实战指南
  • Django学习之旅--第13课:Django模型关系进阶与查询优化实战
  • 电科金仓推出AI融合数据库,开启国产数据库新时代
  • 深入理解 Java Builder 设计模式:解决构造函数爆炸问题
  • Java SE:类与对象的认识
  • 编程语言Java——核心技术篇(二)类的高级特性
  • Python 程序设计讲义(9):Python 的基本数据类型——复数
  • LeetCode|Day23|326. 3 的幂|Python刷题笔记
  • Flask框架全面详解
  • Element中ElMessageBox弹框内容及按钮样式自定义
  • 服务器版本信息泄露-iis返回包暴露服务器版本信息