当前位置: 首页 > news >正文

详解flink sql, calcite logical转flink logical

文章目录

    • 背景
    • 示例
    • FlinkLogicalCalcConverter
    • BatchPhysicalCalcRule
    • StreamPhysicalCalcRule
    • 其它算子
      • FlinkLogicalAggregate
      • FlinkLogicalCorrelate
      • FlinkLogicalDataStreamTableScan
      • FlinkLogicalDistribution
      • FlinkLogicalExpand
      • FlinkLogicalIntermediateTableScan
      • FlinkLogicalIntersect
      • FlinkLogicalJoin
      • FlinkLogicalLegacySink
      • FlinkLogicalLegacyTableSourceScan
      • FlinkLogicalMatch
      • FlinkLogicalMinus
      • FlinkLogicalOverAggregate
      • FlinkLogicalRank
      • FlinkLogicalSink
      • FlinkLogicalSnapshot
      • FlinkLogicalSort
      • FlinkLogicalUnion
      • FlinkLogicalValues

背景

本文主要介绍calcite 如何转成自定义的relnode

在这里插入图片描述

示例

FlinkLogicalCalcConverter

检查是不是calcite 的LogicalCalc 算子,是的话,重写带RelTrait 为FlinkConventions.LOGICA
的rel,类型FlinkLogicalCalc

private class FlinkLogicalCalcConverter(config: Config) extends ConverterRule(config) {override def convert(rel: RelNode): RelNode = {val calc = rel.asInstanceOf[LogicalCalc]val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.LOGICAL)FlinkLogicalCalc.create(newInput, calc.getProgram)}
}

BatchPhysicalCalcRule

检查是不是FlinkLogicalCalc 的relnode

class BatchPhysicalCalcRule(config: Config) extends ConverterRule(config) {override def matches(call: RelOptRuleCall): Boolean = {val calc: FlinkLogicalCalc = call.rel(0)val program = calc.getProgram!program.getExprList.asScala.exists(containsPythonCall(_))}def convert(rel: RelNode): RelNode = {val calc = rel.asInstanceOf[FlinkLogicalCalc]val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.BATCH_PHYSICAL)new BatchPhysicalCalc(rel.getCluster, newTrait, newInput, calc.getProgram, rel.getRowType)}
}

StreamPhysicalCalcRule

检查是不是FlinkLogicalCalc 的relnode

class StreamPhysicalCalcRule(config: Config) extends ConverterRule(config) {override def matches(call: RelOptRuleCall): Boolean = {val calc: FlinkLogicalCalc = call.rel(0)val program = calc.getProgram!program.getExprList.asScala.exists(containsPythonCall(_))}def convert(rel: RelNode): RelNode = {val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.STREAM_PHYSICAL)new StreamPhysicalCalc(rel.getCluster, traitSet, newInput, calc.getProgram, rel.getRowType)}
}

其它算子

介绍下算子的匹配条件

FlinkLogicalAggregate

对应的SQL语义是聚合函数
FlinkLogicalAggregateBatchConverter
不存在准确的distinct调用并且支持聚合函数,则返回true

override def matches(call: RelOptRuleCall): Boolean = {val agg = call.rel(0).asInstanceOf[LogicalAggregate]// we do not support these functions natively// they have to be converted using the FlinkAggregateReduceFunctionsRuleval supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {// we support AVGcase SqlKind.AVG => true// but none of the other AVG agg functionscase k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => falsecase _ => true}val hasAccurateDistinctCall = AggregateUtil.containsAccurateDistinctCall(agg.getAggCallList)!hasAccurateDistinctCall && supported}

FlinkLogicalAggregateStreamConverter

SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP
非这几种,都支持转换

override def matches(call: RelOptRuleCall): Boolean = {val agg = call.rel(0).asInstanceOf[LogicalAggregate]// we do not support these functions natively// they have to be converted using the FlinkAggregateReduceFunctionsRuleagg.getAggCallList.map(_.getAggregation.getKind).forall {case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => falsecase _ => true}}

FlinkLogicalCorrelate

对应的SQL语义是,LogicalCorrelate 用于处理关联子查询和某些特殊的连接操作
检查relnode 是不是LogicalCorrelate,重写relnode

默认的onMatch 函数

FlinkLogicalDataStreamTableScan

对应的SQL语义是,检查数据源是不是流式的
检查relnode 是不是LogicalCorrelate,重写relnode

  override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[_]])dataStreamTable != null}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]FlinkLogicalDataStreamTableScan.create(rel.getCluster, scan.getHints, scan.getTable)}

FlinkLogicalDistribution

描述数据是不是打散的

  override def convert(rel: RelNode): RelNode = {val distribution = rel.asInstanceOf[LogicalDistribution]val newInput = RelOptRule.convert(distribution.getInput, FlinkConventions.LOGICAL)FlinkLogicalDistribution.create(newInput, distribution.getCollation, distribution.getDistKeys)}

FlinkLogicalExpand

支持复杂聚合操作(如 ROLLUP 和 CUBE)的逻辑运算符

 override def convert(rel: RelNode): RelNode = {val expand = rel.asInstanceOf[LogicalExpand]val newInput = RelOptRule.convert(expand.getInput, FlinkConventions.LOGICAL)FlinkLogicalExpand.create(newInput, expand.projects, expand.expandIdIndex)}

FlinkLogicalIntermediateTableScan

FlinkLogicalIntermediateTableScan 用于表示对这些中间结果表进行扫描的逻辑操作

override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)val intermediateTable = scan.getTable.unwrap(classOf[IntermediateRelTable])intermediateTable != null}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]FlinkLogicalIntermediateTableScan.create(rel.getCluster, scan.getTable)}

FlinkLogicalIntersect

用于表示 SQL 中 INTERSECT 操作的逻辑运算符

override def convert(rel: RelNode): RelNode = {val intersect = rel.asInstanceOf[LogicalIntersect]val newInputs = intersect.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalIntersect.create(newInputs, intersect.all)}

FlinkLogicalJoin

用于表示 SQL 中 JOIN 操作的逻辑运算符

 override def convert(rel: RelNode): RelNode = {val join = rel.asInstanceOf[LogicalJoin]val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)FlinkLogicalJoin.create(newLeft, newRight, join.getCondition, join.getHints, join.getJoinType)}

FlinkLogicalLegacySink

写数据到传统的数据源

override def convert(rel: RelNode): RelNode = {val sink = rel.asInstanceOf[LogicalLegacySink]val newInput = RelOptRule.convert(sink.getInput, FlinkConventions.LOGICAL)FlinkLogicalLegacySink.create(newInput,sink.hints,sink.sink,sink.sinkName,sink.catalogTable,sink.staticPartitions)}

FlinkLogicalLegacyTableSourceScan

读传统的数据源

override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)isTableSourceScan(scan)}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]val table = scan.getTable.asInstanceOf[FlinkPreparingTableBase]FlinkLogicalLegacyTableSourceScan.create(rel.getCluster, scan.getHints, table)}

FlinkLogicalMatch

MATCH_RECOGNIZE 语句的逻辑运算符。MATCH_RECOGNIZE 语句允许用户在流数据中进行复杂的事件模式匹配,这对于实时数据处理和复杂事件处理(CEP)非常有用。

override def convert(rel: RelNode): RelNode = {val logicalMatch = rel.asInstanceOf[LogicalMatch]val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)val newInput = RelOptRule.convert(logicalMatch.getInput, FlinkConventions.LOGICAL)new FlinkLogicalMatch(rel.getCluster,traitSet,newInput,logicalMatch.getRowType,logicalMatch.getPattern,logicalMatch.isStrictStart,logicalMatch.isStrictEnd,logicalMatch.getPatternDefinitions,logicalMatch.getMeasures,logicalMatch.getAfter,logicalMatch.getSubsets,logicalMatch.isAllRows,logicalMatch.getPartitionKeys,logicalMatch.getOrderKeys,logicalMatch.getInterval)}

FlinkLogicalMinus

用于表示 SQL 中 minus 操作的逻辑运算符

 override def convert(rel: RelNode): RelNode = {val minus = rel.asInstanceOf[LogicalMinus]val newInputs = minus.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalMinus.create(newInputs, minus.all)}

FlinkLogicalOverAggregate

用于表示 SQL 中 窗口函数操作的逻辑运算符

FlinkLogicalRank

SQL 中 RANK 或 DENSE_RANK 函数的逻辑运算符。这些函数通常用于对数据进行排序和排名

override def convert(rel: RelNode): RelNode = {val rank = rel.asInstanceOf[LogicalRank]val newInput = RelOptRule.convert(rank.getInput, FlinkConventions.LOGICAL)FlinkLogicalRank.create(newInput,rank.partitionKey,rank.orderKey,rank.rankType,rank.rankRange,rank.rankNumberType,rank.outputRankNumber)}

FlinkLogicalSink

表示SQL里的写

FlinkLogicalSnapshot

SQL 语句中的 AS OF 子句的逻辑运算符。AS OF 子句用于对流数据进行快照操作,从而在处理数据时可以引用特定时间点的数据快照

def convert(rel: RelNode): RelNode = {val snapshot = rel.asInstanceOf[LogicalSnapshot]val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)snapshot.getPeriod match {case _: RexFieldAccess =>FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)case _: RexLiteral =>newInput}}

FlinkLogicalSort

表示SQL里的排序

FlinkLogicalUnion

表示SQL里的union 操作

 override def matches(call: RelOptRuleCall): Boolean = {val union: LogicalUnion = call.rel(0)union.all}override def convert(rel: RelNode): RelNode = {val union = rel.asInstanceOf[LogicalUnion]val newInputs = union.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalUnion.create(newInputs, union.all)}

FlinkLogicalValues

SQL 中 VALUES 表达式的逻辑运算符。VALUES 表达式允许在查询中直接定义一组值,这在需要构造临时数据或进行简单的数据输入时非常有用。

http://www.lryc.cn/news/391275.html

相关文章:

  • PostgreSQL的系统视图pg_statio_all_indexes
  • 【C++ Primer Plus学习记录】函数和C-风格字符串
  • 力扣双指针算法题目:移动零
  • day60---面试专题(微服务面试题-参考回答)
  • laravel+phpoffice+easyexcel实现导入
  • Spring Boot集成多数据源的最佳实践
  • Java项目:基于SSM框架实现的班主任助理管理系统【ssm+B/S架构+源码+数据库+开题报告+毕业论文】
  • 数据在内存中的存储方式
  • Selenium 监视数据收发
  • 基于 STM32 的智能睡眠呼吸监测系统设计
  • Spring的事务管理、AOP实现底层
  • 基于SpringBoot的篮球竞赛预约平台
  • 学生用小台灯什么牌子的好?列举出几款学生用台灯推荐
  • 软件测试面试题:项目中的MQ是如何测试的?
  • Python爬取国家医保平台公开数据
  • B站大课堂-自动化精品视频(个人存档)
  • C++_STL---priority_queue
  • 可移动天线辅助宽带通信的性能分析和优化
  • h5兼容table ,如何实现h5在app内使用h5渲染table表格而且实现横屏预览?
  • 在windows上安装objection
  • 人脸特征68点识别 C++
  • 部署LVS-DR 群集
  • nginx的正向代理和反向代理
  • 米国政府呼吁抛弃 C 和 C++
  • failed to lazily initialize a collection of role,解决Hibernate查询报错
  • Promethuse-监控 Etcd
  • linux桌面运维---第四天
  • 视频网关的作用
  • css+js实现导航栏色块跟随滑动+点击后增加样式
  • AudioLM音频生成模型:技术革新与应用前景