当前位置: 首页 > news >正文

Flink Stream API 源码走读 - print()

概述

本文深入分析了 Flink 中 print() 方法的源码实现,展示了 Sink 操作的完整流程,并通过调试验证了整个 Transformation 链条的构建过程。这是 Flink Stream API 系列课程的重要一环,帮助我们理解流处理 Pipeline 的终端操作机制。

1. print() 方法概览

1.1 在 WordCount 示例中的使用

// 数据处理流水线
DataStream<Tuple2<String, Integer>> wordCounts = text.map(value -> value).flatMap(new Splitter())  // 分词.keyBy(value -> value.f0)  // 按单词分组.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))  // 5秒滚动窗口.sum(1);  // 对计数字段求和// 打印结果 - 这里调用了 print()
wordCounts.print();

1.2 print() 方法的作用

DataStream
print()
DataStreamSink
终端操作
不可继续链式调用

核心特点:

  • 终端操作 - 标志着流处理 Pipeline 的结束
  • 返回类型变化 - 从 DataStream 变为 DataStreamSink
  • 断开链式调用 - 不能再调用 map、filter 等转换操作

2. print() 方法源码深度分析

2.1 DataStream.print() 入口方法

// DataStream.java 中的实现
@PublicEvolving
public DataStreamSink<T> print() {// 创建打印输出函数PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();return addSink(printFunction).name("Print to Std. Out");
}

执行流程:

  1. 创建 PrintSinkFunction 实例
  2. 调用 addSink() 方法
  3. 设置算子名称为 “Print to Std. Out”
  4. 返回 DataStreamSink 对象

2.2 PrintSinkFunction 业务逻辑分析

@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN>implements SupportsConcurrentExecutionAttempts {private final PrintSinkOutputWriter<IN> writer;public PrintSinkFunction() {writer = new PrintSinkOutputWriter<>(false);  // 输出到 stdout}@Overridepublic void invoke(IN record) {writer.write(record);  // 实际的打印逻辑}
}

关键组件说明:

  • RichSinkFunction - 提供丰富的生命周期方法
  • PrintSinkOutputWriter - 负责具体的输出格式化和写入
  • invoke() - 每条数据都会调用此方法进行处理

3. addSink() 方法核心流程

3.1 addSink 方法源码分析

// DataStream.java
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {// 1. 读取输出类型,检查类型信息transformation.getOutputType();// 2. 配置类型(如果需要)if (sinkFunction instanceof InputTypeConfigurable) {((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());}// 3. 调用静态工厂方法return DataStreamSink.forSinkFunction(this, clean(sinkFunction));
}

addSink 执行步骤:

  1. 类型检查 - 确保类型信息正确
  2. 类型配置 - 为支持类型配置的 SinkFunction 设置输入类型
  3. 函数清理 - 通过 clean() 方法处理闭包和序列化
  4. 委托创建 - 调用 DataStreamSink.forSinkFunction() 静态方法

3.2 DataStreamSink.forSinkFunction() 详解

// DataStreamSink.java
static <T> DataStreamSink<T> forSinkFunction(DataStream<T> inputStream, SinkFunction<T> sinkFunction) {// 1. Function → Operator:将 SinkFunction 包装成 StreamSink 命名不好,再次吐槽为啥不叫StreamSinkOperatorStreamSink<T> sinkOperator = new StreamSink<>(sinkFunction);final StreamExecutionEnvironment executionEnvironment =inputStream.getExecutionEnvironment();// 2. Operator → Transformation:创建 LegacySinkTransformationPhysicalTransformation<T> transformation =new LegacySinkTransformation<>(inputStream.getTransformation(),    // 上游 transformation"Unnamed",                         // 算子名称sinkOperator,                      // Sink 算子executionEnvironment.getParallelism(),  // 并行度false);                            // 并行度是否已配置// 3. 添加到执行环境executionEnvironment.addOperator(transformation);// 4. 创建 DataStreamSinkreturn new DataStreamSink<>(transformation);
}

3.3 分层抽象设计

用户 API 层
Function 层
Operator 层
Transformation 层
DataStream 层
Environment 管理层
print()
PrintSinkFunction
StreamSink
LegacySinkTransformation
DataStreamSink
Environment.transformations

转换层次详解:

  1. Function 层 - 用户定义的业务逻辑(PrintSinkFunction)
  2. Operator 层 - Flink 内部算子封装(StreamSink)
  3. Transformation 层 - 执行图节点(LegacySinkTransformation)
  4. DataStream 层 - 流式 API 封装(DataStreamSink)
  5. Environment 层 - 全局管理和优化

3.4 StreamSink 详情

public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>implements OneInputStreamOperator<IN, Object> {public StreamSink(SinkFunction<IN> sinkFunction) {super(sinkFunction);chainingStrategy = ChainingStrategy.ALWAYS;  // 总是可以链接}
}

StreamSink 的核心特性:

  • 继承 AbstractUdfStreamOperator - 获得用户函数管理能力
  • 实现 OneInputStreamOperator - 单输入流算子接口
  • ChainingStrategy.ALWAYS - 总是可以与上游算子链接优化

3.4 DataStreamSink 的结构分析

@Public
public class DataStreamSink<T> {private final PhysicalTransformation<T> transformation;protected DataStreamSink(PhysicalTransformation<T> transformation) {this.transformation = checkNotNull(transformation);}// 注意:没有继承 DataStream,没有 map、filter 等方法
}

DataStreamSink 的设计特点:

  • 不继承 DataStream - 有意断开链式调用链
  • 只持有 Transformation - 极简设计,表示流的终止
  • 终端节点 - 标志 Pipeline 的结束点
  • 不可扩展 - 防止在终端节点后继续添加操作

3.5 print() 方法完整时序图

在这里插入图片描述

时序图关键步骤说明:

  1. Function 创建 - 实例化 PrintSinkFunction,内部创建 PrintSinkOutputWriter
  2. 类型检查 - 验证输出类型信息,确保类型安全
  3. 函数清理 - 通过 ClosureCleaner 处理闭包和序列化问题
  4. 分层转换 - Function → Operator → Transformation 的逐层包装
  5. 环境注册 - 将 Transformation 添加到执行环境的全局列表
  6. API 封装 - 创建 DataStreamSink 作为用户 API 的返回值

4. Transformation 拓展

4.1 Environment 中的 Transformation 管理

// StreamExecutionEnvironment 中的核心管理
private final List<Transformation<?>> transformations = new ArrayList<>();public void addOperator(Transformation<?> transformation) {// 只有物理 Transformation 才会被添加transformations.add(transformation);
}

4.2 Environment 添加规则分析

重要发现:Environment 中只有 4个 Transformation(不是6个)

IDTransformation类型算子名称物理/虚拟添加到Environment
1LegacySourceTransformationsocketTextStream物理❌ 特殊处理
2OneInputTransformationmap物理
3OneInputTransformationflatMap物理
4PartitionTransformationkeyBy虚拟❌ 虚拟节点
5OneInputTransformationwindow.sum物理
6LegacySinkTransformationprint物理

核心规律:

  • 物理 Transformation - 代表真实的计算操作,添加到 Environment
  • 虚拟 Transformation - 仅用于逻辑表示和优化,不添加到 Environment
  • Source Transformation - 特殊的物理节点,但不添加到 Environment(特殊处理)

4.3 链式引用的数据结构

// 每个 Transformation 都持有上游的引用
public abstract class Transformation<T> {// 大部分 Transformation 都有 input 字段
}// 示例:OneInputTransformation
public class OneInputTransformation<IN, OUT> extends PhysicalTransformation<OUT> {private final Transformation<IN> input;  // 指向上游
}// 示例:LegacySinkTransformation
public class LegacySinkTransformation<T> extends PhysicalTransformation<T> {private final Transformation<T> input;  // 指向上游
}

4.4 完整的链式引用追溯

在这里插入图片描述

4.5 链式引用的核心价值

通过最后一个 Transformation 获取完整执行图:

// 从 DataStreamSink 开始追溯
DataStreamSink<String> sink = wordCounts.print();
LegacySinkTransformation sinkTransformation = sink.getTransformation();// 递归追溯整个链条
Transformation current = sinkTransformation;
while (current != null) {System.out.println("Transformation: " + current.getName());current = current.getInput();  // 获取上游
}

链式引用的优势:

  • 完整性 - 通过最后一个节点可以追溯到整个执行图
  • 简洁性 - 每个节点只需保存直接上游的引用
  • 灵活性 - 支持复杂的 DAG 结构(多输入、分支等)
  • 优化友好 - 便于执行计划的分析和优化

返回目录

Flink 源码系列 - 前言

http://www.lryc.cn/news/623062.html

相关文章:

  • B3865 [GESP202309 二级] 小杨的 X 字矩阵(举一反三)
  • 矩阵链相乘的最少乘法次数(动态规划解法)
  • 深入了解 swap:作用、局限与分区建立
  • Hadoop面试题及详细答案 110题 (16-35)-- HDFS核心原理与操作
  • 鸿蒙应用开发和Vue网页开发中生命周期的区别
  • (论文速读)ViDAR:视觉自动驾驶预训练框架
  • leetcode hot100数组:缺失的第一个正数
  • Winsows系统去除右键文件显示的快捷列表
  • Win11家庭版docker安装Minio
  • windows环境下使用vscode以及相关插件搭建c/c++的编译,调试环境
  • 93、23种设计模式之抽象工厂模式
  • MySQL建表练习
  • GaussDB 数据库架构师修炼(十三)安全管理(3)-数据库审计
  • 人工智能中的(特征选择)数据过滤方法和包裹方法
  • Linux 下 安装 matlab 2025A
  • 安卓11 12系统修改定制化_____修改系统 解锁system分区 去除data加密 自由删减系统应用
  • python线程学习
  • Leetcode 14 java
  • AI 云电竞游戏盒子:从“盒子”到“云-端-芯”一体化竞技平台的架构实践
  • WSL 配置文件 wsl.conf 设置
  • Windows 基于ACL(访问控制列表)的权限管理
  • LeetCode 55.跳跃游戏:贪心策略下的可达性判断
  • Windows 操作系统 - Windows 恢复浏览器标题栏颜色
  • tensorrt-llm0.20.0:Prometheus3.5.0通过间接采集,进行性能指标分析
  • AirReceiverLite:轻松实现手机隔空投屏
  • 自动驾驶中的传感器技术24.1——Camera(16)
  • 电路方案分析(二十二)适用于音频应用的25-50W反激电源方案
  • 40 C++ STL模板库9-容器2-vector
  • 下载数据集文件夹权限错误问题解决方案
  • PHP域名授权系统网站源码/授权管理工单系统/精美UI/附教程