Apache Spark 的结构化流
Apache Spark 的结构化流(Structured Streaming)是 Spark 专为伪实时(近实时,Near Real-Time)流数据处理设计的高级 API,它基于 DataFrame/Dataset API,提供了简单、高效、容错的流处理能力。
一、核心思想:流即无限表(Unbounded Table)
结构化流的核心设计理念是将流数据视为一张不断增长的 “无限表”(与静态批处理的 “有限表” 对应)。用户可以用编写静态批处理逻辑的方式(如 SQL、DataFrame 转换)来处理流数据,框架会自动将逻辑转换为持续运行的流处理任务,无需手动管理流的连续性、状态或容错。
二、处理模式:微批处理(Micro-Batch)与连续处理(Continuous)
结构化流支持两种处理模式,以平衡延迟和吞吐量:
1. 微批处理(默认模式)
- 原理:将流数据切分成一系列 “微批次”(Micro-Batches),每个批次处理一小段时间内的数据(默认最小批次间隔约 100ms)。
- 特点:延迟通常在几百毫秒到几秒(伪实时的典型范围),吞吐量高,支持所有结构化流的功能(如状态操作、精确一次语义)。
- 适用场景:大多数业务场景(如实时监控、日志分析、实时 ETL)。
2. 连续处理(实验性,Spark 2.3 + 引入)
- 原理:通过长期运行的连续读取器和写入器,以 “逐条处理” 的方式接近真正的实时(延迟可低至毫秒级)。
- 特点:延迟极低,但功能受限(仅支持部分转换操作,如 map、filter,不支持复杂状态操作),且容错保证较弱。
- 适用场景:对延迟要求极高的场景(如高频交易实时风控)。
三、核心组件与流程
结构化流的处理流程可分为三部分:输入源(Sources)→ 处理逻辑(Operations)→ 输出接收器(Sinks)。
1. 输入源(Sources)
负责读取流数据,支持的主流源包括:
- Kafka:最常用的流数据源(支持精确一次语义)。
- 文件系统(如 HDFS、S3):监控目录下新增的文件(支持文本、Parquet 等格式)。
- Socket:用于测试(从 TCP socket 读取文本数据)。
- 自定义源:通过实现
Source
接口扩展。
2. 处理逻辑(Operations)
用户通过 DataFrame/Dataset API 或 SQL 定义处理逻辑,支持与静态批处理完全一致的操作,例如:
- 基础转换:
select
、filter
、map
、flatMap
等。 - 聚合操作:
groupBy
(含状态聚合)。 - 关联操作:流与静态表的
join
、流与流的join
(需配合 Watermark)。 - 窗口操作:基于事件时间的滚动 / 滑动 / 会话窗口(核心功能)。
3. 输出接收器(Sinks)
负责将处理结果写入外部系统,支持的主流接收器包括:
- Kafka:写入流数据到 Kafka。
- 文件系统:以批处理方式写入文件(如 Parquet)。
- 控制台 / 内存:用于调试(
console
输出或memory
表)。 - Foreach/ForeachBatch:自定义输出逻辑(如写入数据库)。
- 更新模式:根据输出模式(Output Mode)决定如何更新结果。
四、关键概念:输出模式(Output Mode)
输出模式定义了流处理结果如何写入接收器,结构化流支持 3 种模式:
模式 | 适用场景 | 说明 |
---|---|---|
Append | 无聚合的转换(如 filter) | 仅将新处理的行追加到输出(类似日志追加),不修改历史结果。 |
Complete | 全局聚合(如group by count ) | 输出所有聚合结果的完整快照(每次更新都会重写全量结果)。 |
Update | 部分聚合或非聚合操作 | 仅输出被更新的行(新增或修改的结果),不输出未变化的历史行。 |
五、核心能力:状态管理与容错
流处理的核心挑战是状态管理(如聚合、窗口、关联)和容错(保证数据不丢不重),结构化流通过以下机制解决:
1. 状态管理(State Management)
结构化流自动维护处理过程中的状态(如聚合的中间结果、窗口的缓存数据),并支持:
- 状态持久化:状态数据默认存储在 Executor 的内存中,大状态可 spill 到磁盘。
- 状态清理:通过Watermark(水印)自动清理过期状态(如超过窗口时间的旧数据),避免状态无限增长。
2. Watermark(水印):处理延迟数据
实际场景中,数据可能因网络延迟等原因 “迟到”(事件时间 < 处理时间)。Watermark 用于定义 “可接受的最大延迟时间”:
- 原理:动态计算当前事件时间的最大值(
max_event_time
),超过max_event_time - watermark
的数据会被视为 “过期”,不再参与计算。 - 示例:若 Watermark 设为 10 分钟,当前最大事件时间是 10:00,则 9:50 之前的迟到数据会被丢弃。
Watermark 是流处理中平衡 “准确性” 和 “性能” 的关键,常用于窗口聚合、流流关联等场景。
3. 容错与精确一次语义(Exactly-Once)
结构化流通过检查点(Checkpointing) 和预写日志(WAL) 保证 “精确一次” 语义(数据被且仅被处理一次):
- 检查点:定期将流处理的元数据(如已处理的偏移量、状态快照、水印位置)写入可靠存储(如 HDFS)。
- 故障恢复:当应用崩溃重启时,从最近的检查点恢复状态和进度,避免重复处理或丢失数据。
六、典型场景示例
以 “实时统计网站 5 分钟滑动窗口内的 PV(页面访问量)” 为例,展示结构化流的使用流程:
1. 定义输入源(假设从 Kafka 读取,事件时间字段为event_time
)
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._// 定义Kafka源配置
val kafkaDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:port").option("subscribe", "page_visits") // 订阅的Kafka主题.load()// 解析Kafka消息(假设value是JSON格式,包含event_time和page_id)
val schema = new StructType().add("event_time", TimestampType).add("page_id", StringType)val visitsDF = kafkaDF.select(from_json(col("value").cast(StringType), schema).as("data")).select("data.event_time", "data.page_id")
2. 定义处理逻辑(5 分钟滑动窗口,每 1 分钟更新一次)
// 定义Watermark(允许数据延迟30秒)
val withWatermarkDF = visitsDF.withWatermark("event_time", "30 seconds") // 水印:最大延迟30秒// 5分钟滑动窗口(每1分钟滑动一次),按page_id统计PV
val windowPVDF = withWatermarkDF.groupBy(window(col("event_time"), "5 minutes", "1 minute"), // 窗口:5分钟大小,1分钟滑动col("page_id")).count().withColumnRenamed("count", "pv")
3. 定义输出接收器(写入控制台,Update 模式)
val query = windowPVDF.writeStream.format("console") // 输出到控制台.outputMode("update") // 仅输出更新的行.option("truncate", "false") // 不截断输出.start() // 启动流查询query.awaitTermination() // 等待查询结束
七、结构化流的优势
- 批流统一:用相同的 API 处理静态数据和流数据,降低学习成本。
- 简单易用:无需手动管理流的连续性、状态或容错,框架自动处理。
- 强容错保证:支持精确一次语义,适合生产环境。
- 丰富的状态操作:内置窗口、聚合、关联等状态处理能力,支持 Watermark 清理过期状态。
- 高性能:基于 Spark 的分布式计算引擎,可水平扩展,支持高吞吐量。
总结
Spark 结构化流是伪实时流处理的理想选择,它通过 “流即无限表” 的抽象、微批处理模式、精确一次语义和自动状态管理,简化了流数据处理的复杂度,适用于实时监控、日志分析、实时 ETL 等多种场景。