Apache SeaTunnel Flink引擎执行流程源码分析
目录
1. 任务启动入口
2. 任务执行命令类:FlinkTaskExecuteCommand
3. FlinkExecution的创建与初始化
3.1 核心组件初始化
3.2 关键对象说明
4. 任务执行:FlinkExecution.execute()
5. Source处理流程
5.1 插件初始化
5.2 数据流生成
6. Transform处理流程
6.1 插件初始化
6.2 转换执行
7. Sink处理流程
7.1 插件初始化
7.2 数据输出
执行流程全景图
关键设计总结
本文基于SeaTunnel 2.3.x源码分析Flink引擎执行流程,以seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java
为入口,完整解析Flink引擎的执行流程。
1. 任务启动入口
启动类核心代码:
// 1. 初始化Flink启动命令参数 FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs(); // 2. 执行SeaTunnel.run()回调Flink执行命令 SeaTunnel.run(flinkCommandArgs.buildCommand());
-
buildCommand()
返回FlinkTaskExecuteCommand
实例 -
SeaTunnel.run()
最终调用FlinkTaskExecuteCommand.execute()
2. 任务执行命令类:FlinkTaskExecuteCommand
核心执行流程:
public void execute() { // 1. 解析配置文件生成Config对象 Config config = ConfigBuilder.of(configFile); // 2. 创建FlinkExecution实例 FlinkExecution seaTunnelTaskExecution = new FlinkExecution(config); // 3. 执行任务 seaTunnelTaskExecution.execute(); }
3. FlinkExecution的创建与初始化
3.1 核心组件初始化
public FlinkExecution(Config config) { // 创建三大处理器 this.sourcePluginExecuteProcessor = new SourceExecuteProcessor( jarPaths, config.getConfigList(Constants.SOURCE), jobContext); this.transformPluginExecuteProcessor = new TransformExecuteProcessor( jarPaths, TypesafeConfigUtils.getConfigList(config, Constants.TRANSFORM, Collections.emptyList()), jobContext); this.sinkPluginExecuteProcessor = new SinkExecuteProcessor( jarPaths, config.getConfigList(Constants.SINK), jobContext); // 初始化Flink执行环境 this.flinkRuntimeEnvironment = FlinkRuntimeEnvironment.getInstance( this.registerPlugin(config, jarPaths)); // 为处理器注入运行时环境 this.sourcePluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment); this.transformPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment); this.sinkPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment); }
3.2 关键对象说明
组件 | 类型 | 功能 |
---|---|---|
sourcePluginExecuteProcessor | SourceExecuteProcessor | 处理数据源接入 |
transformPluginExecuteProcessor | TransformExecuteProcessor | 处理数据转换逻辑 |
sinkPluginExecuteProcessor | SinkExecuteProcessor | 处理数据输出 |
flinkRuntimeEnvironment | FlinkRuntimeEnvironment | 封装Flink StreamExecutionEnvironment |
4. 任务执行:FlinkExecution.execute()
DAG构建流程:
public void execute() { // 初始