Paimon Consumer机制解析
Consumer
Consumer
在 Paimon 中扮演着至关重要的角色,它是实现流式读取、断点续传和安全消费(防止快照过早过期)的核心机制。
首先,我们来看Consumer.java
文件。
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.paimon.consumer;import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.JsonSerdeUtil;import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException;import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;/** Consumer which contains next snapshot. */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Consumer {private static final String FIELD_NEXT_SNAPSHOT = "nextSnapshot";private final long nextSnapshot;@JsonCreatorpublic Consumer(@JsonProperty(FIELD_NEXT_SNAPSHOT) long nextSnapshot) {this.nextSnapshot = nextSnapshot;}@JsonGetter(FIELD_NEXT_SNAPSHOT)public long nextSnapshot() {return nextSnapshot;}public String toJson() {return JsonSerdeUtil.toJson(this);}public static Consumer fromJson(String json) {return JsonSerdeUtil.fromJson(json, Consumer.class);}public static Optional<Consumer> fromPath(FileIO fileIO, Path path) {int retryNumber = 0;MismatchedInputException exception = null;while (retryNumber++ < 10) {try {return fileIO.readOverwrittenFileUtf8(path).map(Consumer::fromJson);} catch (MismatchedInputException e) {// retryexception = e;try {Thread.sleep(1_000);} catch (InterruptedException ie) {Thread.currentThread().interrupt();throw new RuntimeException(ie);}} catch (IOException e) {throw new UncheckedIOException(e);}}throw new UncheckedIOException(exception);}
}
从代码结构来看,Consumer
是一个非常简单的 POJO (Plain Old Java Object),其核心职责可以概括为:
- 状态载体:它只包含一个核心字段
nextSnapshot
,用于记录一个消费者(由consumer-id
标识)下一次应该从哪个快照(Snapshot)开始消费。 - 序列化/反序列化:它提供了
toJson()
和fromJson()
方法,利用 Jackson 库将自身对象与 JSON 字符串进行转换。这表明消费者的状态是被持久化为 JSON 文件的。 - 原子化读写:
fromPath()
方法是关键。它负责从文件系统中读取并解析一个Consumer
对象。- 它使用了
fileIO.readOverwrittenFileUtf8(path)
,这暗示了消费者状态文件是可被原子性覆盖更新的。 - 它包含了一个重试机制。当捕获到
MismatchedInputException
时,会进行最多10次、每次间隔1秒的重试。这个异常通常发生在读写并发的场景下(例如,一个 Flink 作业正在更新 consumer 文件,而另一个进程(如expire
任务)恰好在读取这个文件),文件可能不完整。这个重试机制保证了在分布式环境下的读取鲁棒性。
- 它使用了
Consumer
的管理与使用:ConsumerManager
Consumer
类本身只是一个数据模型,其生命周期的管理由 ConsumerManager
类负责。
// ... existing code ...
public class ConsumerManager implements Serializable {// ... existing code ...private final FileIO fileIO;private final Path tablePath;private final String branch;private static final String CONSUMER_PREFIX = "consumer-";// ... existing code ...public ConsumerManager(FileIO fileIO, Path tablePath, String branchName) {this.fileIO = fileIO;this.tablePath = tablePath;this.branch =StringUtils.isNullOrWhitespaceOnly(branchName) ? DEFAULT_MAIN_BRANCH : branchName;}public Optional<Consumer> consumer(String consumerId) {return Consumer.fromPath(fileIO, consumerPath(consumerId));}public void resetConsumer(String consumerId, Consumer consumer) {try {fileIO.overwriteFileUtf8(consumerPath(consumerId), consumer.toJson());} catch (IOException e) {throw new UncheckedIOException(e);}}public void deleteConsumer(String consumerId) {fileIO.deleteQuietly(consumerPath(consumerId));}// ... existing code ...public Map<String, Long> consumers() throws IOException {Map<String, Long> consumers = new HashMap<>();listOriginalVersionedFiles(fileIO, consumerDirectory(), CONSUMER_PREFIX).forEach(id -> {Optional<Consumer> consumer = this.consumer(id);consumer.ifPresent(value -> consumers.put(id, value.nextSnapshot()));});return consumers;}
// ... existing code ...private Path consumerPath(String consumerId) {return new Path(branchPath(tablePath, branch) + "/consumer/" + CONSUMER_PREFIX + consumerId);}
}
ConsumerManager
提供了对 Consumer
的完整 CRUD 操作:
- 创建/更新 (
resetConsumer
): 将一个Consumer
对象序列化为 JSON,并写入到指定的文件中。文件名格式为consumer-<consumerId>
,存储在表路径下的consumer
目录中。 - 读取 (
consumer
): 根据consumerId
读取并解析对应的Consumer
对象。 - 删除 (
deleteConsumer
): 删除指定的consumer
文件。 - 列举 (
consumers
): 扫描consumer
目录,列出所有存在的消费者及其nextSnapshot
ID。
Consumer
的核心价值与应用场景
结合文档和测试用例,我们可以看到 Consumer
机制的两个核心价值:
a. 安全消费 (Safe Consumption)
Paimon 会定期清理旧的、不再需要的快照以节省存储空间。但如果一个流作业正在消费某个旧快照的数据,这个快照就不能被删除。Consumer
机制解决了这个问题。
文档摘录 (
docs/content/flink/consumer-id.md
):
- Safe consumption: When deciding whether a snapshot has expired, Paimon looks at all the consumers of the table in the file system, and if there are consumers that still depend on this snapshot, then this snapshot will not be deleted by expiration.
Paimon 的快照清理逻辑会通过 ConsumerManager
获取所有 Consumer
的 nextSnapshot
,并找到其中的最小值。任何比这个最小值还旧的快照都可以被安全地删除,而大于等于这个最小值的快照则会被保留,从而保证了没有任何消费者会因为快照被清理而丢失数据。
b. 断点续传 (Resume from Breakpoint)
这是流式处理中非常重要的容错能力。当一个流作业失败重启后,它需要知道上次消费到了哪里。
文档摘录 (
docs/content/flink/consumer-id.md
): 2. Resume from breakpoint: When previous job is stopped, the newly started job can continue to consume from the previous progress without resuming from the state.
当你在 Flink 或 Spark 中使用 Paimon a作为 Source 并指定 consumer-id
时:
- 作业运行时,会定期将消费进度(即下一个要消费的快照ID)通过
ConsumerManager
更新到对应的consumer-<consumer-id>
文件中。 - 当作业重启时,它会首先读取这个文件,获取
nextSnapshot
,然后从这个快照开始继续消费,实现了无缝的断点续传。
例如,在 Flink SQL 中这样使用:
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.mode' = 'at-least-once') */;
用户交互与可见性
Paimon 提供了多种方式让用户可以查看和管理 Consumer
。
a. 系统表 $consumers
你可以像查询普通表一样查询一个表的消费者信息。
文档摘录 (
docs/content/concepts/system-tables.md
):SELECT * FROM my_table$consumers;/* +-------------+------------------+ | consumer_id | next_snapshot_id | +-------------+------------------+ | id1 | 1 | | id2 | 3 | +-------------+------------------+ 2 rows in set */
其底层实现 ConsumersTable.java
正是调用了 ConsumerManager.consumers()
方法来获取数据。
ConsumersTable.java
// ... existing code ...public RecordReader<InternalRow> createReader(Split split) throws IOException {
// ... existing code ...Path location = ((ConsumersTable.ConsumersSplit) split).location;Map<String, Long> consumers = new ConsumerManager(fileIO, location, branch).consumers();Iterator<InternalRow> rows =Iterators.transform(consumers.entrySet().iterator(), this::toRow);
// ... existing code ...}
// ... existing code ...
b. Action 与 Procedure
当需要手动干预消费位点时(比如修复数据问题、重跑任务),可以使用 Paimon 提供的 Action 或 Procedure。
reset_consumer
: 重置或删除一个指定的 consumer。clear_consumers
: 批量清理符合条件的 consumer。
文档摘录 (
docs/content/spark/procedures.md
):-- reset the new next snapshot id in the consumer CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid', nextSnapshotId => 10)-- delete consumer CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid')-- clear all consumers in the table CALL sys.clear_consumers(table => 'default.T')
这些操作的底层最终都会调用 ConsumerManager
的 resetConsumer
或 deleteConsumer
等方法。
总结
Consumer
类虽然代码简单,但它是 Paimon 流处理生态的基石之一。我们可以将其理解为一个分布式、持久化的消费位点标记。
- 物理表现:一个存储在表目录下的、名为
consumer-<id>
的 JSON 文件。 - 核心内容:记录了
nextSnapshot
,即下一次要消费的快照 ID。 - 管理接口:通过
ConsumerManager
进行统一的增删改查。 - 核心功能:支撑了流式消费的断点续传和 Paimon 表的快照安全保留两大关键特性。
- 用户可见性:通过
$consumers
系统表和reset/clear
等 Procedure/Action 暴露给用户,提供了强大的可观测性和可管理性。