Flink on YARN启动全流程深度解析
Flink on YARN 模式启动流程及核心组件协作详解
整个过程分为三个主要阶段:
JobManager 启动(作业提交与 AM 初始化)
TaskManager 资源分配与启动
任务部署与执行
第一阶段:作业提交与 JobManager (AM) 启动
目标:在 YARN 集群上启动 Flink 的"大脑"——JobManager
组件协作流程
1. 客户端 (Client) -> YARN ResourceManager (RM)
生成 JobGraph
开发环境通过 Flink 命令行/代码提交作业时,客户端分析代码生成
JobGraph
数据结构。该静态逻辑视图描述算子(Source/Map/Sink等)及其数据流向。申请 ApplicationMaster (AM)
客户端连接 YARN RM 请求启动 Flink AM。Per-Job 模式下该 AM 是 JobManager + Flink ResourceManager 的组合体。客户端需上传:
JobGraph
Flink 框架 Jar 包
用户代码 Jar 包
(资源存储于 HDFS 等共享存储供 AM 访问)
2. YARN RM -> YARN NodeManager (NM)
分配容器 (Container)
RM 选择计算节点(NodeManager)并分配资源单位 Container(独立 JVM 进程环境)
3. YARN NM -> Flink JobManager (AM)
启动 AM
NodeManager 通过
launch_container.sh
启动 ApplicationMaster,入口点为YarnJobClusterEntrypoint
,关键操作包括:环境初始化
打印 Flink 版本/JVM/OS 信息
加载
flink-conf.yaml
配置
核心服务启动
RpcService
:组件间 RPC 通信HAService
:JobManager 高可用BlobServer
:传输作业 Jar 等大二进制对象HeartbeatServices
:心跳检测
Flink 资源管理器启动
YarnResourceManager
:与外部 YARN RM 通信(申请/释放 Container)SlotManager
:管理 TaskManager 提供的 Slot 资源状态
作业执行核心启动
Dispatcher
:接收 JobGraph 并启动对应 JobManagerJobManager
:将 JobGraph 转换为包含并行度/任务链/资源分配细节的ExecutionGraph
第二阶段:TaskManager 资源分配与启动
目标:根据作业需求启动计算"工人" TaskManager
组件协作流程
1. JobManager -> Flink ResourceManager
申请 Slot
JobManager 分析 ExecutionGraph 后向 Flink ResourceManager 申请所需计算资源(Slot)
2. Flink ResourceManager -> YARN RM
资源检查与申请
SlotManager
先检查现有空闲 Slot不足时由
YarnResourceManager
向 YARN RM 申请新 Container
3. YARN RM -> YARN NM -> Flink TaskManager
容器分配与进程启动
YARN RM 在其他 NodeManager 分配新 Container
Flink ResourceManager 指示启动
TaskManager
(入口类YarnTaskExecutorRunner
)
TaskManager 初始化
启动核心组件
TaskExecutor
(实际任务执行单元)每个 TaskExecutor 拥有与机器 CPU 核数匹配的 Slot(独立任务线程池)
4. TaskManager -> Flink ResourceManager & JobManager
资源注册流程
RPC 向 Flink ResourceManager 注册自身
汇报可用 Slot 数量及状态(SlotManager 掌握新资源)
与 JobManager 建立心跳连接(健康监控)
第三阶段:任务部署与执行
目标:将计算任务分发至 TaskManager 并启动持续计算
组件协作流程
1. Flink ResourceManager -> JobManager
Slot 分配
SlotManager 将可用 Slot 分配给等待的 JobManager 请求,并通知目标位置(某 TaskManager 的指定 Slot)
2. JobManager -> TaskManager
任务提交
JobManager 通过 RPC 调用 TaskManager 的
submitTask
接口,传输包含:任务执行逻辑
配置参数
依赖信息
3. TaskManager 内部执行
任务线程生命周期
启动专用线程执行任务
调用核心方法
AbstractInvokable.invoke()
,按顺序执行:初始化 (initialize)
创建状态后端
恢复 Checkpoint 状态
调用
RichFunction.open()
(如建立数据库连接)
运行 (run)
Source Task:循环读取 Kafka/Pulsar 等数据并下发下游
Stream Task(中间算子):处理上游数据并转发下游
Sink Task:将数据写入外部系统(数据库/文件系统)
关闭 (close)
清理资源连接
调用
RichFunction.close()
至此数据开始在 TaskManager 间流动,Flink 作业进入持续计算状态
TM向JM连接的源码分析
这个过程由 ResourceManager (RM) 作为中间协调者。是 TaskManager (TM) 在收到 ResourceManager 的指令后,主动向 JobManager (JM) 发起连接和注册。 而不是 JM 主动去发现并连接 TM。
下面我们结合源码和 Flink 的工作机制来详细分解这个流程:
连接建立的详细步骤
TM 启动并向 RM 注册 当一个 TM 节点启动时,它的首要任务是找到并向 ResourceManager (RM) 注册自己。这相当于 TM 在向集群的资源管理者报告:“我上线了,我有这些资源(Slots)可供使用”。
在
TaskExecutor.java
的onStart()
方法中,我们可以看到这个过程的起点:// ... existing code ... @Override public void onStart() throws Exception {try {startTaskExecutorServices();} catch (Throwable t) { // ... existing code ...}startRegistrationTimeout(); }private void startTaskExecutorServices() throws Exception {try {// start by connecting to the ResourceManagerresourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());// tell the task slot table who's responsible for the task slot actionstaskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());// start the job leader servicejobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl()); // ... existing code ...
这里的
resourceManagerLeaderRetriever.start(...)
就是 TM 寻找 RM 的过程。一旦找到 RM,TM 就会将自己的位置信息、资源ID和可用的 Slot 信息注册到 RM。JM 向 RM 请求资源 当用户提交一个 Job 到 JM 后,JM 会为这个 Job 的所有 Task 向 RM 请求计算资源(Slots)。
RM 分配资源并通知 TM RM 收到 JM 的请求后,会从它管理的、已经注册的 TM 中寻找符合要求的空闲 Slot。找到后,RM 会将这个 Slot 分配给 JM。同时,RM 会向持有该 Slot 的 TM 发送一个指令(例如
offerSlots
RPC 调用),告诉它:“你的某个 Slot 现在被分配给了 JobID 为 X 的 Job,这个 Job 的 JobManager 在 Y 地址”。TM 主动连接并注册到 JM 这是最关键的一步。TM 在收到 RM 的指令后,才得知它需要为哪个 JM 工作以及 JM 的地址。这时,TM 内部的
JobLeaderService
会为这个特定的jobId
启动一个到目标 JM 的连接和注册过程。DefaultJobLeaderService.java
中的代码片段展示了 TM 是如何准备注册信息的:// ... existing code ... protected RetryingRegistration<JobMasterId,JobMasterGateway,JMTMRegistrationSuccess,JMTMRegistrationRejection>generateRegistration() {return new DefaultJobLeaderService.JobManagerRetryingRegistration(LOG,rpcService,"JobManager",JobMasterGateway.class,getTargetAddress(),getTargetLeaderId(),retryingRegistrationConfiguration,jobId,TaskManagerRegistrationInformation.create(ownerAddress, ownLocation, taskManagerSession)); } // ... existing code ...
TM 会构建一个
TaskManagerRegistrationInformation
对象,然后通过 RPC 调用 JM 的registerTaskManager
方法。当注册成功后,TM 端的
JobLeaderListenerImpl
会被回调,执行后续的连接建立工作。// ... existing code ... private final class JobLeaderListenerImpl implements JobLeaderListener {@Overridepublic void jobManagerGainedLeadership(final JobID jobId,final JobMasterGateway jobManagerGateway,final JMTMRegistrationSuccess registrationMessage) {runAsync(() ->jobTable.getJob(jobId).ifPresent(job ->establishJobManagerConnection(job,jobManagerGateway,registrationMessage)));} // ... existing code ...
jobManagerGainedLeadership
这个名字意味着 TM 已经成功地与 JM 领导者建立了联系。JMTMRegistrationSuccess
这个参数就是注册成功的凭证。之后establishJobManagerConnection
方法会建立心跳监控等。JM 向 TM 提交任务 一旦 TM 在 JM 处注册成功,它们之间的连接就完全建立好了。此时,JM 就可以通过这个连接向 TM 发送
submitTask
请求,将具体的计算任务部署到 TM 的 Slot 中去执行了。// ... existing code ... @Override public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Duration timeout) {final JobID jobId = tdd.getJobId();// todo: consider adding task infotry (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();final JobTable.Connection jobManagerConnection =jobTable.getConnection(jobId).orElseThrow(() -> {final String message ="Could not submit task because there is no JobManager "+ "associated for the job "+ jobId+ '.';log.debug(message); // ... existing code ...
submitTask
方法首先会检查与对应jobId
的 JM 连接是否存在 (jobTable.getConnection(jobId)
), 这也反向证明了连接必须预先建立好。
总结
- 启动顺序:TM 和 JM 都可以独立启动。
- 连接发起方:TM 是主动方。
- 协调者:ResourceManager 在整个过程中扮演了“介绍人”的角色。它告诉 TM 应该去连接哪个 JM。
所以,整个流程可以概括为:TM 向 RM "报到",JM 向 RM "要人",RM "指派" TM 去为 JM 工作,最后 TM 主动去向 JM "报到"(注册)。
Flink 与 YARN 集成
即使使用了 YARN,Flink 自己的 ResourceManager (RM) 依然是必需的,但它的角色和工作方式会发生根本性的变化。
为什么使用 YARN 还需要 Flink 的 ResourceManager?
这是因为 Flink 的 RM 和 YARN 的 RM 处在不同的管理层级,职责也不同:
YARN ResourceManager:这是整个 Hadoop 集群的全局资源管理器。它管理集群中所有节点(NodeManager)的 CPU、内存等资源。它的任务是为各种应用(如 Flink、Spark、MapReduce)分配资源单元,这个单元叫做“容器”(Container)。YARN RM 并不知道 Flink 的“Slot”是什么,它只负责分配和调度容器。
Flink ResourceManager:这是 Flink 集群专属的资源管理器。当以 Flink on YARN 模式运行时,它会作为 ApplicationMaster (AM) 在 YARN 分配的第一个容器中启动。它的核心职责是:
- 作为桥梁:代表 Flink 应用,向 YARN RM 申请/释放容器。
- 管理 Flink 资源:当从 YARN RM 获取到容器后,它会在这些容器中启动 Flink 的 TaskManager (TM) 进程。
- 维护 Slot:管理所有 TM 上的 Slot 状态,并将可用的 Slot 信息提供给 JobManager (JM),用于执行任务。
所以,你可以这样理解它们的关系:Flink RM 向 YARN RM “要地”(申请容器),然后在这些“地”上“盖房子”(启动 TM),并管理这些“房子”里的“房间”(Slots)。
ResourceManager.java
文件正是 Flink RM 的核心抽象类定义,这证明了 Flink RM 是 Flink 运行时的一个独立且关键的组件。
// ... existing code ...
public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>extends FencedRpcEndpoint<ResourceManagerId>implements DelegationTokenManager.Listener, ResourceManagerGateway {
// ... existing code ...
引入 YARN 到底发生了什么变化?
引入 YARN 最大的变化是 Flink 实现了资源管理的动态化和自动化。
- Standalone 模式(没有 YARN):你需要手动启动固定数量的 TaskManager。资源是静态的,有多少 TM 和 Slot 在集群启动时就确定了,不会自动增减。
- YARN 模式:
- 动态资源分配:Flink RM 可以根据作业的实际需求(例如,作业并行度增加了),动态地向 YARN RM 申请更多的容器来启动新的 TM。当资源空闲时,也可以将容器释放回 YARN,供其他应用使用。这大大提高了集群的资源利用率。
- 进程管理外包:TM 的生命周期由 Flink RM 和 YARN NodeManager 管理。如果一个 TM 挂了,Flink RM 会向 YARN RM 重新申请一个容器来替换它。如果 Flink RM (ApplicationMaster) 挂了,YARN RM 会负责重启它,从而实现了一定程度的自动容错。
在 flink-yarn
模块中,YarnResourceManagerFactory
就是专门用于在 YARN 环境下创建 Flink RM 实例的工厂类,它创建的 RM 具备了与 YARN 交互的能力。
// ... existing code ...
public class YarnResourceManagerFactory extends ActiveResourceManagerFactory<YarnWorkerNode> {private static final YarnResourceManagerFactory INSTANCE = new YarnResourceManagerFactory();private YarnResourceManagerFactory() {}// ... existing code ...
Flink 是如何与 YARN 集成的?
Flink 没有 复制 YARN 的源码。它是通过标准的 YARN 客户端 API 来与 YARN 集群进行通信的,这是一种标准的、解耦的集成方式。
整个集成过程大致如下:
- 你在客户端执行
flink run -t yarn-session ...
或flink run-application ...
命令。 - Flink 的 YARN 客户端会将 Flink 的 Jar 包(包括 ApplicationMaster 的代码)和作业 Jar 包上传到 HDFS。
- YARN 客户端向 YARN RM 提交一个应用创建请求。
- YARN RM 收到请求后,会在某个 NodeManager 上分配一个容器,并在这个容器里启动 Flink 的 ApplicationMaster(AM)进程(里面就运行着 Flink RM)。
- 启动后的 Flink AM (Flink RM) 会使用 YARN 提供的
AMRMClientAsync
客户端与 YARN RM 建立心跳和通信,根据需要申请更多的容器来运行 TaskManager。 - 当 YARN RM 批准请求并分配了新的容器后,Flink AM 会使用
NMClientAsync
客户端与对应的 NodeManager 通信,在新的容器中启动 TaskManager 进程。
YarnResourceManagerClientFactory.java
这个文件就明确展示了 Flink 是如何创建 YARN RM 客户端的,其中 AMRMClientAsync
就是 YARN 提供的标准客户端接口。
// ... existing code ...
public interface YarnResourceManagerClientFactory {/*** Create YARN ResourceManager client with the given callback handler.** @param yarnHeartbeatIntervalMillis heartbeat interval between the client and the YARN* ResourceManager.* @param callbackHandler which handles the events from YARN ResourceManager.* @return an {@link AMRMClientAsync} instance.*/AMRMClientAsync<AMRMClient.ContainerRequest> createResourceManagerClient(int yarnHeartbeatIntervalMillis,AMRMClientAsync.AbstractCallbackHandler callbackHandler);
}
这种方式使得 Flink 可以很好地兼容不同版本的 Hadoop YARN,而不需要关心 YARN 的内部实现细节。
Standalone和Yarn
TM 进程的来源以及 RM 的角色:
- Standalone 模式(原始模式):TM 是由用户手动启动的。RM 在这里扮演一个被动的“登记中心”角色,它只是等待 TM 来注册并上报资源。
- YARN 模式:TM 是由 Flink RM 通过 YARN 动态申请并创建的。RM 在这里扮演一个主动的“资源调度者”角色,它负责向 YARN 申请资源并拉起 TM。
下面我们深入源码来看这两种模式的具体流程。
Standalone 模式:被动等待与注册
在这种模式下,Flink RM 完全不关心 TM 进程是怎么来的。
TM 的产生: 由运维人员或启动脚本在集群的各个节点上,手动执行 bin/taskmanager.sh start
来启动一个或多个 TaskManager 进程。
RM 如何知道 Slot:
TM 主动注册:每个手动启动的 TM,在初始化后会根据
flink-conf.yaml
中的jobmanager.rpc.address
(在 Flink 1.x 中,现在是rest.address
) 找到 RM 的地址,然后调用 RM 的registerTaskExecutor
RPC 方法来注册自己。上报 SlotReport:在注册请求中,TM 会附带一个
SlotReport
对象。这个对象详细描述了该 TM 拥有多少个 Slot,以及每个 Slot 的资源规格(ResourceProfile)。RM 登记资源:RM 的
registerTaskExecutor
方法接收到请求后,会做几件事:- 验证这个 TM 是否合法。
- 将这个 TM 的连接信息(
TaskExecutorGateway
)和其 Worker 信息记录在taskExecutors
这个 Map 中。 - 最关键的,它会将
SlotReport
交给内部的SlotManager
组件。 SlotManager
会解析SlotReport
,将这些新的 Slot 加入到自己的可用资源池中,等待 JobManager 来申请。
StandaloneResourceManager.java
是此模式下的具体实现。你会发现它的 initialize
方法非常简单,因为它不需要做任何主动申请资源的操作,只是启动内部服务,然后“坐等”TM 来连接。
// ... existing code ...@Overrideprotected void initialize() throws ResourceManagerException {// 启动一个启动周期,在这个周期内,它会等待TMs注册。// 它不会主动做任何事情来创建TM。startStartupPeriod();}@Overrideprotected void terminate() {// noop}@Overrideprotected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) {}
// ... existing code ...
YARN 模式:主动申请与创建
在 YARN 模式下,情况完全反转,RM 变得非常“主动”。这里的 RM 是 ActiveResourceManager
的一个实例。
TM 的产生:
JM 申请 Slot:一个 Job 提交后,JobManager 会根据作业的并行度向 Flink RM 申请所需数量的 Slot。
SlotManager 发现资源不足:Flink RM 内部的
SlotManager
收到请求,发现当前已注册的 TM 上的空闲 Slot 不足以满足需求。RM 主动申请资源:
SlotManager
会通过ResourceEventListener
回调通知ActiveResourceManager
:“我需要更多资源!”。ActiveResourceManager
不会等待,而是会立即行动。它通过一个叫做ResourceManagerDriver
的组件(在 YARN 模式下,具体实现是YarnResourceManagerDriver
)来和外部资源系统(YARN)交互。它会调用resourceManagerDriver.requestResource(...)
方法。YARN 分配容器:
YarnResourceManagerDriver
会使用 YARN 的客户端 API 向 YARN RM 发送一个容器申请。YARN RM 会在某个 NodeManager 上分配一个容器。RM 在容器中启动 TM:当
YarnResourceManagerDriver
收到 YARN RM 分配容器的通知后,它会连接到对应的 NodeManager,并命令它在刚刚分配的容器中执行一个启动命令,这个命令会拉起一个TaskExecutor
进程。
ActiveResourceManager.java
是实现这一逻辑的核心。它的 initialize
方法会启动 ResourceManagerDriver
,为与外部系统通信做好准备。
// ... existing code ...@Overrideprotected void initialize() throws ResourceManagerException {try {// 初始化 ResourceManagerDriver,它封装了与YARN/K8s等外部系统交互的逻辑resourceManagerDriver.initialize(this,new GatewayMainThreadExecutor(),ioExecutor,blocklistHandler::getAllBlockedNodeIds);} catch (Exception e) {throw new ResourceManagerException("Cannot initialize resource provider.", e);}}
// ... existing code ...
当需要新 worker 时,ActiveResourceManager
会调用 startNewWorker
,最终会调用到 resourceManagerDriver.requestResource
。
RM 如何知道 Slot: 这一步和 Standalone 模式完全一样。 一旦 TM 进程被 YARN 的 NodeManager 在容器中启动,这个 TM 就会执行和 Standalone 模式下完全相同的初始化逻辑:找到 Flink RM (此时是 ApplicationMaster),然后调用 registerTaskExecutor
RPC 方法,并上报自己的 SlotReport
。
总结
特性 | Standalone 模式 | YARN 模式 |
---|---|---|
RM 角色 | 被动 (Passive) | 主动 (Active) |
TM 来源 | 用户手动启动 | Flink RM 通过 YARN 申请并创建 |
资源弹性 | 静态,资源固定 | 动态,按需伸缩 |
核心实现 | StandaloneResourceManager | ActiveResourceManager + YarnResourceManagerDriver |
所以,引入 YARN 的本质变化是将资源管理的职责从“用户手动操作”转移到了“程序自动协调”。Flink RM 从一个简单的“登记员”升级为了一个聪明的“项目经理”,能够主动地向 YARN 这个“资源供应商”申请和释放资源,实现了整个集群的弹性伸缩。