2.3 Flink的核心概念解析
Stream & Transformation (数据流与转换)
想象一下,你正在经营一个大型的瓶装果汁工厂。源源不断从果园送来的水果,就是我们的 数据流 (Stream)。它有以下几个特点:
- 无界 (Unbounded):水果会持续不断地被送来,没有尽头。这就像 Flink 处理的实时数据流,例如用户的点击行为、服务器的日志等,是连续不断的。
- 有序 (Ordered):虽然水果是一批批送来的,但在每一批内部,它们可以被认为是有先后顺序的。数据流中的事件也一样,通常会携带一个时间戳来标记其发生的顺序。
- 不可变 (Immutable):一旦某个水果被送进工厂,你就不能改变这个水果本身了(比如把苹果变成橘子)。你只能对它进行加工,产出新的东西。在 Flink 中,数据流中的元素也是不可变的,我们只能通过计算生成新的数据流。
那么,转换 (Transformation) 是什么呢?它就是你工厂里的各种加工步骤。
- 清洗:对应
map
操作,把每个进来的水果清洗干净,让它变成“干净的水果”。这个过程是一对一的转换。 - 榨汁:对应
flatMap
操作,一个苹果可以榨出多杯果汁(或者因为是坏苹果一杯也榨不出来)。这个过程是一对多或一对零的转换。 - 过滤:对应
filter
操作,我们只挑选红富士苹果进行下一步加工,把其他品种的苹果过滤掉。 - 混合:对应
keyBy
+reduce
或sum
等聚合操作。我们将不同果农送来的同一种类水果(比如草莓)汇集到一起(keyBy
),然后计算总重量(sum
)或者将它们混合榨汁(reduce
)。
Transformation
就是定义了如何将一个或多个数据流,加工成新的数据流的一系列操作蓝图。它只是一个“计划”,并不会立即执行,只有当整个流程定义好并提交后,Flink 才会真正开始处理数据。
代码示例:
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建一个数据流 (Stream)
DataStream<String> stream = env.fromElements("apple", "banana", "apple", "orange");// 3. 定义一系列转换 (Transformations)
DataStream<String> processedStream = stream.map(fruit -> "washed-" + fruit) // 清洗: washed-apple, washed-banana....filter(washedFruit -> !washedFruit.equals("washed-banana")); // 过滤掉香蕉// 4. 打印结果 (Sink)
processedStream.print();// 5. 执行作业
env.execute("Fruit Processing Job");
Source & Sink (数据源与数据汇)
Source
和 Sink
非常好理解,它们分别是你果汁工厂的“入口”和“出口”。
-
Source (数据源):就是接收水果的地方。它可以是来自特定果园的卡车(对应从 Kafka 读取消息),也可以是一个固定的水果篮(对应从文件读取数据),甚至可以是员工手动一个个放上去的(对应从集合创建数据流)。Flink 内置了丰富的 Source 连接器,如
Kafka
、RabbitMQ
、HDFS
等,它规定了数据以何种形式流入你的 Flink 程序。 -
Sink (数据汇):就是成品果汁最终要去的地方。你可以将果汁装瓶卖给超市(对应写入到另一个 Kafka 主题),也可以直接倒入大缸存储起来(对应写入到数据库或文件系统),或者直接给员工品尝(对应打印到控制台)。
Sink
定义了数据处理完毕后的最终去向。
一个完整的 Flink 作业,就是一个从 Source
开始,经过一系列 Transformation
,最后由 Sink
结束的完整数据处理流水线。
代码示例 (续上):
// Source: 从一个集合中创建数据流
DataStream<String> stream = env.fromElements("apple", "banana", "apple", "orange");// ... transformations ...// Sink: 打印到控制台
processedStream.print();/*
// 如果要写入到 Kafka,Sink 会是这样:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("processed-fruits-topic",new SimpleStringSchema(),properties);processedStream.addSink(kafkaSink);
*/
Operator & Operator Chain (算子与算子链)
算子 (Operator) 是 Transformation
的具体实现。如果说 map
、filter
是加工“动作”的名称,那么 Operator
就是执行这些动作的“机器”。在 Flink 中,我们定义的每一个 map()
, filter()
, keyBy()
等操作,在运行时都会被翻译成一个具体的 Operator
实例来执行。
现在,我们来升级一下工厂的效率。原本,我们有三台独立的机器:
- 清洗机 (
map
operator) - 过滤机 (
filter
operator) - 榨汁机 (
flatMap
operator)
水果在清洗机洗完后,掉进传送带,送到过滤机;过滤后,再由传送带送到榨汁机。这个过程中,传送带上的交接传递(数据的序列化、反序列化、网络传输)是有效率损耗的。
有没有办法优化呢?当然有!我们可以把“清洗”和“过滤”这两台功能简单、并且都是一对一处理的机器,合并成一台“清洗过滤一体机”。水果进去,直接就完成了清洗和过滤两个步骤,然后才出来。
这个“清洗过滤一体机”就是 算子链 (Operator Chain)。
Flink 会出于优化的目的,将满足特定条件的多个 Operator
“链接”在一起,打包成一个任务单元(Task)来执行。这样可以极大地减少数据在不同任务之间传输的开销,提升性能。
形成算子链的条件:
- 一对一 (One-to-one) 的数据关系:比如
map
、filter
、flatMap
(在不改变并行度的情况下)这些操作,上一个算子处理完一条数据,传给下一个算子的也是一条数据。 - 并行度相同:两个算子的并行度必须一样。
- 默认的
forward
策略:算子之间的数据分区策略是forward
(直接转发给下游的同一个并行实例)。
像 keyBy
、rebalance
、shuffle
这种会引起数据重新分发的操作,就像一个“分拣中心”,会打断算子链。因为数据需要被发送到下游不同的并行实例中去,无法形成一个固定的“一体机”。
Parallelism (并行度) & Slot (任务槽)
这两个概念是 Flink 实现高吞吐量的关键,我们用一个收费站的例子来理解。
-
并行度 (Parallelism):指的是一个
Operator
(或者说一个Task
) 被分成了多少个并行的实例来执行。它就像一个收费站有多少个收费通道。如果一个Operator
的并行度是 4,就意味着 Flink 会启动 4 个并行的实例,同时处理这个算子的任务。你可以为整个作业设置一个全局并行度,也可以为单个Operator
单独设置。 -
任务槽 (Slot):
Slot
是 TaskManager(还记得吗?Flink 的工作节点)里的计算资源单元。它就像是收费站里的一条完整的车道,注意,是一条车道,而不是一个收费亭。一个 TaskManager 可以有多个Slot
,表示它能同时执行多个任务。
关系与区别:
- 并行度是“逻辑”概念,是你希望一个任务用多少个线程去跑。
- Slot 是“物理”资源,是 TaskManager 能提供多少个“坑位”来运行这些线程。
假设你的程序 Source -> map -> keyBy -> Sink
,全局并行度设置为 2。那么 Source
有2个实例,map
有2个实例,keyBy
有2个实例,Sink
也有2个实例。总共有 8 个 Task
。
现在,假设你有一个 TaskManager,它配置了 3 个 Slot
。Flink 的 JobManager 就会把这 8 个 Task
分配到这 3 个 Slot
中去执行。
一个 Slot
可以运行来自不同 Operator
的 Task
(前提是这些 Task
属于同一个作业)。例如,Slot 1
里可以同时运行 Source-实例1
和 map-实例1
(回忆一下算子链,它们可能被链接在一起作为一个 Task
运行)。这种机制叫 Slot 共享,它允许 Flink 将多个逻辑上独立的 Task
部署在一个物理资源单元里,提高了资源的利用率。
上图中,Source
和 map
链接在了一起,keyBy
打断了链接,reduce
和 sink
链接在了一起。整个作业并行度为2。这些被链接起来的 Task
组,会被分配到不同的 Slot
中执行。
代码示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置全局并行度为 4
env.setParallelism(4);DataStream<String> stream = env.fromElements(...).map(...) // 这个 map 算子的并行度是 4.filter(...) // 这个 filter 算子的并行度也是 4.name("MyFilter").setParallelism(2); // 单独设置这个算子的并行度为 2stream.print();
Event Time, Processing Time, Ingestion Time (时间语义)
在实时计算中,“时间”是一个至关重要的概念。想象一下,你在看一场全球直播的体育比赛,不同地区的观众因为网络延迟,看到进球画面的时间是不一样的。
-
Processing Time (处理时间):这是最简单的时间。它指的是计算节点(TaskManager)的系统时钟时间。就像你在北京时间晚上 8:00:10 看到进球画面,那么这条“进球”数据被处理的时间就是 8:00:10。它不关心这个球到底是什么时候踢进去的。
- 优点:实现简单,延迟最低。
- 缺点:结果不确定。如果因为网络抖动或系统负载,同一批数据被处理的顺序发生了变化,那么计算结果(比如计算每分钟进球数)就可能完全不同。
-
Event Time (事件时间):它指的是事件本身发生的时间。无论你什么时候看到进球,进球这个事件发生的时间是固定的,比如是世界标准时间下午 3:45:01。这个时间戳通常会作为数据的一部分被记录下来。
- 优点:结果最准确、最可预测。无论数据何时到达、处理顺序如何,基于事件时间的计算结果都是唯一的、确定性的。这对于需要精确业务逻辑的场景(如金融交易、用户行为分析)至关重要。
- 缺点:需要处理数据乱序和延迟的问题,实现相对复杂。这就引出了我们下一个概念——Watermark。
-
Ingestion Time (摄入时间):这是一个折中方案。它指的是数据进入 Flink Source Operator 的时间。数据一进入 Flink 系统,就被立即打上一个时间戳。它解决了 Processing Time 的部分不确定性,因为时间戳是在源头统一赋予的,但它依然无法解决数据从产生地到 Flink Source 之间的延迟问题。
一句话总结:
- Processing Time:我的电脑现在几点?
- Event Time:这件事到底是什么时候发生的?
- Ingestion Time:你是什么时候到我这儿(Flink)的?
在 Flink 1.12 版本之后,默认的时间语义就是 Event Time,可见其重要性。
代码示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置时间语义为 Event Time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<MyEvent> stream = env.addSource(...)// 抽取事件时间戳,并生成 Watermark.assignTimestampsAndWatermarks(...);
Watermark (水印) 原理
Watermark 是 Flink 中处理事件时间 (Event Time) 语义时,解决数据乱序 (Out-of-Order) 问题的核心机制。
让我们回到果汁工厂的比喻。假设你在处理一批有时效性的水果,比如草莓,你承诺“处理所有上午10点前采摘的草莓”。但因为运输问题,有些9点58分采摘的草莓,可能在10点05分才送到。你怎么办?
- 方案一:死等。 一直等下去,直到你确信所有10点前的草莓都到了。但这不现实,你可能要等到天荒地老。
- 方案二:到点就关门。 10点钟一到,立刻停止接收,开始处理。这样做的风险是,那些迟到的9点58分的草莓就被漏掉了,导致计算结果不准确。
Watermark 就是一个更聪明的方案。 它像一个动态的“时钟”,这个时钟走得比最快的水果到达的时间要慢一些。
工厂管理者(JobManager)会广播一个“水位线 (Watermark)”,比如 Watermark(09:55)
。这个信号的含义是:“我断定(或有很大概率相信),所有时间戳在 09:55 之前的水果都已经到达了。”
当 Flink 中的窗口操作(比如统计每5分钟的水果数量)收到这个 Watermark(09:55)
时,如果它有一个 09:50 - 09:55
的窗口正在等待数据,它就会认为这个窗口的数据已经齐全了,可以放心地关闭窗口,进行计算并输出结果了。
Watermark 如何生成?
它通常是基于当前已到达数据的最大事件时间,减去一个你预估的最大乱序延迟时间。
Watermark(t) = MaxEventTime - MaxOutOfOrderness
例如,你收到的最新的水果采摘时间是 10:02
,你估计水果最晚会迟到 2 分钟。那么你就可以生成一个 Watermark(10:02 - 2分钟) = Watermark(10:00)
。这个 Watermark 会被广播给下游所有的算子。
核心思想: Watermark 是一种在 处理进度(低延迟) 和 数据完整性(结果准确) 之间的权衡机制。它告诉系统,事件时间时钟已经推进到了某个点,在这之前的数据不太可能再出现了,你可以安全地触发计算了。