
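A Deep Dive into the Flink Local Mode Startup Source Code: Demystifying Job Initialization

In Flink's data processing stack, Local mode needs no distributed cluster resources, which makes it a handy tool for quickly validating job logic during development and testing. Its startup source code traces the complete path from job submission to task execution. Below, we walk through the key code segments and analyze the underlying logic of a Local mode startup step by step.

1. Overview of the Local Mode Startup Flow

Flink Local mode lets a job run in the local environment. The startup flow covers job submission, environment initialization, and task scheduling. Core components such as ExecutionEnvironment, JobClient, and LocalExecutor work together to turn the user's code into running tasks. The sections below analyze this from the perspective of the source code.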

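2. In-Depth Analysis of the Core Startup Flow

2.1 Job Submission and Environment Initialization

The startup flow begins when the user submits a job through ExecutionEnvironment.execute(). Taking StreamExecutionEnvironment as an example, the key logic, in simplified form, looks like this:

public JobExecutionResult execute(String jobName) throws Exception {
    setJobName(jobName);
    // Build the StreamGraph from the user's DataStream API code; it defines the job's processing logic
    final StreamGraph streamGraph = getStreamGraph();
    return execute(streamGraph);
}

public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    // Create the executor; each mode has its own executor, and Local mode uses a dedicated implementation
    final Executor executor = createExecutor();
    return executor.execute(streamGraph);
}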

Inside getStreamGraph(), Flink iterates over the transformations (Transformation) defined by the user and builds the StreamGraph. For example, when building a Source node:

// Excerpt from translateInternal in SourceTransformationTranslator
public Collection<Integer> translateInternal(
        final SourceTransformation<OUT, SplitT, EnumChkT> transformation,
        final Context context,
        boolean emitProgressiveWatermarks) {
    // Gather the key pieces of information
    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    // Create the SourceOperatorFactory used to instantiate the Source operator
    SourceOperatorFactory<OUT> operatorFactory =
            new SourceOperatorFactory<>(
                    transformation.getSource(),
                    transformation.getWatermarkStrategy(),
                    emitProgressiveWatermarks);

    // Add the Source node to the StreamGraph
    streamGraph.addSource(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            operatorFactory,
            null,
            transformation.getOutputType(),
            "Source: " + transformation.getName());

    // Set parallelism: use the transformation's own value if set, otherwise the environment default
    final int parallelism =
            transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                    ? transformation.getParallelism()
                    : executionConfig.getParallelism();
    streamGraph.setParallelism(transformationId, parallelism);
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

    return Collections.singleton(transformationId);
}

The code above shows how a Source node is added to the StreamGraph, laying the groundwork for task execution.
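The parallelism fallback in that excerpt can be isolated into a small, dependency-free sketch. The class and method names here (ParallelismResolver, register, parallelismOf) are hypothetical and not part of Flink. The only thing borrowed from Flink is the convention that -1 (ExecutionConfig.PARALLELISM_DEFAULT) means "not set".

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of per-node parallelism resolution; illustrative only, not Flink code. */
class ParallelismResolver {
    /** Sentinel meaning "not set", mirroring ExecutionConfig.PARALLELISM_DEFAULT. */
    static final int PARALLELISM_DEFAULT = -1;

    private final int environmentParallelism;
    private final Map<Integer, Integer> parallelismByNode = new LinkedHashMap<>();

    ParallelismResolver(int environmentParallelism) {
        if (environmentParallelism <= 0) {
            throw new IllegalArgumentException("environment parallelism must be positive");
        }
        this.environmentParallelism = environmentParallelism;
    }

    /** Registers a node and returns the parallelism it will run with. */
    int register(int transformationId, int requestedParallelism) {
        // An explicit per-node setting wins; otherwise fall back to the environment default.
        int effective = requestedParallelism != PARALLELISM_DEFAULT
                ? requestedParallelism
                : environmentParallelism;
        parallelismByNode.put(transformationId, effective);
        return effective;
    }

    /** Looks up the resolved parallelism of a previously registered node. */
    int parallelismOf(int transformationId) {
        Integer value = parallelismByNode.get(transformationId);
        if (value == null) {
            throw new IllegalArgumentException("unknown node " + transformationId);
        }
        return value;
    }
}
```

With an environment parallelism of 4, a node registered with PARALLELISM_DEFAULT resolves to 4, while a node that explicitly requests 2 keeps 2.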

2.2 JobClient Creation and Job Submission

Once the environment is initialized, the flow moves on to creating the JobClient. The JobClient is created by JobClientFactory; in Local mode it is a LocalJobClient:

// createJobClient method of JobClientFactory
public static JobClient createJobClient(
        JobGraph jobGraph,
        Configuration configuration) throws IOException {
    final String jobManagerAddress;
    final int jobManagerPort;

    // Special handling for Local mode
    if (configuration.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
        jobManagerAddress = "localhost";
        jobManagerPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_RPC_PORT, -1);
    } else {
        // Non-Local mode
        jobManagerAddress = configuration.getString(ConfigConstants.JOB_MANAGER_RPC_ADDRESS_KEY);
        jobManagerPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_RPC_PORT, -1);
    }

    final RpcService rpcService = createRpcService(configuration);
    final RpcGatewayTarget jobManagerGatewayTarget =
            new RpcGatewayTarget(jobManagerAddress, jobManagerPort, "jobmanager");

    // Create the JobClient that matches the mode
    if (LocalExecutor.isLocalExecution(configuration)) {
        return new LocalJobClient(jobGraph, rpcService, jobManagerGatewayTarget);
    } else {
        return new StandaloneJobClient(jobGraph, rpcService, jobManagerGatewayTarget);
    }
}

Once created, the LocalJobClient submits the JobGraph to the LocalExecutor. Before submission, the JobGraph is validated to make sure the job definition is sound:

// submitJob method of LocalJobClient
public CompletableFuture<JobExecutionResult> submitJob() {
    final CompletableFuture<JobExecutionResult> jobResultFuture = new CompletableFuture<>();
    try {
        // Validate the JobGraph
        Preconditions.checkState(jobGraph != null, "JobGraph must not be null.");

        // Submit the JobGraph to the LocalExecutor
        final CompletableFuture<JobExecutionResult> executionFuture =
                localExecutor.execute(
                        jobGraph,
                        jobResultFuture::complete,
                        jobResultFuture::completeExceptionally);
        return executionFuture;
    } catch (Exception e) {
        jobResultFuture.completeExceptionally(e);
        return jobResultFuture;
    }
}
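The submission code above mixes two styles: completion callbacks and a returned future. A minimal sketch of one way to bridge them, so that callers only ever observe a single future, is given here. CallbackBridge and CallbackExecutor are hypothetical names introduced for illustration and do not exist in Flink; only java.util.concurrent.CompletableFuture is a real API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Illustrative adapter from a callback-style call to a CompletableFuture; not Flink code. */
class CallbackBridge {
    /** Hypothetical callback-style API standing in for the executor described in the text. */
    interface CallbackExecutor<T> {
        void execute(Consumer<T> onResult, Consumer<Throwable> onFailure) throws Exception;
    }

    /** Runs the executor and exposes success and failure through one future. */
    static <T> CompletableFuture<T> submit(CallbackExecutor<T> executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        try {
            // Both callbacks complete the same future, whether they fire now or later.
            executor.execute(result::complete, result::completeExceptionally);
        } catch (Exception e) {
            // A synchronous failure surfaces through the same future.
            result.completeExceptionally(e);
        }
        return result;
    }
}
```

Returning the same future on every path means a caller never has to guess which handle carries the outcome.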

2.3 LocalExecutor Task Execution Logic

LocalExecutor is the core of task execution in Local mode. It first creates a TaskExecutorRunner from the JobGraph:

// Excerpt from the execute method of LocalExecutor
public CompletableFuture<JobExecutionResult> execute(
        JobGraph jobGraph,
        Consumer<JobExecutionResult> resultConsumer,
        Consumer<Throwable> failureConsumer) {
    final CompletableFuture<JobExecutionResult> jobResultFuture = new CompletableFuture<>();
    try {
        // Parse the JobGraph to obtain the task-related configuration
        final TaskExecutorConfiguration taskExecutorConfiguration =
                TaskExecutorConfiguration.fromConfigurationAndJobGraph(configuration, jobGraph);

        // Create the TaskExecutorFactory
        final TaskExecutorFactory taskExecutorFactory =
                new TaskExecutorFactory(taskExecutorConfiguration);

        // Create the TaskExecutorRunner
        final TaskExecutorRunner taskExecutorRunner =
                new TaskExecutorRunner(taskExecutorConfiguration, taskExecutorFactory);

        // Start the TaskExecutorRunner
        taskExecutorRunner.start();

        // Submit the job to the TaskExecutorRunner for execution
        final CompletableFuture<JobExecutionResult> executionFuture =
                taskExecutorRunner.submitJob(jobGraph, resultConsumer, failureConsumer);
        return executionFuture;
    } catch (Exception e) {
        jobResultFuture.completeExceptionally(e);
        return jobResultFuture;
    }
}

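When TaskExecutorRunner starts, it allocates resources for the tasks and initializes the execution environment. Taking memory allocation as an example:

// Excerpt from the start method of TaskExecutorRunner
public void start() throws Exception {
    // Allocate memory: read the required sizes from the task configuration
    final MemorySize taskHeapMemory = taskExecutorConfiguration.getTaskHeapMemorySize();
    final MemorySize taskOffHeapMemory = taskExecutorConfiguration.getTaskOffHeapMemorySize();

    // Initialize the memory environment
    memoryManager = MemoryManagerFactory.createMemoryManager(
            taskHeapMemory,
            taskOffHeapMemory,
            configuration);

    // Other resource initialization and environment setup
    //...
}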

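2.4 Task Initialization and Runtime Mechanics

Inside the task thread, the StreamOperator is loaded and initialized. Taking MapOperator as an example:

// Excerpt from the initialization method of MapOperator
public void open() throws Exception {
    super.open();
    // Initialize the user-defined MapFunction
    userFunction = userFunctionSerializer.deserialize(new DeserializationContext.GetInitialContext());
    // Initialize input and output resources
    inputSerializer.open();
    outputSerializer.open();
}

// Element-processing method of MapOperator
public void processElement(StreamRecord<IN> element) throws Exception {
    // Read the value from the input record
    IN input = element.getValue();
    // Apply the user-defined map function
    OUT output = userFunction.map(input);
    // Emit the result downstream, keeping the original timestamp
    outputCollector.collect(new StreamRecord<>(output, element.getTimestamp()));
}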
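The open-then-process lifecycle described above can be mimicked in a few lines of plain Java. This is a minimal sketch under stated assumptions: MiniMapOperator is a hypothetical stand-in built on java.util.function types, not a Flink class, and it ignores timestamps, serialization, and checkpointing.

```java
import java.util.function.Consumer;
import java.util.function.Function;

/** Toy stand-in for a map operator; illustrative only, not Flink code. */
class MiniMapOperator<IN, OUT> {
    private final Function<IN, OUT> userFunction;
    private final Consumer<OUT> output;
    private boolean opened;

    MiniMapOperator(Function<IN, OUT> userFunction, Consumer<OUT> output) {
        this.userFunction = userFunction;
        this.output = output;
    }

    /** Lifecycle hook: must run once before any element is processed. */
    void open() {
        opened = true;
    }

    /** Applies the user function and forwards the result downstream. */
    void processElement(IN element) {
        if (!opened) {
            throw new IllegalStateException("open() must be called before processElement()");
        }
        output.accept(userFunction.apply(element));
    }
}
```

Chaining works by passing the next operator's processElement as the output of the previous one, for example `new MiniMapOperator<>(String::trim, next::processElement)`, which is a rough analogy for records flowing from one operator to the next.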

Data flows between tasks along StreamEdges, which completes the processing of the whole job.

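3. Key Components and Technical Details

3.1 Resource Management and Allocation Strategy

LocalExecutor allocates resources to tasks based on the JobGraph. When allocating thread resources:

// Task thread creation logic in LocalExecutor
private void createTaskThreads(JobGraph jobGraph) {
    for (JobVertex jobVertex : jobGraph.getVertices()) {
        final int parallelism = jobVertex.getParallelism();
        // One thread per parallel subtask of each vertex
        for (int i = 0; i < parallelism; i++) {
            // Create the task thread
            final TaskThread taskThread = new TaskThread(jobVertex, i);
            taskThreads.add(taskThread);
            // Start the task thread
            taskThread.start();
        }
    }
}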

In addition, a thread pool is used to manage threads and avoid exhausting resources:

// Thread pool definition and usage
private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        corePoolSize,
        maximumPoolSize,
        keepAliveTime,
        TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>());

// Submit a task to the thread pool
public void submitTask(Runnable task) {
    threadPoolExecutor.submit(task);
}
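One detail of this configuration is worth checking in isolation. With an unbounded LinkedBlockingQueue, java.util.concurrent.ThreadPoolExecutor queues extra work instead of growing past corePoolSize, so maximumPoolSize has no effect. The sketch below demonstrates this with the JDK alone; BoundedTaskPool and largestPoolSizeFor are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Illustrative probe of ThreadPoolExecutor sizing with an unbounded queue. */
class BoundedTaskPool {
    /** Runs taskCount trivial tasks and returns the largest number of threads the pool ever held. */
    static int largestPoolSizeFor(int corePoolSize, int maximumPoolSize, int taskCount)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                60_000L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>()); // unbounded: tasks queue instead of adding threads
        CountDownLatch done = new CountDownLatch(taskCount);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(done::countDown);
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return pool.getLargestPoolSize();
        } finally {
            pool.shutdown();
        }
    }
}
```

If the pool should actually be allowed to grow toward maximumPoolSize under load, a bounded queue (for example `new LinkedBlockingQueue<>(capacity)`) together with a rejection policy is the usual design choice.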

3.2 Task Scheduling and Coordination

LocalExecutor schedules tasks according to the dependencies in the JobGraph. The scheduling logic is as follows:

// Task scheduling method of LocalExecutor
private void scheduleTasks(JobGraph jobGraph) {
    final Set<JobVertex> readyVertices = new HashSet<>();
    for (JobVertex jobVertex : jobGraph.getVertices()) {
        // A vertex with no upstream dependencies is ready immediately
        if (jobVertex.getInEdges().isEmpty()) {
            readyVertices.add(jobVertex);
        }
    }

    while (!readyVertices.isEmpty()) {
        final JobVertex jobVertex = readyVertices.iterator().next();
        readyVertices.remove(jobVertex);
        // Start the task
        startTask(jobVertex);

        // A downstream vertex becomes ready once all of its predecessors have finished
        for (JobEdge jobEdge : jobVertex.getOutEdges()) {
            final JobVertex targetVertex = jobEdge.getTarget();
            boolean allPredecessorsCompleted = true;
            for (JobEdge inEdge : targetVertex.getInEdges()) {
                if (!inEdge.getSource().isFinished()) {
                    allPredecessorsCompleted = false;
                    break;
                }
            }
            if (allPredecessorsCompleted) {
                readyVertices.add(targetVertex);
            }
        }
    }
}
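The dependency-driven ordering above is essentially a topological sort. A minimal, self-contained sketch using Kahn's algorithm (counting unsatisfied inputs rather than polling an isFinished flag) is given here. DependencyScheduler and its methods are hypothetical names, vertices are plain strings, and "starting" a vertex is reduced to appending it to a list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy dependency scheduler based on Kahn's algorithm; illustrative only, not Flink code. */
class DependencyScheduler {
    private final Map<String, List<String>> downstream = new LinkedHashMap<>();
    private final Map<String, Integer> inputCount = new LinkedHashMap<>();

    void addVertex(String name) {
        downstream.putIfAbsent(name, new ArrayList<>());
        inputCount.putIfAbsent(name, 0);
    }

    void addEdge(String source, String target) {
        addVertex(source);
        addVertex(target);
        downstream.get(source).add(target);
        inputCount.merge(target, 1, Integer::sum);
    }

    /** Returns an order in which every vertex comes after all of its upstream vertices. */
    List<String> startOrder() {
        Map<String, Integer> remaining = new LinkedHashMap<>(inputCount);
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : remaining.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey()); // no upstream dependencies
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            order.add(vertex);
            for (String target : downstream.get(vertex)) {
                // The target becomes ready once its last unsatisfied input is accounted for.
                if (remaining.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (order.size() != downstream.size()) {
            throw new IllegalStateException("dependency cycle detected");
        }
        return order;
    }
}
```

Counting remaining inputs avoids re-scanning every incoming edge each time a vertex is started, and the final size check turns a cyclic graph into an explicit error instead of silently skipping vertices.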

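3.3 Error Handling and Recovery

During job submission, when JobGraph validation fails:

// Excerpt from the JobGraph validation method
public void validate() throws InvalidJobException {
    for (JobVertex jobVertex : getVertices()) {
        // Every vertex must have a positive parallelism
        if (jobVertex.getParallelism() <= 0) {
            throw new InvalidJobException("Job vertex " + jobVertex + " has invalid parallelism.");
        }
        // Other validation logic
        //...
    }
}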
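A variant of this check that reports every offending vertex at once, rather than stopping at the first, can be sketched as follows. ParallelismValidator is a hypothetical helper that works on a plain map from vertex name to parallelism; it is not a Flink API and it assumes no null values in the map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy validator over vertex-name to parallelism pairs; illustrative only, not Flink code. */
class ParallelismValidator {
    /** Returns one message per vertex whose parallelism is not positive. */
    static List<String> validate(Map<String, Integer> parallelismByVertex) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : parallelismByVertex.entrySet()) {
            if (entry.getValue() <= 0) {
                problems.add("Job vertex " + entry.getKey()
                        + " has invalid parallelism " + entry.getValue() + ".");
            }
        }
        return problems;
    }
}
```

An empty list means the graph passed this check; a caller could join the messages into a single exception so the user sees all problems in one run.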

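When an exception occurs during task execution, TaskExecutorRunner handles it as follows:

// Exception handling during task execution in TaskExecutorRunner
public void run() {
    try {
        // Task execution logic
        //...
    } catch (Exception e) {
        // Report the exception
        exceptionHandler.handleException(e);
        // Decide by exception type: restart the task if the failure is recoverable
        if (isRecoverableException(e)) {
            restartTask();
        } else {
            // Otherwise terminate the job
            terminateJob(e);
        }
    }
}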
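The restart-or-terminate decision above leaves open how many restarts are allowed. A minimal sketch with an explicit restart budget is given here; RestartingRunner and runWithRestarts are hypothetical names, and the recoverability check is passed in as a predicate rather than taken from any Flink class.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

/** Toy restart loop with a bounded number of attempts; illustrative only, not Flink code. */
class RestartingRunner {
    /**
     * Runs the task, restarting it after recoverable failures.
     * Rethrows the failure if it is not recoverable or the restart budget is used up.
     */
    static <T> T runWithRestarts(Callable<T> task, Predicate<Exception> isRecoverable, int maxRestarts)
            throws Exception {
        int restarts = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                if (!isRecoverable.test(e) || restarts >= maxRestarts) {
                    throw e; // terminate: unrecoverable, or too many restarts
                }
                restarts++; // recoverable: try again
            }
        }
    }
}
```

Bounding the number of restarts keeps a persistently failing task from looping forever; a production version would typically also wait between attempts.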

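4. Practical Value and Optimization Directions

Studying the Flink Local mode startup source code pays off in practice. During development and testing, developers can set breakpoints in key code such as StreamExecutionEnvironment initialization and LocalExecutor task scheduling to locate problems quickly. When tuning resource allocation, the allocation code in LocalExecutor serves as a reference for adjusting the logic dynamically to each task's actual needs. Comparing it with the startup source code of the distributed modes, and borrowing their dynamic resource scheduling ideas, can further improve Local mode performance.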
