大数据计算引擎(二)——Flink
介绍(与Spark的区别,技术选型定位)
- 架构设计层面
Flink采用分布式数据流引擎,核心架构包含:
- JobManager(协调节点)
- TaskManager(工作节点)
- 基于Akka的RPC通信
- 原生支持DAG动态优化
Spark Structured Streaming架构:
- 基于Spark SQL引擎扩展
- Driver-Executor经典架构
- 微批调度器(Micro-batch Scheduler)
- 静态DAG执行计划
运行时特性对比
|| Flink | Spark Structured Streaming |
|---|---|---|
| 处理模式 | 持续流处理 | 微批处理(可配置连续模式) |
| 最小延迟 | 1ms级 | 100ms级(连续模式可达10ms) |
| 反压机制 | 基于TCP流量控制 | 基于批次动态调节 |
| 状态后端 | Memory/RocksDB | HDFS兼容存储 |
| 检查点间隔 | 可配置毫秒级 | 通常秒级以上 |API能力矩阵
Flink API体系:
javaCopy Code
// DataStream API示例 env.addSource(kafkaSource) .keyBy(event -> event.getUserId()) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .process(new CustomWindowFunction());
- 支持:ProcessFunction(底层API)、Table API、CEP复杂事件处理
Spark Structured Streaming API:
// Structured Streaming示例 spark.readStream .format("kafka") .option("subscribe", "topic") .load() .groupBy($"user", window($"timestamp", "5 minutes")) .count()
- 主要依赖DataFrame API,缺少底层操作接口
- 时间处理深度对比
Flink时间机制:
- Watermark传播算法(Punctuated/Periodic)
- 支持处理时间、事件时间、摄入时间
- 允许自定义Watermark生成器
- 侧输出(Side Output)处理延迟数据
Spark时间处理:
- 固定周期Watermark(基于最大事件时间)
- 仅支持处理时间和事件时间
- 延迟数据直接丢弃(需手动配置allowedLateness)
- 容错机制实现
Flink Checkpoint:
- 分布式快照算法(Chandy-Lamport变种)
- 异步屏障快照(Asynchronous Barrier Snapshotting)
- 支持增量检查点(RocksDB后端)
- 端到端精确一次保证(需配合特定Source/Sink)
Spark检查点:
- 基于写前日志(WAL)和RDD血统
- 需要配置检查点目录(HDFS兼容)
- 精确一次语义需要外部系统配合
- 资源管理适配
Flink资源模型:
- 细粒度Slot分配
- 动态扩缩容(Reactive Mode)
- 原生支持Kubernetes
- 网络缓冲自动调节
Spark资源管理:
- 基于Executor静态分配
- 动态分配需Shuffle Service支持
- 批处理资源复用可能导致冲突
- 生态集成能力
Flink连接器:
- 官方维护30+连接器
- 支持CDC(Debezium集成)
- 完善的Table Store生态
- 原生Kubernetes Operator
Spark生态优势:
- 与MLlib/GraphX无缝集成
- Delta Lake深度整合
- Databricks商业版增强功能
- 性能基准测试
在同等资源下(10节点集群):
吞吐量:
- Flink可达200万事件/秒
- Spark约120万事件/秒(微批间隔1s时)
99%延迟:
- Flink:<50ms
- Spark:200-500ms
- 版本演进趋势
Flink 2.0+方向:
- 统一批流执行模式
- 增强Python API
- 改进State TTL管理
Spark 3.5+改进:
- 连续处理模式优化
- 增强事件时间处理
- 改进状态存储后端
- 选型决策树
建议选择Flink当:
✓ 需要亚秒级延迟
✓ 处理复杂事件模式
✓ 要求端到端精确一次
✓ 需要动态扩缩容建议选择Spark当:
✓ 已有Spark批处理流水线
✓ 准实时(分钟级)场景
✓ 需要与MLlib集成
✓ 团队熟悉Spark生态两者最新版本都支持:
- SQL标准语法(ANSI SQL:2011)
- 流式维表关联(Async I/O)
- 统一元数据管理
- 容器化部署
状态
- 状态类型体系
Keyed State(键控状态):
- 与特定Key绑定,仅可在KeyedStream上使用
- 包含:
- ValueState<T>:单值状态
- ListState<T>:列表状态
- MapState<K,V>:映射状态
- ReducingState<T>:聚合状态
- AggregatingState<IN,OUT>:高级聚合状态
Operator State(算子状态):
- 与算子实例绑定,支持:
- ListState<T>:均匀分区状态
- UnionListState<T>:全量广播状态
- BroadcastState<K,V>:广播状态
- 状态后端实现
MemoryStateBackend:
- 状态存储在JVM堆内存
- 检查点存于JobManager内存
- 适用开发测试场景
FsStateBackend:
- 工作状态在堆内存
- 检查点持久化到文件系统(HDFS等)
- 生产环境常用方案
RocksDBStateBackend:
- 工作状态存于本地RocksDB
- 检查点持久化到远端存储
- 支持增量检查点
- 超大状态场景首选
- 状态工作原理
状态生命周期:
// 状态注册示例 ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("counter", Long.class); ValueState<Long> state = getRuntimeContext().getState(descriptor);
- 初始化:通过RuntimeContext注册
- 读写:通过state.value()/update()方法
- 清除:state.clear()
状态分片机制:
- Keyed State按KeyGroup分片
- Operator State采用均匀分区或广播策略
- 分片数=最大并行度/KeyGroup数量
- 检查点原理
屏障传播:
- JobManager触发检查点
- Source算子注入屏障(Barrier)
- 屏障随数据流向下游传播
异步快照:
graph LR A[收到屏障] --> B[持久化状态] B --> C[发送确认] C --> D[继续处理数据]
- 算子收到屏障时冻结状态
- 异步写入持久化存储
- 完成后向JobManager确认
- 状态恢复机制
精确一次保证:
- 基于检查点+事务性输出
- 两阶段提交协议:
- 预提交:暂存结果
- 确认提交:检查点完成后提交
恢复流程:
- 定位最近完成的检查点
- 重新分配状态分片
- 从存储系统加载状态快照
- 状态TTL管理
- 过期策略配置:
StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.days(1)) .setUpdateType(OnCreateAndWrite) .setStateVisibility(NeverReturnExpired) .build(); descriptor.enableTimeToLive(ttlConfig);
- 支持基于处理时间/事件时间的过期
- 可配置访问时更新TTL
- 支持全量/增量清理策略
- 状态扩展机制
可查询状态(Queryable State):
- 通过QueryableStateClient外部查询
- 需启用状态服务器:
# flink-conf.yaml配置 query.server.ports: 9067 queryable-state.enable: true
状态迁移:
- 通过State Processor API导出/导入
- 支持状态模式演进(Schema Evolution)
- 性能优化技巧
状态序列化:
- 优先使用Flink类型序列化器
- 复杂类型注册Kryo序列化
- 避免使用Java原生序列化
RocksDB调优:
state.backend.rocksdb: block.cache-size: 256MB writebuffer.size: 128MB compaction.level.max-size-level-base: 256MB
- 状态监控指标
- 关键监控项:
numKeyedStateEntries
:状态条目数stateSize
:状态大小checkpointDuration
:检查点耗时lastCheckpointSize
:最近检查点大小
- 典型应用场景
有状态函数:
- 窗口聚合(Window Aggregation)
- 模式检测(CEP)
- 会话跟踪(Session Tracking)
状态迁移案例:
- Flink版本升级
- 业务逻辑变更
- 并行度调整
水位线
窗口
CEP
反压(背压)机制
Flink的反压机制(Backpressure)是流处理系统中用于应对数据生产速度超过消费速度的关键流量控制机制。
一、常见反压场景
1. 数据倾斜(Data Skew)
- 表现:少数TaskManager负载极高,其他空闲
- 成因:
- KeyBy后某些Key数据量过大(如用户ID分布不均)
- 窗口聚合时热点时间窗口
2. 外部系统瓶颈
- 表现:Sink算子持续反压
- 成因:
- 数据库写入速率不足(如JDBC Sink)
- Kafka分区数不足或Broker吞吐瓶颈
- 第三方API调用延迟过高
3. 计算资源不足
- 表现:全链路反压
- 成因:
- 算子并行度设置过低
- TaskManager内存/CPU不足
- 网络带宽受限(跨机房传输)
4. 算子处理逻辑过重
- 表现:单个算子持续反压
- 成因:
- 复杂UDF(如JSON解析、机器学习推理)
- 状态操作频繁(大状态Checkpoint)
- 维表关联(未优化Async I/O)
5. 背靠背算子阻塞
- 表现:上游算子输出缓冲满,下游无消费
- 成因:
- 同步阻塞调用(如Thread.sleep)
- 锁竞争/资源争用
二、系统化解决方法
1. 诊断工具先行
- Web UI反压监控:
- 定位反压起始算子(红色/黄色标记)
- 查看
Buffers in flight
和Output buffer usage
- Metrics监控:
outPoolUsage
> 0.8 持续告警busyTimeMsPerSecond
接近1000ms表示CPU饱和- 火焰图分析:抓取JVM线程栈定位热点代码
shellCopy Code
# 使用AsyncProfiler生成火焰图 ./profiler.sh -d 30 -f flamegraph.html <TaskManager_PID>
2. 针对性优化方案
场景 优化措施 数据倾斜 - 添加随机前缀打散Key( keyBy(key + "_" + random.nextInt(10))
)
- 开启LocalKeyBy(Flink 1.13+)
- 两阶段聚合(预聚合+全局聚合)外部系统瓶颈 - Sink端:增加Kafka分区数 / 启用批量写入
- 异步IO+缓存(Guava Cache/Caffeine)
- 限流降级(如令牌桶算法)资源不足 - 调高并行度(与Kafka分区数对齐)
- 增加TM内存(taskmanager.memory.process.size: 4096m
)
- 使用SSD减少Checkpoint延迟算子逻辑过重 - 复杂计算卸载到GPU(Flink CUDA支持)
- 状态拆分:ListState
替代大对象
- 开启增量Checkpoint背靠背阻塞 - 替换同步操作为异步IO( AsyncDataStream.unorderedWait
)
- 避免算子内线程阻塞3. 高级调优技巧
- 网络优化:
# 增加网络缓冲区 taskmanager.network.memory.max: 512mb taskmanager.memory.segment-size: 32kb
- 反压传播控制:
env.setBufferTimeout(10); // 减少缓冲等待时间
- 精确反压定位:
# 启用反压采样 curl -X POST http://localhost:8081/jobs/<jobID>/backpressure
4. 架构级解决方案
- 动态扩缩容:
- Kubernetes + Flink AutoScaler(根据反压指标自动调整并行度)
- 流批一体降级:
- 反压时切到批处理模式补偿(如Apache Iceberg)
- 多级背压缓冲:
用Kafka作为缓冲层吸收突发流量
graph LR Source --> Kafka --> Flink --> Redis
三、生产环境案例
场景:电商大促期间,订单统计Job因
count(distinct user_id)
导致反压
解决:
- 诊断发现
KeyBy(user_id)
后倾斜严重- 改造为两阶段聚合:
-- 第一阶段:局部去重 SELECT HASH_CODE(user_id)%10 as bucket, user_id FROM orders GROUP BY bucket, user_id -- 第二阶段:全局合并 SELECT COUNT(*) FROM temp_table GROUP BY bucket
- 并行度从20提升至50,倾斜消失后反压解除
运维建议
- 预防性监控:Prometheus + Grafana持续跟踪
numRecordsOutPerSecond
与numRecordsInPerSecond
比值- 混沌测试:使用flink-fault-injection模拟网络延迟/数据倾斜
- 升级版本:Flink 1.13+ 的Unified Scheduler显著改善反压传播效率
sql(与普通sql的区别)
FlinkSQL与普通SQL的主要区别体现在以下几个方面:
- 执行环境:
- 普通SQL:面向静态数据集(如MySQL/Oracle等关系型数据库)
- FlinkSQL:面向动态数据流(实时处理无界数据流)
- 核心能力:
- 普通SQL:批处理(一次性完整数据集操作)
- FlinkSQL:同时支持流处理(持续增量计算)和批处理
- 时间语义:
- 普通SQL:仅处理事件时间(event time)
- FlinkSQL:支持事件时间、处理时间、摄入时间三种时间语义
- 窗口操作:
- 普通SQL:无原生窗口概念
- FlinkSQL:内置滚动/滑动/会话窗口,支持时间/计数窗口
- 状态管理:
- 普通SQL:无状态(每次查询独立)
- FlinkSQL:有状态计算(支持持续查询维护中间状态)
- 连接器支持:
- 普通SQL:主要连接关系型数据库
- FlinkSQL:支持Kafka/ES/JDBC/HBase等20+连接器
- 语法扩展:
- FlinkSQL扩展了MATCH_RECOGNIZE(复杂事件处理)、TOP-N等特殊语法
- 支持自定义UDF/UDAF/UDTF函数
- 执行计划:
- 普通SQL:生成静态执行计划
- FlinkSQL:动态优化执行计划(基于数据特征自适应调整)
典型应用场景对比:
- 普通SQL:报表生成、数据分析(T+1)
- FlinkSQL:实时风控、实时大屏、流式ETL(毫秒级延迟)
// 引入必要依赖 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkSQLDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 2. 创建Kafka源表(流处理)tableEnv.executeSql("CREATE TABLE kafka_source (" +" user_id STRING," +" item_id STRING," +" behavior STRING," +" ts TIMESTAMP(3)," +" WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +") WITH (" +" 'connector' = 'kafka'," +" 'topic' = 'user_behavior'," +" 'properties.bootstrap.servers' = 'localhost:9092'," +" 'format' = 'json'" +")");// 3. 创建JDBC结果表tableEnv.executeSql("CREATE TABLE jdbc_sink (" +" user_id STRING," +" buy_cnt BIGINT," +" PRIMARY KEY (user_id) NOT ENFORCED" +") WITH (" +" 'connector' = 'jdbc'," +" 'url' = 'jdbc:mysql://localhost:3306/flink_db'," +" 'table-name' = 'user_stats'," +" 'username' = 'root'," +" 'password' = '123456'" +")");// 4. 执行聚合查询(窗口统计)tableEnv.executeSql("INSERT INTO jdbc_sink " +"SELECT user_id, COUNT(*) as buy_cnt " +"FROM kafka_source " +"WHERE behavior = 'buy' " +"GROUP BY user_id, TUMBLE(ts, INTERVAL '1' HOUR)");// 5. 批处理示例(静态数据集)tableEnv.executeSql("CREATE TABLE csv_source (" +" name STRING," +" age INT," +" city STRING" +") WITH (" +" 'connector' = 'filesystem'," +" 'path' = 'input/persons.csv'," +" 'format' = 'csv'" +")");// 6. 执行批查询tableEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print') " +"AS SELECT city, AVG(age) FROM csv_source GROUP BY city");} }
Flink SQL 的本质确实是基于数据流处理的,但需要从以下维度理解其特性:
- 底层执行模型:
- Flink SQL 的运行时建立在流处理引擎之上,所有查询都会被转换为数据流操作
- 即使处理批数据(有界流),底层仍采用流式处理机制
- 统一处理范式:
- 通过Table API/SQL层对批流进行统一抽象
- 批处理被视为特殊的流处理(有界流)
- 核心特征体现:
- 持续查询机制:结果会随新数据到达不断更新
- 增量计算:通过状态维护实现高效更新
- 时间语义:支持事件时间处理(流处理核心需求)
- 与标准SQL的关键差异:
-- FlinkSQL特有的流式语法示例 SELECT user_id, COUNT(*) OVER ( PARTITION BY user_id ORDER BY procTime RANGE INTERVAL '1' HOUR PRECEDING ) AS hourly_actions FROM kafka_source
典型流处理特征在SQL中的体现:
- 窗口聚合(TUMBLE/HOP/SESSION)
- 水位线(WATERMARK)定义
- 撤回机制(RETRACT模式)
- 动态表(Dynamic Table)概念
注意:虽然底层是流式处理,但通过优化器可以实现:
- 批处理场景下自动切换为批执行模式
- 有限流作业的自动优化(如Final模式聚合)
函数用例
以下是Flink SQL函数的分类详解及典型用例(基于Flink 1.16版本):
一、时间函数
DATE_FORMAT(timestamp, 'yyyy-MM-dd')
sqlCopy Code
SELECT DATE_FORMAT(order_time, 'yyyy/MM/dd') FROM orders -- 输出:2025/08/18
TUMBLE_START(rowtime, INTERVAL '1' HOUR)
sqlCopy Code
SELECT TUMBLE_START(ts, INTERVAL '1' HOUR) AS window_start, COUNT(*) FROM clicks GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)
二、字符串函数
REGEXP_EXTRACT(url, '//([^/]+)')
sqlCopy Code
SELECT REGEXP_EXTRACT(url, 'https://([^/]+)') FROM web_logs -- 输入:https://example.com/page → 输出:example.com
JSON_VALUE('{"name":"John"}', '$.name')
sqlCopy Code
SELECT JSON_VALUE(user_info, '$.address.city') FROM users
三、聚合函数
COUNT(DISTINCT user_id)
sqlCopy Code
SELECT COUNT(DISTINCT visitor_id) FROM page_views
LISTAGG(product_name, ', ')
sqlCopy Code
SELECT order_id, LISTAGG(product_name, '|') FROM order_items GROUP BY order_id
四、条件函数
CASE WHEN score > 90 THEN 'A' ELSE 'B' END
sqlCopy Code
SELECT user_id, CASE WHEN login_count > 5 THEN 'active' ELSE 'inactive' END FROM users
COALESCE(NULLIF(discount, 0), 10)
sqlCopy Code
SELECT COALESCE(department, 'Unknown') FROM employees
五、数学函数
ROUND(price * 1.1, 2)
sqlCopy Code
SELECT ROUND(AVG(temperature), 1) FROM sensors
LOG(2, value)
sqlCopy Code
SELECT LOG(10, click_count) FROM ads
六、表值函数
LATERAL TABLE(explode(ARRAY[1,2,3]))
sqlCopy Code
SELECT user_id, t.item FROM orders, LATERAL TABLE(explode(items)) AS t(item)
七、系统函数
CURRENT_WATERMARK(rowtime)
sqlCopy Code
SELECT CURRENT_WATERMARK(ts) FROM kafka_source
八、窗口函数
HOP_END(ts, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE)
sqlCopy Code
SELECT HOP_END(event_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR), SUM(amount) FROM transactions GROUP BY HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
九、自定义函数示例
sqlCopy Code
CREATE FUNCTION geo_distance AS 'com.udf.GeoDistanceUDF'; SELECT geo_distance(lat1, lon1, lat2, lon2) FROM locations
特殊场景函数:
- 去重统计:
sqlCopy Code
SELECT HOP_START(ts, INTERVAL '5' SECOND, INTERVAL '1' MINUTE), COUNT(DISTINCT user_id) FROM clicks GROUP BY HOP(ts, INTERVAL '5' SECOND, INTERVAL '1' MINUTE)
- Top-N模式:
sqlCopy Code
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rownum FROM products ) WHERE rownum <= 3
完整函数参考官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/functions/systemfunctions/
调优
Flink作为流批一体的大数据处理框架,性能调优是提高作业效率的关键。以下是Flink调优的主要方面:
1. 资源配置调优
- 并行度设置:根据数据量和集群资源合理设置算子并行度
- 内存配置:调整TaskManager和JobManager的内存分配
- 网络缓冲区:优化
taskmanager.network.memory.fraction
和taskmanager.network.memory.max
2. 状态后端优化
- 选择合适的后端:RocksDBStateBackend适合大状态,FsStateBackend适合中小状态
- RocksDB调优:调整
state.backend.rocksdb.block.cache-size
等参数- 增量检查点:启用
state.backend.incremental
减少检查点时间3. 检查点与容错
- 检查点间隔:平衡容错开销和恢复时间
- 对齐超时:调整
execution.checkpointing.alignment-timeout
- 本地恢复:启用
state.backend.local-recovery
加速恢复4. 数据倾斜处理
- KeyBy前预处理:对倾斜Key进行加盐或打散
- 两阶段聚合:先局部聚合再全局聚合
- 自定义分区器:实现更均衡的数据分布
5. 序列化优化
- 使用高效序列化器:如Kryo或自定义序列化
- 注册类型:通过
env.registerType()
减少序列化开销6. 代码层面优化
- 避免频繁对象创建:重用对象减少GC压力
- 合理使用窗口:选择适合业务的时间/计数窗口
- 异步IO:对数据库访问等操作使用AsyncFunction
7. 资源调度优化
- Slot共享组:合理配置slot共享减少资源浪费
- 资源隔离:关键算子使用独立资源组
8. 监控与诊断
- Metrics系统:利用Flink Metrics监控作业状态
- 背压分析:通过Web UI或Metrics识别背压来源
- 日志分析:关注GC日志和异常日志
需要更具体的调优建议,请提供您的Flink作业配置和当前遇到的性能问题。
感谢阅读!!!