当前位置: 首页 > news >正文

Flink Stream API 源码走读 - keyBy

概述

本文深入分析了 Flink 中 keyBy() 方法的源码实现,重点讲解了 keyBy 与之前 map、flatMap 的不同之处,以及虚拟 Transformation 的概念。

keyBy 方法源码分析

1. keyBy 方法调用入口

// 用户调用
DataStream<String> flatMapped = ...;
KeyedStream<String, String> keyed = flatMapped.keyBy(value -> value.split(" ")[0]);
// DataStream.java 中的实现
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {Preconditions.checkNotNull(key);return new KeyedStream<>(this, clean(key));
}

关键点:

  • 直接创建 KeyedStream 对象
  • 传入当前流 this 和清理后的 KeySelector
  • 没有像 map/flatMap 那样的复杂转换过程

2. KeyedStream 的构造过程

keyBy(KeySelector)
new KeyedStream(this, clean(key))
KeyedStream构造1: 抽取Key类型
KeyedStream构造2: 创建PartitionTransformation
KeyedStream构造3: 设置成员变量
构造方法重载链
// 第一层构造:抽取Key类型
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {this(dataStream,keySelector,TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
}// 第二层构造:创建PartitionTransformation
public KeyedStream(DataStream<T> dataStream,KeySelector<T, KEY> keySelector,TypeInformation<KEY> keyType) {this(dataStream,new PartitionTransformation<>(dataStream.getTransformation(),new KeyGroupStreamPartitioner<>(keySelector,StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),keySelector,keyType);
}// 第三层构造:最终构造
KeyedStream(DataStream<T> stream,PartitionTransformation<T> partitionTransformation,KeySelector<T, KEY> keySelector,TypeInformation<KEY> keyType) {super(stream.getExecutionEnvironment(), partitionTransformation);this.keySelector = clean(keySelector);this.keyType = validateKeyType(keyType);
}

3. KeyedStream 的核心组成

public class KeyedStream<T, KEY> extends DataStream<T> {// Key选择器 - 从数据中提取Key的函数private final KeySelector<T, KEY> keySelector;// Key类型信息private final TypeInformation<KEY> keyType;// 继承自DataStream的成员:// - StreamExecutionEnvironment environment// - Transformation<T> transformation (这里是PartitionTransformation)
}

KeyedStream 的特殊之处:

  1. 继承自 DataStream - 可以继续调用 DataStream 的所有 API
  2. 持有 KeySelector - 保存用户提供的 Key 提取逻辑
  3. 包含 PartitionTransformation - 虚拟的分区转换

虚拟 Transformation vs 物理 Transformation

1. Transformation 的分类

Transformation
PhysicalTransformation
物理转换
VirtualTransformation
虚拟转换
LegacySourceTransformation
包含算子
OneInputTransformation
包含算子
TwoInputTransformation
包含算子
PartitionTransformation
只包含分区器
UnionTransformation
联合转换

2. 物理 Transformation 特征

// 物理Transformation示例:OneInputTransformation
public class OneInputTransformation<IN, OUT> extends PhysicalTransformation<OUT> {private final Transformation<IN> input;private final StreamOperatorFactory<OUT> operatorFactory;  // 包含算子工厂// ...
}// LegacySourceTransformation
public class LegacySourceTransformation<T> extends PhysicalTransformation<T> {private final StreamOperatorFactory<T> operatorFactory;  // 包含算子工厂// ...
}

物理 Transformation 特点:

  • 继承自 PhysicalTransformation
  • 包含 StreamOperatorFactory(算子工厂)
  • 包含真正的计算逻辑
  • 会生成实际的运行时任务

3. 虚拟 Transformation 特征

// 虚拟Transformation示例:PartitionTransformation
public class PartitionTransformation<T> extends Transformation<T> {private final Transformation<T> input;           // 上游Transformationprivate final StreamPartitioner<T> partitioner; // 分区器private final StreamExchangeMode exchangeMode;   // 交换模式// 注意:没有 StreamOperatorFactory!
}

虚拟 Transformation 特点:

  • 直接继承自 Transformation(不是 PhysicalTransformation
  • 不包含算子工厂
  • 不包含计算逻辑
  • 只指定数据分发规则

4. keyBy 为什么是虚拟转换?

// keyBy 的本质
flatMapped.keyBy(keySelector)

分析:

  • keyBy 本身不进行任何数据计算
  • 只是告诉上游算子如何将数据发送给下游
  • 实际的分区逻辑在数据传输时执行
  • 不需要独立的运行时任务

PartitionTransformation 详解

1. PartitionTransformation 的构造

new PartitionTransformation<>(dataStream.getTransformation(),                    // 上游transformationnew KeyGroupStreamPartitioner<>(                   // 分区器keySelector,StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)
)

2. KeyGroupStreamPartitioner 分区逻辑

public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> {private final KeySelector<T, K> keySelector;private int maxParallelism;@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {K key;try {// 1. 使用KeySelector提取Keykey = keySelector.getKey(record.getInstance().getValue());} catch (Exception e) {throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);}// 2. 根据Key计算目标并行度return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);}
}

分区算法:

  1. 使用 KeySelector 从数据中提取 Key
  2. 调用 KeyGroupRangeAssignment.assignKeyToParallelOperator() 计算目标分区
  3. 涉及 KeyGroup 概念和哈希算法

Transformation 链条对比

1. Source Transformation(特殊情况)

LegacySourceTransformation
没有上游
包含SocketTextStreamFunction
的StreamSource算子

特点:

  • 没有上游 Transformation(input 为空)
  • 作为整个链条的起点
  • 包含数据生成逻辑

2. 普通算子 Transformation

上游Transformation
OneInputTransformation
包含算子工厂
下游Transformation

特点:

  • 有明确的上游 Transformation
  • 包含算子工厂和计算逻辑
  • 形成计算链条

3. keyBy Transformation

上游Transformation
PartitionTransformation
只包含分区器
下游Transformation

特点:

  • 有上游 Transformation
  • 不包含算子,只包含分区器
  • 指定数据分发规则

完整的调用链对比

map/flatMap 调用链

DataStream.map
抽取类型信息
创建StreamMap算子
包装成SimpleOperatorFactory
创建OneInputTransformation
添加到Environment
返回SingleOutputStreamOperator

keyBy 调用链

DataStream.keyBy
抽取Key类型信息
创建KeyGroupStreamPartitioner
创建PartitionTransformation
返回KeyedStream

KeyedStream 的继承关系

public class KeyedStream<T, KEY> extends DataStream<T>
graph TBA[DataStream&lt;T&gt;] --> B[KeyedStream&lt;T, KEY&gt;]A --> C[包含所有流式API<br/>map, flatMap, filter等]B --> D[额外支持状态相关API<br/>reduce, aggregate等]

继承的好处:

  • 可以继续调用 DataStream 的所有 API
  • 支持链式调用:keyedStream.map(...).filter(...)
  • 同时获得状态管理能力

源码走读的小建议

1. 方法调用栈深度问题

keyBy() → KeyedStream构造1 → KeyedStream构造2 → KeyedStream构造3 → PartitionTransformation构造→ KeyGroupStreamPartitioner构造

建议:

  • 看源码时容易"压栈"过深,前面的忘记了
  • 需要多次练习才能形成经验
  • 重点关注核心概念,不要陷入细节

2. 概念理解的重要性

核心概念回顾:

  1. Function - 用户逻辑
  2. Operator - 算子(包装Function)
  3. Transformation - 转换操作(包装Operator或分区器)
  4. DataStream - 用户API(包装Transformation)

总结

keyBy 的特殊性

  1. 不创建算子 - 只创建分区器
  2. 虚拟转换 - 不包含计算逻辑
  3. 分区指导 - 指定数据如何分发
  4. 状态准备 - 为有状态计算做准备

设计理念

  • 职责分离 - 分区逻辑与计算逻辑分离
  • 延迟执行 - 分区在数据传输时执行
  • 链式调用 - 保持API的流畅性
  • 类型安全 - 通过泛型保证Key类型安全

下节预告

Flink Stream API 源码走读 - window和sum


重要理解

  • keyBy 是一个"虚拟"操作,不产生实际的计算任务
  • 它的作用是指导数据分区,为下游的有状态计算做准备
  • KeyedStream 继承自 DataStream,可以继续进行各种转换操作
http://www.lryc.cn/news/621490.html

相关文章:

  • 转换一个python项目到moonbit,碰到报错输出:编译器对workflow.mbt文件中的类方法要求不一致的类型注解,导致无法正常编译
  • Vue响应式系统在超大型应用中的性能瓶颈
  • 中年海尔,是时候押注新方向了
  • 训练大模型的前提:数据治理工程:从原始数据到高质量语料的系统化治理实践
  • 抽奖程序web程序
  • 小迪安全v2023学习笔记(六十二讲)—— PHP框架反序列化
  • 实战 AI8051U 音视频播放:USART-SPI→DMA-P2P→SPI+I2S 例程详解
  • Redis 实用型限流与延时队列:从 Lua 固定/滑动窗口到 Streams 消费组(含脚本与压测)
  • 大华相机RTSP无法正常拉流问题分析与解决
  • (Arxiv-2025)Stand-In:一种轻量化、即插即用的身份控制方法用于视频生成
  • openwrt增加自定义网页
  • 基于asp.net#C##VUE框架的独居老人物资配送系统的设计与实现#sql server#visual studio
  • 国内多光谱相机做得好的厂家有哪些?-多光谱相机品牌厂家
  • 8月4日实训考察:重庆五一职院走进成都国际影像产业园
  • Flink面试题及详细答案100道(1-20)- 基础概念与架构
  • 基于.net、C#、asp.net、vs的保护大自然网站的设计与实现
  • Vue3中的ref与reactive全面解析:如何正确选择响应式声明方式
  • java 策略模式 demo
  • 基于微信小程序的家教服务平台的设计与实现/基于asp.net/c#的家教服务平台/基于asp.net/c#的家教管理系统
  • 「iOS」————APP启动优化
  • 什么是接口?PHP如何使用 SessionHandlerInterface 接口实现Session自定义会话数据存储
  • Spark 运行流程核心组件(二)任务调度
  • Python 基础语法笔记.2
  • Dijkstra与Floyd求最短路算法简介
  • zabbix部署问题后常见问题
  • sqli-labs通关笔记-第50关 GET数值型order by堆叠注入(手工注入+脚本注入两种方法)
  • StringBoot-SSE和WebFlux方式消息实时推送-默认单向-可增加交互接口
  • qt项目中解决关闭弹窗后执行主界面的信号槽时闪退问题
  • c++中的Lambda表达式详解
  • ATAM:基于场景的软件架构权衡分析法