Flink SQL执行流程深度剖析:从SQL语句到分布式执行
在大数据处理领域,Flink SQL凭借其强大的处理能力和易用性,成为众多开发者的选择。与其他OLAP引擎类似,Flink SQL的SQL执行流程大致都需要经过词法解析、语法解析、生成抽象语法树(AST)、校验以及生成逻辑执行计划等步骤。整体流程可笼统地概括为两大阶段:从SQL到Operation的转换,再从Operation到Transformation的转换,最终进入分布式执行阶段。接下来,我们将以INSERT INTO语句为例,深入剖析Flink SQL的执行流程,探究其内部数据的流转机制以及如何基于Calcite进行功能拓展。
一、SQL到Operation转化:解析与转换的起点
Flink SQL的所有SQL执行入口为TableEnvironment,它提供了DML(数据操作语言)、DDL(数据定义语言)、DQL(数据查询语言)等功能,像executeSql、sqlQuery、registerFunction等方法都是其能力的体现,在日常开发中,executeSql方法的使用频率较高。
以INSERT INTO语句为例,探究Flink SQL到Operation的转换过程。这一过程首先借助Planner提供的基于Calcite的SQL解析器,将SQL字符串转换为Operation。具体而言,通过ParserImpl#parse()方法把字符串转化为ModifyOperation树,并生成新的TableResult对象,其核心步骤如下:
- 将SQL解析字符串转为SqlNode:利用Calcite的强大解析能力,对输入的SQL字符串进行词法和语法分析,生成SqlNode。这是整个转换过程的基础,只有准确生成SqlNode,后续操作才能顺利进行 。
- 将SqlNode校验:对生成的SqlNode进行严格校验,确保其符合语法和语义规则,为后续转换提供可靠保障。
- 通过SqlToOperationConverter#convert()将SqlNode转为ModifyOperation:依据SQL语句的类型和语义,将经过校验的SqlNode转换为对应的Operation,如INSERT INTO语句会被转换为ModifyOperation。
以INSERT INTO执行语句为例,其执行入口位于TableEnvironmentImpl#executeSql(),核心实现是ParserImpl#parse()方法。该方法获取基于Calcite进行语法拓展的SQL解析器,词法、语法解析以及校验工作均由Calcite完成。在使用Calcite解析之前,会优先调用ExtendedParser#parse()方法进行解析,此方法主要用于处理一些CalciteParser不支持的特殊命令,例如SET key=value中键和值标识符包含特殊字符的情况,这样可以避免引入新的保留关键字,提高解析的灵活性和兼容性。ParserImpl#parse部分实现如下:
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// 解析sql
Optional<Operation> command = EXTENDED_PARSER.parse(statement);
if (command.isPresent()) {return Collections.singletonList(command.get());
}
SqlNode parsed = parser.parse(statement);
上述代码通过Calcite#parse方法解析SQL后,成功得到SqlNode对象。而对于SqlNode的校验和转换为Operation的操作,则是通过SqlToOperationConverter#convert()方法实现。其内部借助flinkPlanner#validate对SqlNode进行校验,详细实现如下:
def validate(sqlNode: SqlNode): SqlNode = {// 获取当前作业对元数据val validator = getOrCreateSqlValidator()validateInternal(sqlNode, validator)
}
private def validateInternal(sqlNode: SqlNode, validator: FlinkCalciteSqlValidator): SqlNode = {try {sqlNode.accept(new PreValidateReWriter(validator.getCatalogReader.unwrap(classOf[CatalogReader]), typeFactory))// 进行扩展验证。sqlNode match {case node: ExtendedSqlNode =>node.validate()case _ =>}// 不需要验证DDL的行类型并插入节点。if (sqlNode.getKind.belongsTo(SqlKind.DDL)|| .......) {return sqlNode}sqlNode match {case richExplain: SqlRichExplain =>val validated = validator.validate(richExplain.getStatement)richExplain.setOperand(0, validated)richExplaincase _ =>validator.validate(sqlNode)}}
经过上述校验流程,确保SqlNode合法后,会根据其类型转换为对应的Operation。例如,INSERT INTO语句会被转换为ModifyOperation,具体转换实现如下:
public static Optional<Operation> convert(FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {// validate the queryfinal SqlNode validated = flinkPlanner.validate(sqlNode);SqlToOperationConverter converter =new SqlToOperationConverter(flinkPlanner, catalogManager);if (validated instanceof SqlUseCatalog) {return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated));} else if (validated instanceof SqlShowCatalogs) {return Optional.of(converter.convertShowCatalogs((SqlShowCatalogs) validated));} else if (validated instanceof SqlShowCurrentCatalog) {return Optional.of(converter.convertShowCurrentCatalog((SqlShowCurrentCatalog) validated));}if (validated instanceof SqlCreateDatabase) {return Optional.of(converter.convertCreateDatabase((SqlCreateDatabase) validated));} else if (validated instanceof SqlDropDatabase) {return Optional.of(converter.convertDropDatabase((SqlDropDatabase) validated));} else if (validated instanceof SqlAlterDatabase) {return Optional.of(converter.convertAlterDatabase((SqlAlterDatabase) validated));} else if (validated instanceof SqlShowDatabases) {return Optional.of(converter.convertShowDatabases((SqlShowDatabases) validated));} ..................}
至此,Flink SQL完成了从SQL字符串到Operation的转换过程,为后续的处理奠定了基础。
二、Operation到Transformation:优化与执行计划生成
在完成SQL到Operation的转换后,接下来需要将Operation转换为Transformation。仍以INSERT INTO语句为例,SQL经过转换后会返回Operation的集合,取出集合中的第一个元素传入executeInternal方法中继续执行。该方法会依据Operation的类型进行相应处理,例如CreateTableOperation、DropTableOperation等会触发对catalogManager的相关操作;而对于INSERT INTO语句对应的ModifyOperation类型,则会传入executeInternal的另一个重载方法进行处理。根据Operation类型进行相应处理的部分代码如下:
if (operation instanceof ModifyOperation) {return executeInternal(Collections.singletonList((ModifyOperation) operation));
} else if (operation instanceof CreateTableOperation) {CreateTableOperation createTableOperation = (CreateTableOperation) operation;if (createTableOperation.isTemporary()) {catalogManager.createTemporaryTable(createTableOperation.getCatalogTable(),createTableOperation.getTableIdentifier(),createTableOperation.isIgnoreIfExists());} else {catalogManager.createTable(createTableOperation.getCatalogTable(),createTableOperation.getTableIdentifier(),createTableOperation.isIgnoreIfExists());}return TableResultImpl.TABLE_RESULT_OK;
}
....................
executeInternal的另一个重载方法是这一阶段的核心,其核心代码如下:
public TableResult executeInternal(List<ModifyOperation> operations) {List<Transformation<?>> transformations = translate(operations);List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);TableResult result = executeInternal(transformations, sinkIdentifierNames);result.await();return result;
}
从上述代码可以看出,从Operation到Transformation转换流程的核心方法是translate方法。该方法借助Planner#translate完成相关功能,Planner的默认实现类为PlannerBase。translate方法主要负责将Operation集合转换为Calcite的物理执行计划RelNode,然后对RelNode进行优化,最终将其转换为Transformation,具体代码如下:
override def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {validateAndOverrideConfiguration()if (modifyOperations.isEmpty) {return List.empty[Transformation[_]]}// 借助translateToRel转换Operation为relNodeval relNodes = modifyOperations.map(translateToRel)// 优化物理执行计划val optimizedRelNodes = optimize(relNodes)// 获取优化后的执行计划转化为ExecNodeGraphval execGraph = translateToExecNodeGraph(optimizedRelNodes)// 根据作业类型,使用不同的Planner进行算子转换,如流处理则使用:StreamPlanner、批处理则使用:BatchPlannerval transformations = translateToPlan(execGraph)cleanupInternalConfigurations()transformations
}
Operation转换为RelNode的详细实现在translateToRel方法中,该方法也是获取Flink SQL血缘关系的核心实现。开发者如有相关需求,可以复写此方法,使Flink SQL具备更灵活的血缘解析功能。在RelNode的优化过程中,主要涉及CommonSubGraphBasedOptimizer、BatchCommonSubGraphBasedOptimizer、StreamCommonSubGraphBasedOptimizer这三个类,从名称即可看出,StreamCommonSubGraphBasedOptimizer负责流处理的优化,BatchCommonSubGraphBasedOptimizer则针对批处理进行优化。
RelNode经过优化器优化后,会转换为ExecNodeGraph,最后通过translateToPlan()方法将ExecNodeGraph转换为transformations流水线。至此,Flink SQL完成了从Operation到Transformation的转换,得到的transformations流水线与使用StreamExecutionEnvironment开发的方式类似,后续将依次经过流图/plan、作业图、执行图的转换,最终进入分布式执行阶段。
三、总结:深入理解Flink SQL执行精髓
通过以INSERT INTO语句为切入点,对Flink SQL执行流程的详细剖析,我们深入了解了其从SQL语句输入到分布式执行的全过程。在这个过程中,Calcite框架发挥了关键作用,Flink SQL基于Calcite进行拓展,实现了强大的SQL解析、校验和优化功能。
从SQL到Operation的转换,将用户输入的SQL语句转化为系统能够处理的操作对象;而Operation到Transformation的转换,则进一步将操作对象优化并转换为可执行的计划。这两个阶段紧密配合,确保了Flink SQL能够高效、准确地执行用户的指令。
深入理解Flink SQL的执行流程,不仅有助于开发者更好地使用Flink SQL进行大数据处理,还能在遇到性能问题或需要进行功能拓展时,快速定位问题并找到解决方案。同时,也为我们在其他类似的数据处理场景中,设计和优化执行流程提供了宝贵的经验借鉴。