当前位置: 首页 > news >正文

Flink SQL执行流程深度剖析:从SQL语句到分布式执行

在大数据处理领域,Flink SQL凭借其强大的处理能力和易用性,成为众多开发者的选择。与其他OLAP引擎类似,Flink SQL的SQL执行流程大致都需要经过词法解析、语法解析、生成抽象语法树(AST)、校验以及生成逻辑执行计划等步骤。整体流程可笼统地概括为两大阶段:从SQL到Operation的转换,再从Operation到Transformation的转换,最终进入分布式执行阶段。接下来,我们将以INSERT INTO语句为例,深入剖析Flink SQL的执行流程,探究其内部数据的流转机制以及如何基于Calcite进行功能拓展。

一、SQL到Operation转化:解析与转换的起点

Flink SQL的所有SQL执行入口为TableEnvironment,它提供了DML(数据操作语言)、DDL(数据定义语言)、DQL(数据查询语言)等功能,像executeSql、sqlQuery、registerFunction等方法都是其能力的体现,在日常开发中,executeSql方法的使用频率较高。

以INSERT INTO语句为例,探究Flink SQL到Operation的转换过程。这一过程首先借助Planner提供的基于Calcite的SQL解析器,将SQL字符串转换为Operation。具体而言,通过ParserImpl#parse()方法把字符串转化为ModifyOperation树,并生成新的TableResult对象,其核心步骤如下:

  1. 将SQL解析字符串转为SqlNode:利用Calcite的强大解析能力,对输入的SQL字符串进行词法和语法分析,生成SqlNode。这是整个转换过程的基础,只有准确生成SqlNode,后续操作才能顺利进行 。
  2. 将SqlNode校验:对生成的SqlNode进行严格校验,确保其符合语法和语义规则,为后续转换提供可靠保障。
  3. 通过SqlToOperationConverter#convert()将SqlNode转为ModifyOperation:依据SQL语句的类型和语义,将经过校验的SqlNode转换为对应的Operation,如INSERT INTO语句会被转换为ModifyOperation。

以INSERT INTO执行语句为例,其执行入口位于TableEnvironmentImpl#executeSql(),核心实现是ParserImpl#parse()方法。该方法获取基于Calcite进行语法拓展的SQL解析器,词法、语法解析以及校验工作均由Calcite完成。在使用Calcite解析之前,会优先调用ExtendedParser#parse()方法进行解析,此方法主要用于处理一些CalciteParser不支持的特殊命令,例如SET key=value中键和值标识符包含特殊字符的情况,这样可以避免引入新的保留关键字,提高解析的灵活性和兼容性。ParserImpl#parse部分实现如下:

CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// 解析sql
Optional<Operation> command = EXTENDED_PARSER.parse(statement);
if (command.isPresent()) {return Collections.singletonList(command.get());
}
SqlNode parsed = parser.parse(statement);

上述代码通过Calcite#parse方法解析SQL后,成功得到SqlNode对象。而对于SqlNode的校验和转换为Operation的操作,则是通过SqlToOperationConverter#convert()方法实现。其内部借助flinkPlanner#validate对SqlNode进行校验,详细实现如下:

def validate(sqlNode: SqlNode): SqlNode = {// 获取当前作业对元数据val validator = getOrCreateSqlValidator()validateInternal(sqlNode, validator)
}
private def validateInternal(sqlNode: SqlNode, validator: FlinkCalciteSqlValidator): SqlNode = {try {sqlNode.accept(new PreValidateReWriter(validator.getCatalogReader.unwrap(classOf[CatalogReader]), typeFactory))// 进行扩展验证。sqlNode match {case node: ExtendedSqlNode =>node.validate()case _ =>}// 不需要验证DDL的行类型并插入节点。if (sqlNode.getKind.belongsTo(SqlKind.DDL)|| .......) {return sqlNode}sqlNode match {case richExplain: SqlRichExplain =>val validated = validator.validate(richExplain.getStatement)richExplain.setOperand(0, validated)richExplaincase _ =>validator.validate(sqlNode)}}

经过上述校验流程,确保SqlNode合法后,会根据其类型转换为对应的Operation。例如,INSERT INTO语句会被转换为ModifyOperation,具体转换实现如下:

public static Optional<Operation> convert(FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {// validate the queryfinal SqlNode validated = flinkPlanner.validate(sqlNode);SqlToOperationConverter converter =new SqlToOperationConverter(flinkPlanner, catalogManager);if (validated instanceof SqlUseCatalog) {return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated));} else if (validated instanceof SqlShowCatalogs) {return Optional.of(converter.convertShowCatalogs((SqlShowCatalogs) validated));} else if (validated instanceof SqlShowCurrentCatalog) {return Optional.of(converter.convertShowCurrentCatalog((SqlShowCurrentCatalog) validated));}if (validated instanceof SqlCreateDatabase) {return Optional.of(converter.convertCreateDatabase((SqlCreateDatabase) validated));} else if (validated instanceof SqlDropDatabase) {return Optional.of(converter.convertDropDatabase((SqlDropDatabase) validated));} else if (validated instanceof SqlAlterDatabase) {return Optional.of(converter.convertAlterDatabase((SqlAlterDatabase) validated));} else if (validated instanceof SqlShowDatabases) {return Optional.of(converter.convertShowDatabases((SqlShowDatabases) validated));} ..................}

至此,Flink SQL完成了从SQL字符串到Operation的转换过程,为后续的处理奠定了基础。

二、Operation到Transformation:优化与执行计划生成

在完成SQL到Operation的转换后,接下来需要将Operation转换为Transformation。仍以INSERT INTO语句为例,SQL经过转换后会返回Operation的集合,取出集合中的第一个元素传入executeInternal方法中继续执行。该方法会依据Operation的类型进行相应处理,例如CreateTableOperation、DropTableOperation等会触发对catalogManager的相关操作;而对于INSERT INTO语句对应的ModifyOperation类型,则会传入executeInternal的另一个重载方法进行处理。根据Operation类型进行相应处理的部分代码如下:

if (operation instanceof ModifyOperation) {return executeInternal(Collections.singletonList((ModifyOperation) operation));
} else if (operation instanceof CreateTableOperation) {CreateTableOperation createTableOperation = (CreateTableOperation) operation;if (createTableOperation.isTemporary()) {catalogManager.createTemporaryTable(createTableOperation.getCatalogTable(),createTableOperation.getTableIdentifier(),createTableOperation.isIgnoreIfExists());} else {catalogManager.createTable(createTableOperation.getCatalogTable(),createTableOperation.getTableIdentifier(),createTableOperation.isIgnoreIfExists());}return TableResultImpl.TABLE_RESULT_OK;
}
....................

executeInternal的另一个重载方法是这一阶段的核心,其核心代码如下:

public TableResult executeInternal(List<ModifyOperation> operations) {List<Transformation<?>> transformations = translate(operations);List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);TableResult result = executeInternal(transformations, sinkIdentifierNames);result.await();return result;
}

从上述代码可以看出,从Operation到Transformation转换流程的核心方法是translate方法。该方法借助Planner#translate完成相关功能,Planner的默认实现类为PlannerBase。translate方法主要负责将Operation集合转换为Calcite的物理执行计划RelNode,然后对RelNode进行优化,最终将其转换为Transformation,具体代码如下:

override def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {validateAndOverrideConfiguration()if (modifyOperations.isEmpty) {return List.empty[Transformation[_]]}// 借助translateToRel转换Operation为relNodeval relNodes = modifyOperations.map(translateToRel)// 优化物理执行计划val optimizedRelNodes = optimize(relNodes)// 获取优化后的执行计划转化为ExecNodeGraphval execGraph = translateToExecNodeGraph(optimizedRelNodes)// 根据作业类型,使用不同的Planner进行算子转换,如流处理则使用:StreamPlanner、批处理则使用:BatchPlannerval transformations = translateToPlan(execGraph)cleanupInternalConfigurations()transformations
}

Operation转换为RelNode的详细实现在translateToRel方法中,该方法也是获取Flink SQL血缘关系的核心实现。开发者如有相关需求,可以复写此方法,使Flink SQL具备更灵活的血缘解析功能。在RelNode的优化过程中,主要涉及CommonSubGraphBasedOptimizer、BatchCommonSubGraphBasedOptimizer、StreamCommonSubGraphBasedOptimizer这三个类,从名称即可看出,StreamCommonSubGraphBasedOptimizer负责流处理的优化,BatchCommonSubGraphBasedOptimizer则针对批处理进行优化。

RelNode经过优化器优化后,会转换为ExecNodeGraph,最后通过translateToPlan()方法将ExecNodeGraph转换为transformations流水线。至此,Flink SQL完成了从Operation到Transformation的转换,得到的transformations流水线与使用StreamExecutionEnvironment开发的方式类似,后续将依次经过流图/plan、作业图、执行图的转换,最终进入分布式执行阶段。

三、总结:深入理解Flink SQL执行精髓

通过以INSERT INTO语句为切入点,对Flink SQL执行流程的详细剖析,我们深入了解了其从SQL语句输入到分布式执行的全过程。在这个过程中,Calcite框架发挥了关键作用,Flink SQL基于Calcite进行拓展,实现了强大的SQL解析、校验和优化功能。

从SQL到Operation的转换,将用户输入的SQL语句转化为系统能够处理的操作对象;而Operation到Transformation的转换,则进一步将操作对象优化并转换为可执行的计划。这两个阶段紧密配合,确保了Flink SQL能够高效、准确地执行用户的指令。

深入理解Flink SQL的执行流程,不仅有助于开发者更好地使用Flink SQL进行大数据处理,还能在遇到性能问题或需要进行功能拓展时,快速定位问题并找到解决方案。同时,也为我们在其他类似的数据处理场景中,设计和优化执行流程提供了宝贵的经验借鉴。

http://www.lryc.cn/news/574041.html

相关文章:

  • 机器学习基础:从概念到应用的全面解析
  • mac隐藏文件现身快捷键
  • Node.js 中的 JWT 认证:从生成到验证的完整指南
  • 深入浅出Node.js中间件机制
  • Apache SeaTunnel Spark引擎执行流程源码分析
  • 17、Rocket MQ快速实战以及核⼼概念详解
  • 更新麒麟连不上外网
  • 从理论到实践:Air8101外挂Air780EPM模块,实现4G联网能力!
  • 游戏盾:守护虚拟世界的坚固堡垒
  • 「Linux用户账号管理」组群管理
  • ActixWeb框架实战案例精萃
  • DAY 40 训练和测试的规范写法
  • 详解HarmonyOS NEXT仓颉开发语言中的全局弹窗
  • LED-Merging: 无需训练的模型合并框架,兼顾LLM安全和性能!!
  • Spring AI 项目实战(十二):Spring Boot +AI + DeepSeek + 百度OCR 公司发票智能处理系统的技术实践(附完整源码)
  • Maven 多模块项目调试与问题排查总结
  • 2、结合STM32CubeMX学习FreeRTOS实时操作系统——任务
  • 半导体行业中的专用标准产品ASSP是什么?
  • 探秘Flink维表:从源码到运行时的深度解析
  • Java面试复习指南:并发编程、JVM、Spring框架、数据结构与算法、Java 8新特性
  • 人机融合智能 | 人智交互的神经人因学方法
  • 【ARM 嵌入式 编译系列 7.5 -- GCC 打印链接脚本各段使用信息】
  • Java面试复习:基础、并发、JVM及框架核心考点解析
  • AI辅助编程工具技术评估(2025年):CodeBuddy在开发者生态中的差异化优势分析
  • 【达梦数据库】忘记SYSDBA密码处理方法-已适配
  • 图像处理基础篇
  • 麒麟系统上设置Firefox自动化测试环境:指定Marionette端口号
  • 纯血HarmonyOS5 打造小游戏实践:扫雷(附源文件)
  • 电脑的虚拟内存对性能影响大吗
  • 深入理解JavaScript设计模式之迭代器模式