当前位置: 首页 > news >正文

探秘Flink维表:从源码到运行时的深度解析

在大数据实时处理领域,Flink凭借其强大的流处理能力占据重要地位。在日常开发中,我们常常面临这样的需求:以原始流数据为基础,关联第三方存储引擎来补充数据属性。在传统OLAP或OLTP系统中,多表关联的原理易于理解,例如Apache Doris可将一张表加载到内存中完成数据关联 。然而,在流表上实现与原始流数据的关联操作,其原理却并不容易掌握。多数时候,开发者仅停留在使用层面,面对任务优化往往无从下手。本文将以JDBC维表关联为例,深入剖析Flink维表的实现原理,从执行计划到运行时层面,揭开其神秘面纱。

一、核心概念:理解Flink维表的基石

在深入探究Flink维表实现原理之前,我们需要先明晰几个核心概念,为后续的知识理解奠定基础。

1.1 维表

维表是数据仓库中的重要概念,其维度属性为数据观察提供了不同视角。在离线数仓建设中,通常将维表与事实表关联构建星型模型。在实时数仓领域,同样存在维表与事实表。其中,事实表常存储于Kafka等消息队列,而维表一般存储在MySQL、HBase等外部存储设备 。对于每条流式数据,可关联外部维表数据源,实现实时计算中的数据关联查询。值得注意的是,维表数据可能动态变化,在进行维表JOIN操作时,需明确记录关联维表快照的时刻。目前,Flink SQL的维表JOIN仅支持处理时间语义下当前时刻的维表快照关联,暂不支持基于事件时间语义的关联。

1.2 流表

传统SQL和关系代数主要针对有界数据设计,若直接应用于流计算会面临诸多问题。为解决这一难题,Flink引入动态流表概念,将无界数据流表示为随时间持续写入数据的表。在Flink体系中,“流”对应DataStream API概念,“动态流表”属于Flink SQL范畴,本质上二者都是无界数据集的不同表示形式。

1.3 异步I/O

在流处理应用中,与外部系统交互(如使用数据库数据扩充流数据)时,通信延迟会对整体性能产生显著影响。常规的同步交互方式,例如使用MapFunction访问外部数据库,函数会在发送请求后一直等待响应,大量时间消耗在等待过程中。而异步交互允许并行函数实例并发处理多个请求与接收响应,将等待时间分摊到多个请求,从而大幅提升流处理的吞吐量。

二、JDBC维表实现:从选择到实践

2.1 选择JDBC维表的原因

Flink官方提供了多种支持维表功能的connector,如jdbc、hive、hbase等。由于本地环境仅安装了MySQL,未部署hive、hbase等第三方存储引擎,因此选择JDBC实现的维表功能作为分析切入点。尽管不同connector的具体实现存在差异,但核心原理具有相似性,官方提供的connector地址可查阅更多相关信息。

2.2 前期准备:搭建测试环境

为了更直观地理解JDBC维表的实现原理,通过搭建一个测试任务来调试运行流程。该测试任务以Kafka作为数据源,MySQL作为维表和结果表存储,具体步骤如下:

  1. 引入依赖
    在项目中引入相关依赖,若为普通项目运行测试,需添加以下依赖及打包插件:
<!-- flink-connector-jdbc_2.12 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.13.6</version><scope>provided</scope>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.13.6</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.13.6</version><scope>test</scope>
</dependency>
<!-- mysql-connector-java -->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.22</version>
</dependency>
<plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><executions><execution><id>shade</id><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers combine.children="append"><!-- The service transformer is needed to merge META-INF/services files --><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><!-- ... --></transformers></configuration></execution></executions></plugin>
</plugins>

若在源码中进行调试,则在flink-examples-table模块的pom文件中引入以下依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>1.14-SNAPSHOT</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.14-SNAPSHOT</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.14-SNAPSHOT</version>
</dependency>
  1. 测试任务代码实现
public class LookupFunctionExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); // source only supports parallelism of 1final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);String kafkaSource = "CREATE TABLE kafka_source (\n"+ "  -- declare the schema of the table\n"+ "  `id` INT,\n"+ "  `name` STRING,\n"+ "  `sex` STRING,\n"+ "  `age` INT,   \n"+ "  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',\n"+ "  `proctime` AS PROCTIME(),    -- use a computed column to define a proctime attribute\n"+ "  WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND    -- use a WATERMARK statement to define a ts attribute\n"+ ") WITH (\n"+ "  -- declare the external system to connect to\n"+ "  'connector' = 'kafka',\n"+ "  'topic' = 'lookuptest',\n"+ "  'scan.startup.mode' = 'latest-offset',\n"+ "  'properties.bootstrap.servers' = 'localhost:9092',\n"+ "  'format' = 'json'   -- declare a format for this system\n"+ ")";String lookupSql = "CREATE TABLE lookup_tbl (\n"+ "  id BIGINT,\n"+ "  name STRING,\n"+ "  PRIMARY KEY (id) NOT ENFORCED\n"+ ") WITH (\n"+ "   'connector' = 'jdbc',\n"+ "   'url' = 'jdbc:mysql://localhost:3306/flink_test',\n"+ "   'username'='root',\n"+ "   'password'='123456789',\n"+ "   'table-name' = 'flink_lookup'\n"+ ")";String sinkSql = "CREATE TABLE sink_tbl (\n"+ "  `id` INT,\n"+ "  `name` STRING,\n"+ "  `sex` STRING,\n"+ "  `age` INT,   \n"+ "  PRIMARY KEY (id) NOT ENFORCED\n"+ ") WITH (\n"+ "   'connector' = 'jdbc',\n"+ "   'url' = 'jdbc:mysql://localhost:3306/flink_test',\n"+ "   'username'='root',\n"+ "   'password'='123456789',\n"+ "   'table-name' = 'flink_test'\n"+ ")";String insertSql = "INSERT INTO sink_tbl SELECT a.id, b.name, a.age, a.sex FROM kafka_source a\n"+ "LEFT JOIN lookup_tbl FOR SYSTEM_TIME AS OF a.proctime AS  b \n"+ "ON a.id = b.id";tEnv.executeSql(kafkaSource);tEnv.executeSql(lookupSql);tEnv.executeSql(sinkSql);tEnv.executeSql(insertSql);env.execute("LookupFunction Test");}
}

该代码通过Flink的Table API定义了Kafka数据源表、JDBC维表和JDBC结果表,并执行了从Kafka源表与JDBC维表的LEFT JOIN操作,将结果写入JDBC结果表。

三、JDBC维表源码剖析:核心功能的实现细节

在Flink的SQL Connector中,Source Function不仅负责源表数据读取,还承担维表功能。其入口为org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory,该类中定义了源表和目标表的创建方法,分别为createDynamicTableSourcecreateDynamicTableSink。其中,JdbcDynamicTableSource实现了ScanTableSource接口的源表功能和LookupTableSource接口的维表功能,在运行阶段,这两个接口会被转换为TableFunction执行,维表在运行时通过CommonExecLookupJoin进行算子转换。

在这里插入图片描述

JdbcDynamicTableSourcegetLookupRuntimeProvider方法是实现维表功能的关键,部分代码如下:

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {// JDBC只支持非嵌套查找键String[] keyNames = new String[context.getKeys().length];for (int i = 0; i < keyNames.length; i++) {int[] innerKeyArr = context.getKeys()[i];Preconditions.checkArgument(innerKeyArr.length == 1, "JDBC only support non-nested look up keys");keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]];}final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();return TableFunctionProvider.of(new JdbcRowDataLookupFunction(options,lookupOptions,physicalSchema.getFieldNames(),physicalSchema.getFieldDataTypes(),keyNames,rowType));
}

从上述代码可知,维表的数据读取逻辑主要在JdbcRowDataLookupFunction中实现,其核心方法为openevalopen方法主要用于确保维表连接正常,并根据配置初始化缓存对象:

public void open(FunctionContext context) throws Exception {try {// 确保维表能正常连接establishConnectionAndStatement();// 初始化缓存,设置缓存最大值、过期时间this.cache =cacheMaxSize == -1 || cacheExpireMs == -1? null: CacheBuilder.newBuilder().expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(cacheMaxSize).build();} catch (SQLException sqe) {throw new IllegalArgumentException("open() failed.", sqe);} catch (ClassNotFoundException cnfe) {throw new IllegalArgumentException("JDBC driver class not found.", cnfe);}
}

eval方法负责实际的数据读取操作,由于代码较长,核心逻辑如下:

public void eval(Object... keys) {RowData keyRow = GenericRowData.of(keys);if (cache != null) {List<RowData> cachedRows = cache.getIfPresent(keyRow);if (cachedRows != null) {for (RowData cachedRow : cachedRows) {collect(cachedRow);}return;}}for (int retry = 0; retry <= maxRetryTimes; retry++) {statement.clearParameters();statement = lookupKeyRowConverter.toExternal(keyRow, statement);try (ResultSet resultSet = statement.executeQuery()) {if (cache == null) {while (resultSet.next()) {collect(jdbcRowConverter.toInternal(resultSet));}} else {ArrayList<RowData> rows = new ArrayList<>();while (resultSet.next()) {RowData row = jdbcRowConverter.toInternal(resultSet);rows.add(row);collect(row);}rows.trimToSize();cache.put(keyRow, rows);}}break;}
}

eval方法首先检查缓存,若缓存中有数据则直接发送到下游算子;若无缓存数据,则根据查询条件从数据库获取数据,并在缓存存在时将数据存入缓存。

四、维表运行时剖析:从代码到执行的全流程追踪

通过对JDBC维表源码的分析,我们知道数据读取在eval方法中完成。为进一步了解维表在运行时的调用过程,我们从Flink SQL的运行流程入手。Flink SQL的执行需经过词法解析、语法解析、抽象语法转换、优化器、物理执行计划等环节,我们在TableEnvironmentImpl#executeInternal方法处设置断点进行调试。
在这里插入图片描述

调试发现,运行时的核心类为LookupJoinRunner,其processElement方法实现如下:

public void processElement(RowData in, Context ctx, Collector<RowData> out) throws Exception {collector.setCollector(out);collector.setInput(in);collector.reset();// 当对象重用被启用时,Fetcher已经复制了输入字段fetcher.flatMap(in, getFetcherCollector());if (isLeftOuterJoin && !collector.isCollected()) {outRow.replace(in, nullRow);outRow.setRowKind(in.getRowKind());out.collect(outRow);}
}

数据拉取操作主要在fetcher.flatMap方法中,fetcher对象在open阶段实例化。进一步深入调试,获取userFunctiongenerateFetchergeneratedCollector的核心代码:
在这里插入图片描述

4.1 generateFetcher#flatMap

public class LookupFunction$10extends org.apache.flink.api.common.functions.RichFlatMapFunction {@Overridepublic void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;int field$6;boolean isNull$6;isNull$6 = in1.isNullAt(0);field$6 = -1;if (!isNull$6) {field$6 = in1.getInt(0);}resultConverterCollector$9.setCollector(c);if (isNull$6) {// skip} else {// 去调用维表中的evalfunction_org$apache$flink$connector$jdbc$table$JdbcRowDataLookupFunction$096de1ff849635b06dca993bab61661b.eval(isNull$6 ?// 因为关联条件是主键id 整数类型,在此处进行强转null : ((java.lang.Integer) field$6));}}
}

4.2 generatedCollector#collect

public class JoinTableFuncCollector$14 extends org.apache.flink.table.runtime.collector.TableFunctionCollector {@Overridepublic void collect(Object record) throws Exception {org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) getInput();org.apache.flink.table.data.RowData in2 = (org.apache.flink.table.data.RowData) record;int field$11;boolean isNull$11;org.apache.flink.table.data.binary.BinaryStringData field$12;boolean isNull$12;isNull$12 = in2.isNullAt(1);field$12 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;if (!isNull$12) {field$12 = ((org.apache.flink.table.data.binary.BinaryStringData) in2.getString(1));}isNull$11 = in2.isNullAt(0);field$11 = -1;if (!isNull$11) {field$11 = in2.getInt(0);}if (isNull$11) {out.setField(0, null);} else {out.setField(0, field$11);}if (isNull$12) {out.setField(1, null);} else {out.setField(1, field$12);}// 处理两个流的数据进行join,并将数据结果,继续发送到下游joinedRow$13.replace(in1, out);joinedRow$13.setRowKind(in1.getRowKind());outputResult(joinedRow$13);}
}

以上为整个维表的实现流程剖析

http://www.lryc.cn/news/574022.html

相关文章:

  • Java面试复习指南:并发编程、JVM、Spring框架、数据结构与算法、Java 8新特性
  • 人机融合智能 | 人智交互的神经人因学方法
  • 【ARM 嵌入式 编译系列 7.5 -- GCC 打印链接脚本各段使用信息】
  • Java面试复习:基础、并发、JVM及框架核心考点解析
  • AI辅助编程工具技术评估(2025年):CodeBuddy在开发者生态中的差异化优势分析
  • 【达梦数据库】忘记SYSDBA密码处理方法-已适配
  • 图像处理基础篇
  • 麒麟系统上设置Firefox自动化测试环境:指定Marionette端口号
  • 纯血HarmonyOS5 打造小游戏实践:扫雷(附源文件)
  • 电脑的虚拟内存对性能影响大吗
  • 深入理解JavaScript设计模式之迭代器模式
  • Docker部署prometheus+grafana+...
  • 【论文阅读35】-PINN review(2021)
  • 华为云 Flexus+DeepSeek 征文|增值税发票智能提取小工具:基于大模型的自动化信息解析实践
  • 虚拟 DOM 与 Diff 算法:现代前端框架的核心机制
  • [3-01-02].第15节:调优工具 - 查看 SQL 执行成本
  • 编程捏脸系统:从美术资源到实时变形的深度实现
  • 系统规划与管理师(第2版)第9章思维导图发布
  • STM32HAL库 -- 9.IIC通信 软件IIC与硬件IIC驱动0.96寸OLED屏幕
  • 【Linux指南】文件管理高级操作(复制、移动、查找)
  • GO 语言学习 之 代码风格
  • 时序数据库IoTDB数据导入与查询功能详解
  • 「ECG信号处理——(18)基于时空特征的心率变异性分析」2025年6月23日
  • IDEA中如何为 Spring Boot 项目添加 VM 参数?
  • 微服务架构下的分布式事务管理
  • CSS 中aspect - ratio属性的用途及应用
  • 【面板数据】上市公司投资者保护指数(2010-2023年)
  • 兆瓦闪充技术革命:解码新能源汽车补能赛道的技术跃迁与从业机会图谱
  • LNMP 一键部署脚本 shell脚本
  • Postgresql中不同数据类型的长度限制