当前位置: 首页 > news >正文

「Flink」业务搭建方法总结

1. 合理设置并行度和TaskManager 的任务槽数
1.1 核心概念:
  1. 并行度:指 Flink 作业中特定算子(Operator)或整个作业的执行并行实例(即子任务)的数量。例如,map 算子的并行度为 5,意味着这个 map 操作会被拆分成 5 个完全相同的任务,同时在集群的不同地方处理数据流的不同分区。
  2. JobManager: Flink 集群的管理节点,负责调度作业、协调检查点、故障恢复等
  3. TaskManager: Flink 的工作节点(Worker Node)。每个 TaskManager 是一个独立的 JVM 进程,负责执行实际的任务(即算子子任务),Slot 是 TM 上执行任务的基本资源单元。
  4. 任务槽(slot): Flink 集群(如 TaskManager)中的基本资源单元。每个 TaskManager 是一个 JVM 进程,它可以提供一定数量的任务槽。一个任务槽可以执行一个算子并行度实例(即一个子任务)。JM 管理 Slot 的分配,TM 提供 Slot 的实际执行环境
1.2 并行度与资源的关系
  1. 并行度决定所需任务槽总数:

● 作业中所有算子并行度实例的总和(即整个作业图的所有子任务)必须小于或等于集群中可用任务槽的总数。

● 总子任务数 = 所有算子并行度实例之和 <= 总可用任务槽数 = TaskManager 数量 * 每个 TaskManager 的任务槽数

● 开启 Slot Sharing 时总槽数 = 所有算子中最大并行度值 (或关键路径所需槽数)(Flink 作业运行时,所有算子子任务会被分配到槽位。通常以作业图中 最宽算子 的并行度作为总槽数需求,因为 Flink 会尝试 Slot Sharing 将多个算子子任务链化到同一个槽位)

● 示例:

○ 作业含 Source(并行度=4) → Map(并行度=8) → Sink(并行度=2)

○ 实际所需槽数 = max(4,8,2) = 8(开启 Slot Sharing 时)

● 更高的并行度需要更多的任务槽

  1. 任务槽需要资源(CPU 和内存):

● 任务槽需要运行在一个 TaskManager JVM 进程中

● 每个 TaskManager 配置了CPU 核心数和内存

● 任务槽的资源占用: 每个任务槽会占用其所在 TaskManager 的一部分 CPU 和内存资源。

● 结论:更多的任务槽意味着每个任务槽分得的 CPU 时间片和内存(尤其是用户代码内存)更少

  1. 关系总结:

并行度↑ → 所需任务槽总数↑ → 所需 TaskManager 数量↑ 或 每个 TaskManager 的任务槽数↑

每个 TaskManager 的任务槽数↑ → 每个任务槽可用的 CPU 资源↓ (竞争加剧)

每个 TaskManager 的任务槽数↑ → 每个任务槽可用的用户堆内存↓

每个 TaskManager 的任务槽数↑ → 每个任务槽可用的用户堆外内存↓

每个 TaskManager 的任务槽数↑ → 共享内存区域(网络、托管)压力↑

1.3 常见问题与陷阱:

● 并行度过高,任务槽不足: 作业无法启动(NoResourceAvailableException)。

● 每个 TaskManager 任务槽数过多:

○ CPU 不足: 线程竞争激烈,CPU 利用率达到 100%,但吞吐量不增反降,延迟增大。

○ 内存不足: 每个槽分到的内存太少,导致用户代码频繁 GC 或 OutOfMemoryError (Java Heap Space)。

○ 网络内存不足: 导致反压加剧,吞吐量下降。

● 每个 TaskManager 任务槽数过少: 资源利用率低(CPU、内存闲置),成本高

2. 数据输入源
2.1 数据输入源设置uid

dataSource输入数据源默认都要设置uid,方便后续Checkpoint启动系统可以使用同一个uid,避免发送因输入源uid由系统随机产生,而后续更新无法使用Checkpoint启动,导致数据紊乱

2.2 uid和checkpoint的关系

● 状态恢复的桥梁: 当 Flink 从 Checkpoint/Savepoint 恢复作业时,它需要知道如何将 Checkpoint 里保存的状态数据“分配”给新运行的作业拓扑中的哪个算子实例。uid 就是这个分配的匹配依据。

● 匹配过程:

○ 恢复作业时,Flink 会读取 Checkpoint/Savepoint 的元数据,其中记录了每个状态片段对应的算子 uid。

○ 启动新的作业实例(可能是修改后的代码版本)。

○ Flink 将新作业拓扑中具有相同 uid 的算子与Checkpoint 中保存的对应 uid 的状态进行匹配。

○ 匹配成功,则该算子的状态从 Checkpoint 中恢复。

○ 匹配失败(找不到对应 uid 的算子),则根据配置(allowNonRestoredState)决定是失败还是忽略该部分状态继续启动

2.3 数据源处理和流转

多数据源处理时,建议所有数据源优先根据各自数据源的业务逻辑(如:联表查询字段)查出业务所需数据,最后各数据源整合为统一数据格式,进行最终的业务合并计算和处理

3. 流水线
3.1 系统时间(TumblingProcessingTimeWindows):

以Flink系统接收到这批数据的时间为准,通常与业务系统产生这批数据的实际时间有一定的时间差

3.2 事件时间(TumblingEventTimeWindows):

取业务数据中某个时间字段值作为流水线标准,相对于系统时间会更为精准计算业务数据

3.3 流水线设置

将所有数据源整合为统一数据格式后,可以以数据格式中的时间字段设为统一流水线,确保所有数据源合并(union)后使用统一流水线进行输出

WatermarkStrategy<InputModel> watermarkStrategy = WatermarkStrategy//表示允许的最大乱序时间为 5 秒.<InputModel>forBoundedOutOfOrderness(Duration.ofSeconds(5))// 取InputModel中time字段数据作为流水线.withTimestampAssigner((o, t) -> {return o.getTime();})//表示如果某分区5秒没有数据,则标记为空闲.withIdleness(Duration.ofSeconds(5));
3.4 流水线使用
dataSource.union(otherDataSource1, otherDataSource2, otherDataSource3)// 设置流水线.assignTimestampsAndWatermarks(watermarkStrategy)// 根据Key进行分区.keyBy(InputModel::getKey)// 设置流水线窗口大小  5秒为一个窗口.window(TumblingEventTimeWindows.of(Time.seconds(5)))// 业务数据计算处理  Integer为Key的数据类型.process(new ProcessWindowFunction<InputModel, OutputModel, Integer, TimeWindow>())
4. 数据业务处理
主要数据通过collect输出, 次要数据通过sideOut侧输出流输出
// 定义侧输出流
OutputTag<OutputModel> outputTag = new OutputTag<OutputModel>("public") {};SingleOutputStreamOperator<OutputModel> process = dataSource.union(otherDataSource1, otherDataSource2, otherDataSource3)// 设置流水线.assignTimestampsAndWatermarks(watermarkStrategy)// 根据Key进行分区.keyBy(InputModel::getKey)// 设置流水线窗口大小  5秒为一个窗口.window(TumblingEventTimeWindows.of(Time.seconds(5)))// 业务数据计算处理  Integer为Key的数据类型.process(.process(new ProcessWindowFunction<InputModel, OutputModel, Integer, TimeWindow>() {@Overridepublic void process(Integer key, ProcessWindowFunction<InputModel, OutputModel, Integer, TimeWindow>.Context context, Iterable<InputModel> iterable, Collector<OutputModel> collector) throws Exception {// 获取窗口结束时间点long end = context.window().getEnd();LocalDateTime windowEndTime = Instant.ofEpochMilli(end).atZone(ZoneOffset.ofHours(8)).toLocalDateTime();// 业务逻辑计算// 侧输出流统计contect.output(outputTag, new OutPutModel(xxx, xxx, xxx, xxx));// 主要统计数据collector.collect(new OutPutModel(xxx,xxx,xxx, xxx));}
});
5.  数据输出
5.1 数据Sink输出

使用JDBC连接池进行连接提交至数据库,建议继承AbstractSink<OutPutModel>类,进行连接池资源共用,减少资源浪费

示例:

OutputSink

public class OutputSink extends AbstractSink<OutputModel> {//SQLprivate static final String OUTPUT_SQL = "insert into table_name(id,price,window_end_time,create_time) values(?,?,?,?)";@Overridepublic void invoke(GMVResultOutput value, SinkFunction.Context context) throws Exception {//  获取写入数据库连接资源Connection imsConn = connManager.getImsConnection();PreparedStatement outputStmt = null;try {outputStmt = imsConn.prepareStatement(OUTPUT_SQL);outputStmt.setLong(1, IdUtil.nextId());outputStmt.setBigDecimal(2, value.getPrice());outputStmt.setTimestamp(3, java.sql.Timestamp.valueOf(value.getWindowEndTime()));outputStmt.setTimestamp(4, java.sql.Timestamp.valueOf(LocalDateTime.now()));int i = outputStmt.executeUpdate();System.out.println("写入数据成功:" + i + "条");} catch (Exception e) {e.printStackTrace();} finally {closeResources(outputStmt);if (imsConn != null) {imsConn.close();}}}
}

AbstractSink<I>

public abstract class AbstractSink<I> extends RichSinkFunction<I> {protected transient JdbcConnectionManager connManager;@Overridepublic void open(Configuration parameters) throws Exception {connManager = new JdbcConnectionManager();connManager.open(); // 初始化连接}@Overridepublic void close() throws Exception {connManager.close(); // 关闭连接}// 辅助方法:关闭资源protected void closeResources(AutoCloseable... resources) {for (AutoCloseable res : resources) {if (res != null) {try { res.close(); } catch (Exception e) { /* Ignore */ }}}}
}
5.2 连接池

数据库连接使用连接池进行系统统一管理

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;public class JdbcConnectionManager implements Serializable {private transient HikariDataSource omsDataSource;private transient HikariDataSource imsDataSource;public void open() throws SQLException {// MySQL连接池配置(OMS)HikariConfig omsConfig = new HikariConfig();omsConfig.setJdbcUrl(OmsConstant.JDBC_URL);omsConfig.setUsername(OmsConstant.MYSQL_USER_NAME);omsConfig.setPassword(OmsConstant.MYSQL_PASSWORD);omsConfig.setMaximumPoolSize(20);       // 最大连接数(按需调整)omsConfig.setMinimumIdle(5);            // 最小空闲连接omsConfig.setConnectionTimeout(2000);    // 连接超时2秒omsConfig.setIdleTimeout(30000);        // 空闲超时30秒omsDataSource = new HikariDataSource(omsConfig);// MySQL连接池配置(IMS)HikariConfig imsConfig = new HikariConfig();imsConfig.setJdbcUrl(ImsConstant.IMS_JDBC_URL);imsConfig.setUsername(ImsConstant.IMS_USER_NAME);imsConfig.setPassword(ImsConstant.IMS_PASSWORD);imsConfig.setMaximumPoolSize(20);       // 最大连接数(按需调整)imsConfig.setMinimumIdle(5);            // 最小空闲连接imsConfig.setConnectionTimeout(2000);    // 连接超时2秒imsConfig.setIdleTimeout(30000);        // 空闲超时30秒imsDataSource = new HikariDataSource(imsConfig);}// 关闭连接池public void close() {if (omsDataSource != null) {omsDataSource.close();}if  (imsDataSource != null) {imsDataSource.close();}}// 从连接池获取连接(非物理关闭)public Connection getOmsConnection() throws SQLException {return omsDataSource.getConnection();}public Connection getImsConnection() throws SQLException {return imsDataSource.getConnection();}
}
5.3 数据输出源设置uid

业务数据处理好之后,使用Sink进行输出,输出时与输入源一样,需设置uid,以确保CheckPoint启动时可以正常启动

//  初始化outputSink
OutputSink outputSink = new OutputSink();// 明细
process.getSideOutput(outputTag).addSink(outputSink).name("side output").setParallelism(1).uid("side output Sink");
// 总和
process.addSink(outputSink).name("total output").setParallelism(1).uid("total output Sink");
6. 总结

以上的一些方法是近期基于业务的开发中遇到的一些坑点后,总结出来的一套相对比较完善的业务开发方法,便于后续Flink实时计算业务数据使用。如果大家有更好的建议和方法,也欢迎共同讨论学习!

http://www.lryc.cn/news/624500.html

相关文章:

  • 基于Flink CDC实现联系人与标签数据实时同步至ES的实践
  • Ansible文件部署与大项目多主机管理
  • 大数据开发面试题:美团秋招一面
  • 数据赋能(401)——大数据——持续学习与优化原则
  • 自建K8s集群无缝集成阿里云RAM完整指南
  • The Open Group 休斯敦峰会:进步之路——以开放标准定义未来
  • [openvela] Hello World :从零开始的完整实践与问题复盘
  • PDF转图片需要用到什么技术?苹果手机怎样将PDF转为jpg?
  • 在Excel启动时直接打开多个Excel文件
  • 2025上半年AI核心成果与趋势报告深度解析:技术突破、应用落地与未来展望
  • SQLsever基本操作
  • 网络间的通用语言TCP/IP-网络中的通用规则1
  • H264: SPS和PPS概念
  • thinkphp8:一、环境准备
  • Java-101 深入浅出 MySQL InnoDB 锁机制全景图:行锁原理、Next-Key Lock、Gap Lock 详解
  • 机器学习——XGBoost算法
  • python-----机器学习中常用的数据预处理
  • 机器学习之数据预处理(一)
  • 英特尔公司Darren Pulsipher 博士:以架构之力推动政府数字化转型
  • STM32使用WS2812灯环
  • 吴恩达 Machine Learning(Class 2)
  • Windows桌面自动化的革命性突破:深度解析Windows-MCP.Net Desktop模块的技术奥秘
  • 从零到一构建企业级GraphRAG系统:GraphRag.Net深度技术解析
  • OpenCV---特征检测算法(ORB,Oriented FAST and Rotated BRIEF)
  • SkyWalking + Elasticsearch8 容器化部署指南:国内镜像加速与生产级调优
  • 深度解析阿里巴巴国际站商品详情 API:从接口调用到数据结构化处理
  • Vision Master的C#脚本与opencv联合编程
  • 【GM3568JHF】FPGA+ARM异构开发板烧录指南
  • [系统架构设计师]软件可靠性基础知识(九)
  • 蔬菜批发小程序:生产商的数字化转型利器——仙盟创梦IDE