当前位置: 首页 > news >正文

Flink Stream API 源码走读 - window 和 sum

本文核心观点

核心观点:WindowedStream 是一个"假流",它比 KeyedStream 更虚,只是一个 API 的过渡器,不是真正意义上的 DataStream,需要调用函数回归。

  1. 虚拟化时刻:从真实流到虚拟流
    KeyedStream<T,K> keyedStream = …; // 半虚拟流
    WindowedStream<T,K,W> windowedStream = keyedStream.window(assigner); // 完全虚拟流

  2. 回归时刻:从虚拟流回到真实流
    windowedStream.sum()
    return input.transform(opName, resultType, operator); // 回到DataStream标准流程

一、window() 方法的特殊性发现

1.1 只有 KeyedStream 才有 window 方法

//  DataStream 上没有 window 方法
DataStream<String> stream = ...;
// stream.window(assigner); // 编译错误!//  只有 KeyedStream 才有 window 方法
KeyedStream<String, String> keyedStream = stream.keyBy(...);
WindowedStream<String, String, TimeWindow> windowedStream = keyedStream.window(assigner);

为什么这样设计?

  • 窗口操作需要基于 Key 进行分组
  • 每个 Key 都有独立的窗口状态
  • 保证相同 Key 的数据进入同一个窗口实例

1.2 KeyedStream 的特殊 API 设计

public class KeyedStream<T, KEY> extends DataStream<T> {// 继承 DataStream 的所有方法:map, filter, flatMap...// KeyedStream 特有的窗口 APIpublic <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner);public WindowedStream<T, KEY, GlobalWindow> countWindow(long size);// KeyedStream 特有的聚合 APIpublic SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function);public SingleOutputStreamOperator<T> sum(int positionToSum);public SingleOutputStreamOperator<T> max(int positionToMax);// ... 其他聚合操作
}

设计理念

  • 继承性:保留 DataStream 的所有基础能力
  • 扩展性:增加基于 Key 的特殊操作
  • 状态性:支持有状态的聚合操作

二、WindowedStream 的"虚拟"本质

2.1 WindowedStream 的创建过程

// KeyedStream.java
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {return new WindowedStream<>(this, assigner);  // 仅仅是创建对象
}

关键发现:window() 方法没有创建任何 Transformation!

2.2 WindowedStream 的内部结构

public class WindowedStream<T, K, W extends Window> {// 仅有两个成员变量private final KeyedStream<T, K> input;           // 上游流的引用private final WindowOperatorBuilder<T, K, W> builder;  // 算子构建器// 注意:没有继承 DataStream!
}

2.3 WindowedStream 构造函数解析

public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;  // 保存上游流引用// 创建窗口算子构建器,用于构建窗口操作的核心组件// WindowOperatorBuilder是构建者模式的实现,负责组装窗口操作所需的各种组件this.builder = new WindowOperatorBuilder<>(// 窗口分配器:决定数据元素被分配到哪个窗口// 例如:TumblingEventTimeWindows、SlidingEventTimeWindows等windowAssigner,// 窗口触发器:决定何时触发窗口计算和输出结果// 每种窗口分配器都有其默认的触发器策略// 例如:EventTimeTrigger用于事件时间窗口,ProcessingTimeTrigger用于处理时间窗口windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),// 执行配置:包含序列化器、并行度等运行时配置信息input.getExecutionConfig(),// 输入数据类型信息:用于序列化和反序列化输入数据input.getType(),// Key选择器:从输入数据中提取分组键,确保相同key的数据进入同一个窗口实例input.getKeySelector(),// Key类型信息:用于序列化和反序列化分组键input.getKeyType());
}

重要理解

  • 构造函数只是组装配置信息,没有创建算子
  • 比 KeyedStream 更"虚",KeyedStream 好歹有个 PartitionTransformation
  • WindowedStream 什么 Transformation 都没有

2.4 WindowedStream 的"虚拟"特性

流类型虚拟化程度特性描述
DataStream🟢 真实流✅ 有 Transformation
✅ 支持链式调用
✅ 可直接执行
KeyedStream🟡 半虚拟流✅ 有 PartitionTransformation
✅ 支持链式调用
✅ 支持窗口API
⚠️ 无实际算子
WindowedStream🔴 完全虚拟流❌ 无 Transformation
❌ 断开链式调用
✅ 只支持窗口聚合API
⚠️ 纯过渡器

WindowedStream 的特殊性

  1. 不继承 DataStream - 彻底断开链式调用
  2. 纯 API 过渡器 - 只是工具类,不是真正的流
  3. 强制聚合 - 必须调用聚合操作才能回到正常流
  4. 临时状态 - 无法直接使用,必须转换

WindowedStream 的特殊性

  1. 不继承 DataStream - 彻底断开链式调用
  2. 纯 API 过渡器 - 只是工具类,不是真正的流
  3. 强制聚合 - 必须调用聚合操作才能回到正常流
  4. 临时状态 - 无法直接使用,必须转换

三、sum() 方法的完整解析

3.1 sum() 方法的调用链

// WindowedStream.java - 入口方法
public SingleOutputStreamOperator<T> sum(int positionToSum) {// 创建内置的求和聚合器return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}// aggregate 方法 - 中转
private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) {return reduce(aggregator);  // 转发给 reduce
}

关键理解

  • sum() 只是一个便利方法
  • 内部使用 Flink 预定义的 SumAggregator
  • 最终还是调用 reduce() 方法

3.2 SumAggregator 的本质

// SumAggregator 的继承关系
public class SumAggregator<T> extends AggregationFunction<T> implements ReduceFunction<T> {private final int positionToSum;  // 要求和的字段位置// 实现具体的求和逻辑
}

重要发现

  • SumAggregator 就是一个 ReduceFunction
  • 与用户自定义的 MapFunction 地位完全相同
  • Flink 内部预写好的函数,用户也可以自己实现

3.3 reduce() 方法的三层重载

// 第一层:只有 ReduceFunction(我们的入口)
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {function = input.getExecutionEnvironment().clean(function);  // 清理函数return reduce(function, new PassThroughWindowFunction<>());  // 添加默认 WindowFunction
}// 第二层:ReduceFunction + WindowFunction
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T, R, K, W> function) {// 推断输出类型TypeInformation<R> resultType = getWindowFunctionReturnType(function, inputType);return reduce(reduceFunction, function, resultType);  // 继续传递
}// 第三层:完整参数(最终实现)
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T, R, K, W> function,TypeInformation<R> resultType) {// 1. 清理函数(序列化检查)function = input.getExecutionEnvironment().clean(function);reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);// 2. 生成算子名称和描述final String opName = builder.generateOperatorName();final String opDescription = builder.generateOperatorDescription(reduceFunction, function);// 3. 通过 builder 根据function 创建WindowOperatorOneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);// 4. 根据Operator 创建 OperatorFactory -> transformation -> DataStreamreturn input.transform(opName, resultType, operator).setDescription(opDescription);
}

重载链的设计目的

  • 逐步补充参数:从简单到复杂
  • 提供默认值:PassThroughWindowFunction 作为默认窗口函数
  • 类型推断:自动推断输出类型
  • 函数清理:确保函数可序列化

3.4 PassThroughWindowFunction 的巧妙设计

// 第一层 reduce 方法中的关键一行
return reduce(function, new PassThroughWindowFunction<>());

PassThroughWindowFunction 的作用

// PassThroughWindowFunction 的简化实现
public class PassThroughWindowFunction<T, K, W extends Window>implements WindowFunction<T, T, K, W> {@Overridepublic void apply(K key, W window, Iterable<T> input, Collector<T> out) {// 直接透传,不做任何处理for (T element : input) {out.collect(element);}}
}

为什么需要 PassThroughWindowFunction?

  • 接口统一:WindowOperator 需要 ReduceFunction + WindowFunction 两个函数
  • 透明传递:用户只想要聚合结果,不需要额外处理
  • 适配器模式:将单一的 ReduceFunction 适配为完整的窗口处理流程
用户调用sum
只有ReduceFunction
SumAggregator
自动添加
PassThroughWindowFunction
WindowOperator需要的
完整函数对

五、回到 DataStream 的标准流程

5.1 关键的回归时刻

// WindowedStream 的最后一步 - 回到正轨!
return input.transform(opName, resultType, operator);

这一行代码的重要性

  • inputKeyedStream(继承自 DataStream
  • 调用的是 DataStream.transform() 方法
  • WindowedStream 完成使命,回到标准流程

5.2 transform() 方法的标准处理

// DataStream.java - 标准的 transform 方法
public <R> SingleOutputStreamOperator<R> transform(String operatorName,TypeInformation<R> outTypeInfo,OneInputStreamOperator<T, R> operator) {// 包装算子为工厂return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

5.3 doTransform() 的核心逻辑

protected <R> SingleOutputStreamOperator<R> doTransform(...) {// 1. 创建物理 TransformationOneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation,    // 上游:PartitionTransformation (keyBy产生的)operatorName,          // "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, SumAggregator, PassThroughWindowFunction)"operatorFactory,       // SimpleOperatorFactory(WindowOperator)outTypeInfo,          // 输出类型信息environment.getParallelism(),  // 并行度false);               // 不是并行度敏感的// 2. 创建新的 DataStreamSingleOutputStreamOperator<R> returnStream =new SingleOutputStreamOperator<>(environment, resultTransform);// 3. 添加到执行环境 - 重要!getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}

关键步骤解析

  1. 创建物理 Transformation:包含真正的算子
  2. 构建新的 DataStream:恢复正常的流
  3. 注册到环境:只有物理 Transformation 才会被注册

六、调用时序图

在这里插入图片描述

导航链接

上节链接:Flink Stream API 源码走读 - keyBy

下节预告:Flink Stream API 源码走读 - print

http://www.lryc.cn/news/622850.html

相关文章:

  • (第十七期)HTML图像标签详解:从入门到精通
  • 【完整源码+数据集+部署教程】高尔夫球追踪与识别系统源码和数据集:改进yolo11-LAWDS
  • 【基础-判断】可以通过ohpm uninstall 指令下载指定的三方库
  • 力扣(接雨水)——基于最高柱分割的双指针
  • 【开发技巧】VS2022+QT5+OpenCV4.10开发环境搭建QT Creator
  • 肖臻《区块链技术与应用》第23-26讲 - The DAO事件、BEC事件、反思和总结
  • Qt 关于QString和std::string数据截断的问题- 遇到\0或者0x00如何处理?
  • ★CentOS:MySQL数据备份
  • 三天速通 Vue+Flask+SQLite 项目+阿里云轻量应用级服务器【宝塔面板】②
  • 数学建模Topsis法笔记
  • TOGAF八步一法笔记2
  • 【DL学习笔记】常用数据集总结
  • OpenShift 4.19安装中的变化
  • 民法学学习笔记(个人向) Part.5
  • Protues使用说明及Protues与Keil联合仿真实现点亮小灯和流水灯
  • 【运维心得】三步更换HP笔记本电脑外壳
  • C++基础——内存管理
  • C++实战
  • 《深度解构:构建浏览器端Redis控制台的WebSocket协议核心技术》
  • Linux -- 文件【下】
  • 基于Uni-app+vue3实现微信小程序地图固定中心点范围内拖拽选择位置功能(分步骤详解)
  • 谷歌手机刷机和面具ROOT保姆级别教程
  • ubuntu远程桌面很卡怎么解决?
  • 【3D重建技术】如何基于遥感图像和DEM等数据进行城市级高精度三维重建?
  • 数据结构 实现循环队列的三种方法
  • 开源数据发现平台:Amundsen Frontend Service React 配置 Flask 配置 Superset 预览集成
  • Vue 3与React内置组件全对比
  • RK3588芯片在AR眼镜中的核心技术优势是什么?
  • MySQL的三大范式:
  • AI驱动的性能测试:如何用机器学习预测系统瓶颈?