Flink Stream API 源码走读 - keyBy
概述
本文深入分析了 Flink 中 keyBy()
方法的源码实现,重点讲解了 keyBy 与之前 map、flatMap 的不同之处,以及虚拟 Transformation 的概念。
keyBy 方法源码分析
1. keyBy 方法调用入口
// 用户调用
DataStream<String> flatMapped = ...;
KeyedStream<String, String> keyed = flatMapped.keyBy(value -> value.split(" ")[0]);
// DataStream.java 中的实现
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {Preconditions.checkNotNull(key);return new KeyedStream<>(this, clean(key));
}
关键点:
- 直接创建
KeyedStream
对象 - 传入当前流
this
和清理后的KeySelector
- 没有像 map/flatMap 那样的复杂转换过程
2. KeyedStream 的构造过程
构造方法重载链
// 第一层构造:抽取Key类型
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {this(dataStream,keySelector,TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
}// 第二层构造:创建PartitionTransformation
public KeyedStream(DataStream<T> dataStream,KeySelector<T, KEY> keySelector,TypeInformation<KEY> keyType) {this(dataStream,new PartitionTransformation<>(dataStream.getTransformation(),new KeyGroupStreamPartitioner<>(keySelector,StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),keySelector,keyType);
}// 第三层构造:最终构造
KeyedStream(DataStream<T> stream,PartitionTransformation<T> partitionTransformation,KeySelector<T, KEY> keySelector,TypeInformation<KEY> keyType) {super(stream.getExecutionEnvironment(), partitionTransformation);this.keySelector = clean(keySelector);this.keyType = validateKeyType(keyType);
}
3. KeyedStream 的核心组成
public class KeyedStream<T, KEY> extends DataStream<T> {// Key选择器 - 从数据中提取Key的函数private final KeySelector<T, KEY> keySelector;// Key类型信息private final TypeInformation<KEY> keyType;// 继承自DataStream的成员:// - StreamExecutionEnvironment environment// - Transformation<T> transformation (这里是PartitionTransformation)
}
KeyedStream 的特殊之处:
- 继承自 DataStream - 可以继续调用 DataStream 的所有 API
- 持有 KeySelector - 保存用户提供的 Key 提取逻辑
- 包含 PartitionTransformation - 虚拟的分区转换
虚拟 Transformation vs 物理 Transformation
1. Transformation 的分类
2. 物理 Transformation 特征
// 物理Transformation示例:OneInputTransformation
public class OneInputTransformation<IN, OUT> extends PhysicalTransformation<OUT> {private final Transformation<IN> input;private final StreamOperatorFactory<OUT> operatorFactory; // 包含算子工厂// ...
}// LegacySourceTransformation
public class LegacySourceTransformation<T> extends PhysicalTransformation<T> {private final StreamOperatorFactory<T> operatorFactory; // 包含算子工厂// ...
}
物理 Transformation 特点:
- 继承自
PhysicalTransformation
- 包含
StreamOperatorFactory
(算子工厂) - 包含真正的计算逻辑
- 会生成实际的运行时任务
3. 虚拟 Transformation 特征
// 虚拟Transformation示例:PartitionTransformation
public class PartitionTransformation<T> extends Transformation<T> {private final Transformation<T> input; // 上游Transformationprivate final StreamPartitioner<T> partitioner; // 分区器private final StreamExchangeMode exchangeMode; // 交换模式// 注意:没有 StreamOperatorFactory!
}
虚拟 Transformation 特点:
- 直接继承自
Transformation
(不是PhysicalTransformation
) - 不包含算子工厂
- 不包含计算逻辑
- 只指定数据分发规则
4. keyBy 为什么是虚拟转换?
// keyBy 的本质
flatMapped.keyBy(keySelector)
分析:
- keyBy 本身不进行任何数据计算
- 只是告诉上游算子如何将数据发送给下游
- 实际的分区逻辑在数据传输时执行
- 不需要独立的运行时任务
PartitionTransformation 详解
1. PartitionTransformation 的构造
new PartitionTransformation<>(dataStream.getTransformation(), // 上游transformationnew KeyGroupStreamPartitioner<>( // 分区器keySelector,StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)
)
2. KeyGroupStreamPartitioner 分区逻辑
public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> {private final KeySelector<T, K> keySelector;private int maxParallelism;@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {K key;try {// 1. 使用KeySelector提取Keykey = keySelector.getKey(record.getInstance().getValue());} catch (Exception e) {throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);}// 2. 根据Key计算目标并行度return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);}
}
分区算法:
- 使用
KeySelector
从数据中提取 Key - 调用
KeyGroupRangeAssignment.assignKeyToParallelOperator()
计算目标分区 - 涉及 KeyGroup 概念和哈希算法
Transformation 链条对比
1. Source Transformation(特殊情况)
特点:
- 没有上游 Transformation(
input
为空) - 作为整个链条的起点
- 包含数据生成逻辑
2. 普通算子 Transformation
特点:
- 有明确的上游 Transformation
- 包含算子工厂和计算逻辑
- 形成计算链条
3. keyBy Transformation
特点:
- 有上游 Transformation
- 不包含算子,只包含分区器
- 指定数据分发规则
完整的调用链对比
map/flatMap 调用链
keyBy 调用链
KeyedStream 的继承关系
public class KeyedStream<T, KEY> extends DataStream<T>
graph TBA[DataStream<T>] --> B[KeyedStream<T, KEY>]A --> C[包含所有流式API<br/>map, flatMap, filter等]B --> D[额外支持状态相关API<br/>reduce, aggregate等]
继承的好处:
- 可以继续调用 DataStream 的所有 API
- 支持链式调用:
keyedStream.map(...).filter(...)
- 同时获得状态管理能力
源码走读的小建议
1. 方法调用栈深度问题
keyBy() → KeyedStream构造1 → KeyedStream构造2 → KeyedStream构造3 → PartitionTransformation构造→ KeyGroupStreamPartitioner构造
建议:
- 看源码时容易"压栈"过深,前面的忘记了
- 需要多次练习才能形成经验
- 重点关注核心概念,不要陷入细节
2. 概念理解的重要性
核心概念回顾:
- Function - 用户逻辑
- Operator - 算子(包装Function)
- Transformation - 转换操作(包装Operator或分区器)
- DataStream - 用户API(包装Transformation)
总结
keyBy 的特殊性
- 不创建算子 - 只创建分区器
- 虚拟转换 - 不包含计算逻辑
- 分区指导 - 指定数据如何分发
- 状态准备 - 为有状态计算做准备
设计理念
- 职责分离 - 分区逻辑与计算逻辑分离
- 延迟执行 - 分区在数据传输时执行
- 链式调用 - 保持API的流畅性
- 类型安全 - 通过泛型保证Key类型安全
下节预告
Flink Stream API 源码走读 - window和sum
重要理解:
- keyBy 是一个"虚拟"操作,不产生实际的计算任务
- 它的作用是指导数据分区,为下游的有状态计算做准备
- KeyedStream 继承自 DataStream,可以继续进行各种转换操作