# Apache SeaTunnel Spark Engine Execution Flow: Source Code Analysis
## Table of Contents

- 1. Task Startup Entry
- 2. Task Execution Command Class: SparkTaskExecuteCommand
- 3. Creating and Initializing SparkExecution
  - 3.1 Core Component Initialization
  - 3.2 Key Objects
- 4. Task Execution: SparkExecution.execute()
- 5. Source Processing Flow
  - 5.1 Plugin Initialization
  - 5.2 Data Stream Generation
- 6. Transform Processing Flow
  - 6.1 Plugin Initialization
  - 6.2 Transform Execution
- 7. Sink Processing Flow
  - 7.1 Plugin Initialization
  - 7.2 Data Output
- Execution Flow Overview
- Key Design Takeaways
Based on the SeaTunnel 2.3.x source code, this article analyzes the Spark engine's execution flow, using `seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java` as the entry point for a complete walkthrough.
## 1. Task Startup Entry

Core code of the launcher class:

```java
public static void main(String[] args) {
    // 1. Create the Spark command-line argument object
    SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
    // 2. SeaTunnel.run() invokes the Spark execution command built from the arguments
    SeaTunnel.run(sparkCommandArgs.buildCommand());
}
```
- `buildCommand()` returns a `SparkTaskExecuteCommand` instance
- `SeaTunnel.run()` ultimately calls `SparkTaskExecuteCommand.execute()` (a minimal sketch of this dispatch follows)
## 2. Task Execution Command Class: SparkTaskExecuteCommand

Core execution flow:

```java
public void execute() {
    // 1. Parse the config file into a Config object
    Config config = ConfigBuilder.of(configFile);
    // 2. Create the SparkExecution instance
    SparkExecution seaTunnelTaskExecution = new SparkExecution(config);
    // 3. Run the job
    seaTunnelTaskExecution.execute();
}
```
## 3. Creating and Initializing SparkExecution

### 3.1 Core Component Initialization

```java
public SparkExecution(Config config) {
    // Create the Spark runtime environment
    this.sparkRuntimeEnvironment = SparkRuntimeEnvironment.getInstance(config);
    JobContext jobContext = new JobContext();
    jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));
    // Create the three processors
    this.sourcePluginExecuteProcessor =
            new SourceExecuteProcessor(
                    sparkRuntimeEnvironment, jobContext, config.getConfigList(Constants.SOURCE));
    this.transformPluginExecuteProcessor =
            new TransformExecuteProcessor(
                    sparkRuntimeEnvironment,
                    jobContext,
                    TypesafeConfigUtils.getConfigList(
                            config, Constants.TRANSFORM, Collections.emptyList()));
    this.sinkPluginExecuteProcessor =
            new SinkExecuteProcessor(
                    sparkRuntimeEnvironment, jobContext, config.getConfigList(Constants.SINK));
}
```
### 3.2 Key Objects

| Component | Type | Role |
|---|---|---|
| sourcePluginExecuteProcessor | SourceExecuteProcessor | Handles data source ingestion |
| transformPluginExecuteProcessor | TransformExecuteProcessor | Handles data transformation logic |
| sinkPluginExecuteProcessor | SinkExecuteProcessor | Handles data output |
| sparkRuntimeEnvironment | SparkRuntimeEnvironment | Wraps the SparkSession and runtime environment |
## 4. Task Execution: SparkExecution.execute()

DAG construction flow:

```java
public void execute() throws TaskExecuteException {
    // Initialize the dataset list
    List<Dataset<Row>> datasets = new ArrayList<>();
    // Run the three components in order
    datasets = sourcePluginExecuteProcessor.execute(datasets);
    datasets = transformPluginExecuteProcessor.execute(datasets);
    sinkPluginExecuteProcessor.execute(datasets);
    log.info("Spark Execution started");
}
```
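All three processors share the same `execute(List<Dataset<Row>>)` shape, which is what lets `SparkExecution.execute()` simply thread the dataset list through source → transform → sink. Below is a minimal sketch of that shared contract; the interface name and exact signature are assumptions for illustration, and the real base class also carries the runtime environment, job context, and plugin configs:

```java
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch of the shared stage contract (name assumed): each stage consumes the
// upstream datasets and returns the datasets it produces for the next stage.
interface PluginExecuteProcessor {
    List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) throws Exception;
}
```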
## 5. Source Processing Flow

### 5.1 Plugin Initialization

Call chain:

```
SourceExecuteProcessor()
  → super(sparkRuntimeEnvironment, jobContext, sourceConfigs)   // parent constructor
  → this.plugins = initializePlugins(pluginConfigs)
```

Core plugin loading logic:

```java
protected List<SeaTunnelSource<?, ?, ?>> initializePlugins(List<? extends Config> pluginConfigs) {
    SeaTunnelSourcePluginDiscovery discovery = new SeaTunnelSourcePluginDiscovery();
    List<SeaTunnelSource<?, ?, ?>> sources = new ArrayList<>();
    Set<URL> jars = new HashSet<>();
    for (Config sourceConfig : pluginConfigs) {
        // 1. Identify the plugin
        PluginIdentifier identifier =
                PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
        // 2. Collect the plugin's JAR dependencies
        jars.addAll(discovery.getPluginJarPaths(Lists.newArrayList(identifier)));
        // 3. Create the plugin instance
        SeaTunnelSource<?, ?, ?> source = discovery.createPluginInstance(identifier);
        // 4. Initialize the plugin
        source.prepare(sourceConfig);
        source.setJobContext(jobContext);
        sources.add(source);
    }
    // 5. Register the plugin JARs with the Spark environment
    sparkRuntimeEnvironment.registerPlugin(new ArrayList<>(jars));
    return sources;
}
```
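The discovery step resolves a plugin name from the config to a concrete connector class. Conceptually this is the standard Java SPI pattern; the sketch below shows the general technique with `ServiceLoader`, not the exact `SeaTunnelSourcePluginDiscovery` implementation (which also handles per-plugin classloaders and JAR resolution), and `ExamplePluginFactory` is an illustrative name:

```java
import java.util.ServiceLoader;

// Generic SPI lookup sketch: scan META-INF/services entries for a factory-style
// interface and pick the one whose identifier matches the configured plugin name.
// ExamplePluginFactory and factoryIdentifier() are illustrative, not SeaTunnel APIs.
interface ExamplePluginFactory {
    String factoryIdentifier();
}

final class SpiLookup {
    static ExamplePluginFactory find(String pluginName) {
        for (ExamplePluginFactory factory : ServiceLoader.load(ExamplePluginFactory.class)) {
            if (factory.factoryIdentifier().equals(pluginName)) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No plugin found for name: " + pluginName);
    }
}
```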
### 5.2 Data Stream Generation

Execution entry:

```java
public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {
    List<Dataset<Row>> sources = new ArrayList<>();
    for (int i = 0; i < plugins.size(); i++) {
        SeaTunnelSource<?, ?, ?> source = plugins.get(i);
        Config pluginConfig = pluginConfigs.get(i);
        // 1. Determine the parallelism
        int parallelism =
                pluginConfig.hasPath(CommonOptions.PARALLELISM.key())
                        ? pluginConfig.getInt(CommonOptions.PARALLELISM.key())
                        : sparkRuntimeEnvironment
                                .getSparkConf()
                                .getInt(
                                        CommonOptions.PARALLELISM.key(),
                                        CommonOptions.PARALLELISM.defaultValue());
        // 2. Create the Dataset (the core step)
        Dataset<Row> dataset =
                sparkRuntimeEnvironment
                        .getSparkSession()
                        .read()
                        .format(SeaTunnelSource.class.getSimpleName()) // "SeaTunnelSource" format identifier
                        .option(CommonOptions.PARALLELISM.key(), parallelism) // set the parallelism
                        .option(
                                Constants.SOURCE_SERIALIZATION,
                                SerializationUtils.objectToString(source)) // serialized plugin instance
                        .schema((StructType) TypeConverterUtils.convert(source.getProducedType())) // set the schema
                        .load();
        // 3. Register a temp view for downstream stages
        registerInputTempView(pluginConfig, dataset);
        sources.add(dataset);
    }
    return sources;
}
```
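The interesting trick here is passing the already-configured source instance to the executors as a plain string option (`Constants.SOURCE_SERIALIZATION`), so the Spark data source can rebuild it wherever the read actually runs. The sketch below shows the general technique (Java serialization plus Base64) rather than the exact `SerializationUtils` implementation; class and method names are illustrative:

```java
import java.io.*;
import java.util.Base64;

// Illustration of serializing a (Serializable) plugin instance into a string option
// and restoring it on the executor side. Not the actual SeaTunnel SerializationUtils.
final class ObjectStringCodec {
    static String objectToString(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    @SuppressWarnings("unchecked")
    static <T> T stringToObject(String encoded) throws IOException, ClassNotFoundException {
        byte[] bytes = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        }
    }
}
```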
Temp view registration logic:

```java
void registerInputTempView(Config config, Dataset<Row> dataset) {
    if (config.hasPath(RESULT_TABLE_NAME)) {
        String tableName = config.getString(RESULT_TABLE_NAME);
        // Create a Spark temporary view under the configured result_table_name
        dataset.createOrReplaceTempView(tableName);
    }
}
```
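Once a dataset is registered under its `result_table_name`, any later stage can fetch it back from the SparkSession by name, which is the glue of the table-driven DAG summarized at the end of this article. A quick illustration, assuming an existing SparkSession `spark` and an illustrative table name:

```java
// Both calls retrieve the dataset registered above by its result_table_name
// ("fake_source_output" is an illustrative name, not taken from the source code).
Dataset<Row> byTable = spark.table("fake_source_output");
Dataset<Row> bySql = spark.sql("SELECT * FROM fake_source_output");
```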
## 6. Transform Processing Flow

### 6.1 Plugin Initialization

Key validation logic (in the concrete Transform implementations):

```java
public void prepare(Config pluginConfig) {
    // Both source_table_name and result_table_name must be present
    if (!pluginConfig.hasPath(SOURCE_TABLE_NAME) || !pluginConfig.hasPath(RESULT_TABLE_NAME)) {
        throw new IllegalArgumentException("Missing required table name config");
    }
    // The input and output table names must differ
    if (Objects.equals(
            pluginConfig.getString(SOURCE_TABLE_NAME),
            pluginConfig.getString(RESULT_TABLE_NAME))) {
        throw new IllegalArgumentException("Source and result table names must be different");
    }
    // Delegate to the concrete Transform's configuration setup
    setConfig(pluginConfig);
}
```
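To make the two checks concrete, here is what a passing and a failing configuration look like when built with the Typesafe Config API that SeaTunnel's `Config` type is based on (SeaTunnel actually shades this library under its own package prefix, and the table names are illustrative):

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

class TransformConfigExamples {
    // Passes both checks: both keys are present and the names differ.
    static final Config OK = ConfigFactory.parseString(
            "source_table_name = fake_in, result_table_name = fake_out");

    // Fails the second check: source and result table names are identical,
    // so prepare() would throw IllegalArgumentException.
    static final Config BAD = ConfigFactory.parseString(
            "source_table_name = t1, result_table_name = t1");
}
```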
### 6.2 Transform Execution

Core processing flow:

```java
public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {
    Dataset<Row> input = upstreamDataStreams.get(0); // default to the first upstream stream
    List<Dataset<Row>> result = new ArrayList<>();
    for (int i = 0; i < plugins.size(); i++) {
        SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
        Config pluginConfig = pluginConfigs.get(i);
        // 1. Resolve the input stream (looked up by source_table_name)
        Dataset<Row> stream =
                fromSourceTable(pluginConfig, sparkRuntimeEnvironment).orElse(input);
        // 2. Apply the transform
        input = sparkTransform(transform, stream);
        // 3. Register the result table
        registerInputTempView(pluginConfig, input);
        result.add(input);
    }
    return result;
}
```

Transform operator implementation:

```java
private Dataset<Row> sparkTransform(SeaTunnelTransform transform, Dataset<Row> stream) {
    // 1. Convert the Spark schema into the SeaTunnel type system
    SeaTunnelDataType<?> inputType = TypeConverterUtils.convert(stream.schema());
    transform.setTypeInfo(inputType);
    // 2. Build the output schema
    StructType outputSchema =
            (StructType) TypeConverterUtils.convert(transform.getProducedType());
    // 3. Create the row converters
    SeaTunnelRowConverter inputConverter = new SeaTunnelRowConverter(inputType);
    SeaTunnelRowConverter outputConverter =
            new SeaTunnelRowConverter(transform.getProducedType());
    // 4. Run the transform via mapPartitions
    ExpressionEncoder<Row> encoder = RowEncoder.apply(outputSchema);
    return stream.mapPartitions(
                    (MapPartitionsFunction<Row, Row>)
                            inputIterator ->
                                    new TransformIterator( // custom iterator wrapping the transform logic
                                            inputIterator,
                                            transform,
                                            outputSchema,
                                            inputConverter,
                                            outputConverter),
                    encoder)
            .filter(row -> row != null); // drop rows the transform filtered out (returned as null)
}
```
Key logic of the custom TransformIterator:

```java
public class TransformIterator implements Iterator<Row> {
    // Fields and constructor omitted here; hasNext() simply delegates to the input iterator.

    @Override
    public Row next() {
        Row row = input.next();
        // 1. Convert the input Row into a SeaTunnelRow
        SeaTunnelRow inRow = inputConverter.convert(row);
        // 2. Run the transform's core logic
        SeaTunnelRow outRow = (SeaTunnelRow) transform.map(inRow);
        if (outRow != null) {
            // 3. Convert the output SeaTunnelRow back into a Spark Row
            return outputConverter.convert(outRow);
        }
        // Rows dropped by the transform come back as null and are removed by the
        // downstream .filter(row -> row != null) in sparkTransform()
        return null;
    }
}
```
## 7. Sink Processing Flow

### 7.1 Plugin Initialization

Sink-specific handling:

```java
protected List<SeaTunnelSink<?, ?, ?, ?>> initializePlugins(...) {
    for (Config sinkConfig : pluginConfigs) {
        // ... create the sink instance
        // Data save mode handling
        if (sink instanceof SupportDataSaveMode) {
            SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
            saveModeSink.checkOptions(sinkConfig); // validate the configuration
        }
    }
}
```
### 7.2 Data Output

Execution flow:

```java
public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {
    Dataset<Row> input = upstreamDataStreams.get(0);
    for (int i = 0; i < plugins.size(); i++) {
        Config sinkConfig = pluginConfigs.get(i);
        SeaTunnelSink<?, ?, ?, ?> sink = plugins.get(i);
        // 1. Resolve the input stream
        Dataset<Row> dataset =
                fromSourceTable(sinkConfig, sparkRuntimeEnvironment).orElse(input);
        // 2. Set the type info
        sink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));
        // 3. Handle the data save mode
        if (sink instanceof SupportDataSaveMode) {
            SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
            DataSaveMode saveMode = saveModeSink.getDataSaveMode();
            saveModeSink.handleSaveMode(saveMode);
        }
        // 4. Determine the parallelism
        int parallelism =
                sinkConfig.hasPath(CommonOptions.PARALLELISM.key())
                        ? sinkConfig.getInt(CommonOptions.PARALLELISM.key())
                        : sparkRuntimeEnvironment
                                .getSparkConf()
                                .getInt(
                                        CommonOptions.PARALLELISM.key(),
                                        CommonOptions.PARALLELISM.defaultValue());
        // 5. Set the parallelism (TODO in the source: the new DataFrameReader created
        //    here is discarded, so this needs rework)
        dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), parallelism);
        // 6. Inject the sink logic
        SparkSinkInjector.inject(dataset.write(), sink)
                .option("checkpointLocation", "/tmp") // TODO in the source: should be configurable
                .mode(SaveMode.Append)
                .save();
    }
    return null; // the sink is the terminal stage, so there is no downstream dataset
}
```
Sink input stream lookup logic:

```java
protected Optional<Dataset<Row>> fromSourceTable(Config pluginConfig, SparkRuntimeEnvironment env) {
    if (!pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
        return Optional.empty();
    }
    String sourceTableName = pluginConfig.getString(SOURCE_TABLE_NAME);
    // Read the dataset back from the previously registered temp view
    return Optional.of(env.getSparkSession().read().table(sourceTableName));
}
```
SparkSinkInjector pseudocode:

```java
class SparkSinkInjector {
    static DataFrameWriter<Row> inject(DataFrameWriter<Row> writer, SeaTunnelSink<?, ?, ?, ?> sink) {
        // Adapter-style injection: route the write through the "SeaTunnelSink" data source
        // and ship the serialized sink instance along as an option, mirroring the source side
        return writer.format(SeaTunnelSink.class.getSimpleName())
                .option(Constants.SINK_SERIALIZATION, SerializationUtils.objectToString(sink));
    }
}
```
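For `writer.format("SeaTunnelSink")` to resolve at all, Spark must be able to map that format name to a data source class, which happens through Spark's own SPI (`DataSourceRegister`). A minimal sketch of the registration side follows; the class name is an assumption, and the real provider (in SeaTunnel's Spark translation layer) also implements Spark's version-specific read/write interfaces, which are omitted here:

```java
import org.apache.spark.sql.sources.DataSourceRegister;

// Spark scans DataSourceRegister implementations listed under META-INF/services
// and matches shortName() against the format string. The class name is an assumption.
public class ExampleSeaTunnelSinkProvider implements DataSourceRegister {
    @Override
    public String shortName() {
        // Must match the string passed to DataFrameWriter.format(...)
        return "SeaTunnelSink";
    }
}
```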
## Execution Flow Overview

End to end, the flow is: config file → `SparkExecution` → `SourceExecuteProcessor` (creates the source Datasets and registers temp views) → `TransformExecuteProcessor` (applies transforms via `mapPartitions` and registers result views) → `SinkExecuteProcessor` (writes out via `SparkSinkInjector`).
## Key Design Takeaways

- Plugin-based architecture:
  - Source/Transform/Sink plugins are loaded dynamically through the SPI mechanism
  - Plugin discovery: `SeaTunnel*PluginDiscovery`
  - Dependency isolation: `registerPlugin()` manages the plugin JARs
- Table-driven DAG:
  - `source_table_name` and `result_table_name` form the execution chain
  - Tables are registered via `createOrReplaceTempView()`
  - Tables are looked up via `fromSourceTable()`
- Type system conversion:
  - `TypeConverterUtils` handles bidirectional conversion between SeaTunnel and Spark types
  - `SeaTunnelRowConverter` performs row-level data conversion
  - Automatic schema derivation: `source.getProducedType()`
- Execution environment encapsulation:
  - `SparkRuntimeEnvironment` centrally manages the SparkSession
  - Supports both batch and streaming modes
  - Centralizes plugin dependencies and configuration
- Distributed transformation:
  - The Transform stage uses `mapPartitions` for parallel processing
  - The custom `TransformIterator` encapsulates the conversion logic
  - Spark's ExpressionEncoder handles type safety
- Sink adaptation mechanism:
  - `SparkSinkInjector` bridges SeaTunnelSink and Spark's write path
  - Supports different save modes (Append, Overwrite, etc.)
  - Handles parallelism and checkpoint configuration automatically
This article has walked through the full SeaTunnel Spark engine flow, from job startup to DAG construction, highlighting the plugin loading mechanism, the table-driven DAG design, and the type conversion system, and should serve as a detailed reference for understanding how SeaTunnel works internally.