当前位置: 首页 > news >正文

Apache SeaTunnel Spark引擎执行流程源码分析

目录

1. 任务启动入口

2. 任务执行命令类:SparkTaskExecuteCommand

3. SparkExecution的创建与初始化

3.1 核心组件初始化

3.2 关键对象说明

4. 任务执行:SparkExecution.execute()

5. Source处理流程

5.1 插件初始化

5.2 数据流生成

6. Transform处理流程

6.1 插件初始化

6.2 转换执行

7. Sink处理流程

7.1 插件初始化

7.2 数据输出

执行流程全景图

关键设计总结


本文基于SeaTunnel 2.3.x源码分析Spark引擎执行流程,以seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java为入口,完整解析Spark引擎的执行流程。


1. 任务启动入口

启动类核心代码:

public static void main(String[] args) {// 1. 创建Spark命令参数对象SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();// 2. 执行SeaTunnel.run()回调Spark执行命令SeaTunnel.run(sparkCommandArgs.buildCommand());
}
  • buildCommand()返回SparkTaskExecuteCommand实例

  • SeaTunnel.run()最终调用SparkTaskExecuteCommand.execute()


2. 任务执行命令类:SparkTaskExecuteCommand

核心执行流程:

public void execute() {// 1. 解析配置文件生成Config对象Config config = ConfigBuilder.of(configFile);// 2. 创建SparkExecution实例SparkExecution seaTunnelTaskExecution = new SparkExecution(config);// 3. 执行任务seaTunnelTaskExecution.execute();
}

3. SparkExecution的创建与初始化
3.1 核心组件初始化
public SparkExecution(Config config) {// 创建Spark运行时环境this.sparkRuntimeEnvironment = SparkRuntimeEnvironment.getInstance(config);JobContext jobContext = new JobContext();jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));// 创建三大处理器this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(sparkRuntimeEnvironment, jobContext, config.getConfigList(Constants.SOURCE));this.transformPluginExecuteProcessor = new TransformExecuteProcessor(sparkRuntimeEnvironment, jobContext,TypesafeConfigUtils.getConfigList(config, Constants.TRANSFORM, Collections.emptyList()));this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(sparkRuntimeEnvironment, jobContext, config.getConfigList(Constants.SINK));
}
3.2 关键对象说明
组件类型功能
sourcePluginExecuteProcessorSourceExecuteProcessor处理数据源接入
transformPluginExecuteProcessorTransformExecuteProcessor处理数据转换逻辑
sinkPluginExecuteProcessorSinkExecuteProcessor处理数据输出
sparkRuntimeEnvironmentSparkRuntimeEnvironment封装SparkSession及运行时环境

4. 任务执行:SparkExecution.execute()

DAG构建流程:

public void execute() throws TaskExecuteException {// 初始化数据集集合List<Dataset<Row>> datasets = new ArrayList<>();// 按顺序执行三大组件datasets = sourcePluginExecuteProcessor.execute(datasets);datasets = transformPluginExecuteProcessor.execute(datasets);sinkPluginExecuteProcessor.execute(datasets);log.info("Spark Execution started");
}

5. Source处理流程

5.1 插件初始化

调用链:

SourceExecuteProcessor()→ super(sparkRuntimeEnvironment, jobContext, sourceConfigs) // 调用父类构造器→ this.plugins = initializePlugins(pluginConfigs)

插件加载核心逻辑:

protected List<SeaTunnelSource<?, ?, ?>> initializePlugins(List<? extends Config> pluginConfigs) {SeaTunnelSourcePluginDiscovery discovery = new SeaTunnelSourcePluginDiscovery();List<SeaTunnelSource<?, ?, ?>> sources = new ArrayList<>();Set<URL> jars = new HashSet<>();for (Config sourceConfig : pluginConfigs) {// 1. 识别插件类型PluginIdentifier identifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));// 2. 加载依赖JARjars.addAll(discovery.getPluginJarPaths(Lists.newArrayList(identifier)));// 3. 创建插件实例SeaTunnelSource<?, ?, ?> source = discovery.createPluginInstance(identifier);// 4. 初始化插件source.prepare(sourceConfig);source.setJobContext(jobContext);sources.add(source);}// 5. 注册插件JAR到Spark环境sparkRuntimeEnvironment.registerPlugin(new ArrayList<>(jars));return sources;
}
5.2 数据流生成

执行入口:

public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {List<Dataset<Row>> sources = new ArrayList<>();for (int i = 0; i < plugins.size(); i++) {SeaTunnelSource<?, ?, ?> source = plugins.get(i);Config pluginConfig = pluginConfigs.get(i);// 1. 确定并行度int parallelism = pluginConfig.hasPath(CommonOptions.PARALLELISM.key())? pluginConfig.getInt(CommonOptions.PARALLELISM.key()): sparkRuntimeEnvironment.getSparkConf().getInt(CommonOptions.PARALLELISM.key(), CommonOptions.PARALLELISM.defaultValue());// 2. 创建Dataset(核心步骤)Dataset<Row> dataset = sparkRuntimeEnvironment.getSparkSession().read().format(SeaTunnelSource.class.getSimpleName()) // 使用SeaTunnelSource标识.option(CommonOptions.PARALLELISM.key(), parallelism) // 设置并行度.option(Constants.SOURCE_SERIALIZATION, SerializationUtils.objectToString(source)) // 序列化插件实例.schema((StructType) TypeConverterUtils.convert(source.getProducedType())) // 设置Schema.load();// 3. 注册临时视图registerInputTempView(pluginConfig, dataset);sources.add(dataset);}return sources;
}

临时视图注册逻辑:

void registerInputTempView(Config config, Dataset<Row> dataset) {if (config.hasPath(RESULT_TABLE_NAME)) {String tableName = config.getString(RESULT_TABLE_NAME);// 创建Spark临时视图dataset.createOrReplaceTempView(tableName);}
}

6. Transform处理流程

6.1 插件初始化

关键校验逻辑(在具体Transform实现中):

public void prepare(Config pluginConfig) {// 必须包含source_table_name和result_table_nameif (!pluginConfig.hasPath(SOURCE_TABLE_NAME) || !pluginConfig.hasPath(RESULT_TABLE_NAME)) {throw new IllegalArgumentException("Missing required table name config");}// 输入输出表名不能相同if (Objects.equals(pluginConfig.getString(SOURCE_TABLE_NAME),pluginConfig.getString(RESULT_TABLE_NAME))) {throw new IllegalArgumentException("Source and result table names must be different");}// 调用具体Transform的配置初始化setConfig(pluginConfig);
}
6.2 转换执行

核心处理流程:

public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {Dataset<Row> input = upstreamDataStreams.get(0); // 默认使用第一个上游流List<Dataset<Row>> result = new ArrayList<>();for (int i = 0; i < plugins.size(); i++) {SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);Config pluginConfig = pluginConfigs.get(i);// 1. 获取输入流(通过source_table_name查找)Dataset<Row> stream = fromSourceTable(pluginConfig, sparkRuntimeEnvironment).orElse(input);// 2. 执行转换input = sparkTransform(transform, stream);// 3. 注册结果表registerInputTempView(pluginConfig, input);result.add(input);}return result;
}

转换算子实现:

private Dataset<Row> sparkTransform(SeaTunnelTransform transform, Dataset<Row> stream) {// 1. 类型系统转换SeaTunnelDataType<?> inputType = TypeConverterUtils.convert(stream.schema());transform.setTypeInfo(inputType);// 2. 创建输出SchemaStructType outputSchema = (StructType) TypeConverterUtils.convert(transform.getProducedType());// 3. 创建行转换器SeaTunnelRowConverter inputConverter = new SeaTunnelRowConverter(inputType);SeaTunnelRowConverter outputConverter = new SeaTunnelRowConverter(transform.getProducedType());// 4. 通过mapPartitions实现转换ExpressionEncoder<Row> encoder = RowEncoder.apply(outputSchema);return stream.mapPartitions((MapPartitionsFunction<Row, Row>) inputIterator -> new TransformIterator( // 自定义迭代器封装转换逻辑inputIterator, transform, outputSchema, inputConverter, outputConverter),encoder).filter(row -> row != null); // 过滤空值
}

自定义迭代器TransformIterator关键逻辑:

public class TransformIterator implements Iterator<Row> {@Overridepublic Row next() {Row row = input.next();// 1. 输入行转SeaTunnelRowSeaTunnelRow inRow = inputConverter.convert(row);// 2. 执行Transform核心逻辑SeaTunnelRow outRow = (SeaTunnelRow) transform.map(inRow);if (outRow != null) {// 3. 输出行转Spark Rowreturn outputConverter.convert(outRow);}return null;}
}

7. Sink处理流程

7.1 插件初始化

特殊处理逻辑:

protected List<SeaTunnelSink<?, ?, ?, ?>> initializePlugins(...) {for (Config sinkConfig : pluginConfigs) {// ... 创建sink实例// 数据保存模式处理if (sink instanceof SupportDataSaveMode) {SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;saveModeSink.checkOptions(sinkConfig); // 校验配置}}
}
7.2 数据输出

执行流程:

public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {Dataset<Row> input = upstreamDataStreams.get(0);for (int i = 0; i < plugins.size(); i++) {Config sinkConfig = pluginConfigs.get(i);SeaTunnelSink<?, ?, ?, ?> sink = plugins.get(i);// 1. 获取输入流Dataset<Row> dataset = fromSourceTable(sinkConfig, sparkRuntimeEnvironment).orElse(input);// 2. 设置类型信息sink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));// 3. 处理数据保存模式if (sink instanceof SupportDataSaveMode) {SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;DataSaveMode saveMode = saveModeSink.getDataSaveMode();saveModeSink.handleSaveMode(saveMode);}// 4. 确定并行度int parallelism = sinkConfig.hasPath(CommonOptions.PARALLELISM.key())? sinkConfig.getInt(CommonOptions.PARALLELISM.key()): sparkRuntimeEnvironment.getSparkConf().getInt(CommonOptions.PARALLELISM.key(), CommonOptions.PARALLELISM.defaultValue());// 5. 设置并行度(TODO:当前实现需优化)dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), parallelism);// 6. 注入Sink逻辑SparkSinkInjector.inject(dataset.write(), sink).option("checkpointLocation", "/tmp") // TODO:需改为可配置.mode(SaveMode.Append).save();}return null; // Sink是终点,无下游数据流
}

Sink输入流获取逻辑:

protected Optional<Dataset<Row>> fromSourceTable(Config pluginConfig, SparkRuntimeEnvironment env) {if (!pluginConfig.hasPath(SOURCE_TABLE_NAME)) {return Optional.empty();}String sourceTableName = pluginConfig.getString(SOURCE_TABLE_NAME);// 从已注册的临时视图中获取数据集return Optional.of(env.getSparkSession().read().table(sourceTableName));
}

Sink注入器伪代码:

class SparkSinkInjector {static DataFrameWriter inject(DataFrameWriter writer, SeaTunnelSink sink) {// 通过反射或适配器模式将SeaTunnelSink逻辑注入到Spark的DataFrameWriterreturn writer.format(SeaTunnelSink.class.getSimpleName()).option(Constants.SINK_SERIALIZATION, SerializationUtils.objectToString(sink));}
}

执行流程全景图

 

关键设计总结

  1. 插件化架构

    • 通过SPI机制动态加载Source/Transform/Sink插件

    • 插件发现机制:SeaTunnel*PluginDiscovery

    • 依赖隔离:registerPlugin()管理插件JAR

  2. 表驱动DAG

    • source_table_nameresult_table_name构成执行链路

    • 通过createOrReplaceTempView()实现表注册

    • 使用fromSourceTable()实现表查找

  3. 类型系统转换

    • TypeConverterUtils处理SeaTunnel类型与Spark类型的双向转换

    • SeaTunnelRowConverter实现行级别的数据转换

    • 自动Schema推导:source.getProducedType()

  4. 执行环境封装

    • SparkRuntimeEnvironment统一管理SparkSession

    • 支持批处理和流处理模式

    • 集中管理插件依赖和配置

  5. 分布式转换

    • Transform阶段使用mapPartitions实现并行处理

    • 自定义TransformIterator封装转换逻辑

    • 基于Spark的ExpressionEncoder处理类型安全

  6. Sink适配机制

    • 通过SparkSinkInjector桥接SeaTunnelSink和Spark写入体系

    • 支持不同的保存模式(Append/Overwrite等)

    • 自动处理并行度和检查点配置

本文完整解析了SeaTunnel Spark引擎从任务启动到DAG构建的全流程,重点突出了插件加载机制、表驱动DAG设计和类型转换系统等核心设计,为深入理解SeaTunnel内部工作原理提供了详细参考。

http://www.lryc.cn/news/574036.html

相关文章:

  • 17、Rocket MQ快速实战以及核⼼概念详解
  • 更新麒麟连不上外网
  • 从理论到实践:Air8101外挂Air780EPM模块,实现4G联网能力!
  • 游戏盾:守护虚拟世界的坚固堡垒
  • 「Linux用户账号管理」组群管理
  • ActixWeb框架实战案例精萃
  • DAY 40 训练和测试的规范写法
  • 详解HarmonyOS NEXT仓颉开发语言中的全局弹窗
  • LED-Merging: 无需训练的模型合并框架,兼顾LLM安全和性能!!
  • Spring AI 项目实战(十二):Spring Boot +AI + DeepSeek + 百度OCR 公司发票智能处理系统的技术实践(附完整源码)
  • Maven 多模块项目调试与问题排查总结
  • 2、结合STM32CubeMX学习FreeRTOS实时操作系统——任务
  • 半导体行业中的专用标准产品ASSP是什么?
  • 探秘Flink维表:从源码到运行时的深度解析
  • Java面试复习指南:并发编程、JVM、Spring框架、数据结构与算法、Java 8新特性
  • 人机融合智能 | 人智交互的神经人因学方法
  • 【ARM 嵌入式 编译系列 7.5 -- GCC 打印链接脚本各段使用信息】
  • Java面试复习:基础、并发、JVM及框架核心考点解析
  • AI辅助编程工具技术评估(2025年):CodeBuddy在开发者生态中的差异化优势分析
  • 【达梦数据库】忘记SYSDBA密码处理方法-已适配
  • 图像处理基础篇
  • 麒麟系统上设置Firefox自动化测试环境:指定Marionette端口号
  • 纯血HarmonyOS5 打造小游戏实践:扫雷(附源文件)
  • 电脑的虚拟内存对性能影响大吗
  • 深入理解JavaScript设计模式之迭代器模式
  • Docker部署prometheus+grafana+...
  • 【论文阅读35】-PINN review(2021)
  • 华为云 Flexus+DeepSeek 征文|增值税发票智能提取小工具:基于大模型的自动化信息解析实践
  • 虚拟 DOM 与 Diff 算法:现代前端框架的核心机制
  • [3-01-02].第15节:调优工具 - 查看 SQL 执行成本