15.dispatcherRunner启动
dispatcherRunner
职责:
在 Flink
中,dispatcherRunner
主要负责作业提交、作业分发以及创建 JobMaster
。它还集成了高可用机制(HA
),保证在 failover
场景下作业可以恢复。
创建入口:
通过 DispatcherRunnerFactory.createDispatcherRunner()
构建。不同部署模式(如 standalone
、Yarn
、Kubernetes
)会用不同的 factory
实现(如 DefaultDispatcherRunnerFactory
)。
DefaultDispatcherRunnerFactory
作用:
专门用于生成 DefaultDispatcherRunner
,该类同时继承了 DispatcherRunner
和 LeaderContender
,也就是说它是个具备高可用选举能力的 dispatcher。
Leader 选举与启动:
leaderElection.startLeaderElection(this)
由start()
方法启动选举。grantLeadership(UUID leaderSessionID)
获得 leader 权限时触发,启动新的 DispatcherLeaderProcess。
这里内部会构建:SessionDispatcherLeaderProcess
(典型模式下)- 实际调用
AbstractDispatcherLeaderProcess
管理作业恢复、JobGraph 生成等。
源码
dispatcherRunner =dispatcherRunnerFactory.createDispatcherRunner(//standalone模式下,直接获得leader组件。就是给container赋默认uuid值highAvailabilityServices.getDispatcherLeaderElection(),fatalErrorHandler,//高可用模式下作业的恢复。等后续高可用模式并且作业提交的时候,可以看下。目前启动用不上。new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),ioExecutor,//需要rpc通信rpcService,partialDispatcherServices);
dispatcherRunner 的运行机制
- 和
resourceManager
类似:- 都作为集群启动的核心组件。
- 都具备高可用选举机制。
- 获得
leader
后启动内部流程(如资源管理、作业调度等)。
- 但区别是:
- dispatcher 专注于作业管理(提交、恢复、分发到
jobMaster
)。 - resourceManager 负责资源调度(
Slot
管理、TaskExecutor
注册等)。
- dispatcher 专注于作业管理(提交、恢复、分发到
入口源码
@Overridepublic DispatcherRunner createDispatcherRunner(LeaderElection leaderElection,FatalErrorHandler fatalErrorHandler,JobPersistenceComponentFactory jobPersistenceComponentFactory,Executor ioExecutor,RpcService rpcService,PartialDispatcherServices partialDispatcherServices)throws Exception {final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =dispatcherLeaderProcessFactoryFactory.createFactory(jobPersistenceComponentFactory,ioExecutor,rpcService,partialDispatcherServices,fatalErrorHandler);return DefaultDispatcherRunner.create(leaderElection, fatalErrorHandler, dispatcherLeaderProcessFactory);}@Overridepublic void grantLeadership(UUID leaderSessionID) {runActionIfRunning(() -> {LOG.info("{} was granted leadership with leader id {}. Creating new {}.",getClass().getSimpleName(),leaderSessionID,DispatcherLeaderProcess.class.getSimpleName());//这里就和前面的resourceManager一致。再提一下,主要启动 SessionDispatcherLeaderProcess。因为dispatcher还包含了作业恢复等操作,所以还有一个AbstractDispatcherLeaderProcess用于dispatcherLeader的启动。这个类启动之前会先启动对应的jobGraph等startNewDispatcherLeaderProcess(leaderSessionID);});}
总结
目前,JobManager
相关的组件中,已分析了 ResourceManager
、Dispatcher
、Metrics
等核心部分。剩余未讲解的包括 MetricsReport
、BlobServer
、HaServices
和 JobMaster
等组件。以下是这部分的简要总结与梳理:
MetricsReport
- 作用:
负责将集群和作业的各类指标(如 CPU 使用率、内存占用、Task 运行状态等)主动上报到外部系统(如 Prometheus、JMX)。
与 Dispatcher 和 ResourceManager 内部的 Metrics 模块不同,MetricsReport 属于指标数据的输出侧,通过配置的 reporter 实现对外发布。 - 特点:
- 内部采集:通过 JMX Bean、系统监控等采集 CPU、内存等信息。
- 对外上报:采用定时主动推送机制,而非 RPC 被动查询(与 Akka 交互的内部监控不同)。
BlobServer
- 作用:
提供作业资源的分发与缓存。Flink 作业提交过程中所依赖的 JAR 包、配置文件等资源会上传到 BlobServer,并存储在本地目录下,供 TaskExecutor 下载使用。 - 本质:
就是一个简化版的文件服务,为作业资源提供上传、下载、缓存能力。
HaServices
- 作用:
提供高可用服务,包括:- Leader 选举(如 Dispatcher、ResourceManager 等的 HA 切换)。
- 心跳管理(与 TaskExecutor 保持连接与状态监控)。
- JobGraph 持久化(用于作业恢复)。
- 说明:
虽然 HaServices 是在 JobManager 中初始化的,但它不仅服务于 JobManager 组件本身,也用于与 TaskExecutor 保持集群通信及作业状态一致性。 - 后续:
HaServices 更适合在 TaskExecutor 相关部分展开细讲,例如如何参与心跳机制及失联检测。
JobMaster
- 作用:
JobMaster 是 Flink 中每个作业的核心调度器,由 Dispatcher 在作业提交时动态创建。
负责:- 管理作业生命周期;
- 调度任务到各 TaskExecutor;
- 监控任务运行状态;
- 汇报作业进度及失败信息。
- 关键点:
- 一个作业对应一个独立的 JobMaster 实例;
- 与 Dispatcher、ResourceManager 通过 RPC 保持通信;
- 管理 Slot 分配与 Task 启动,调度逻辑完全在 JobMaster 内部控制。
总结整理(模块职责)
组件 | 作用 | 说明 |
---|---|---|
MetricsReport | 指标采集与对外推送 | 主动推送(如 Prometheus、JMX) |
BlobServer | 作业资源分发与缓存 | 本地文件服务,供 TaskExecutor 下载 |
HaServices | 高可用与通信服务 | 提供选举、心跳、作业持久化支持 |
JobMaster | 作业调度与管理 | 每个作业一个,负责调度与状态管理 |
补充:Future 阻塞机制
Flink
框架整体采用 CompletableFuture
来管理异步线程和流程控制。不同于传统基于 NIO
或 IO
构建的服务端程序(通常在主线程中依靠一个无限循环来保持服务常驻),Flink
并不依赖 while(true)
结构维持主流程。
returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
这里的 .get()
方法会阻塞当前主线程,直到整个集群终止(正常关闭或发生致命异常)。这使得主线程可以简洁地等待集群结束,而不需要显式的死循环来维持进程存活。
以下是关键源码片段:
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();try {clusterEntrypoint.startCluster();} catch (ClusterEntrypointException e) {LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),e);System.exit(STARTUP_FAILURE_RETURN_CODE);}int returnCode;Throwable throwable = null;try {//这一步保证服务端不退出returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();} catch (Throwable e) {throwable = ExceptionUtils.stripExecutionException(e);returnCode = RUNTIME_FAILURE_RETURN_CODE;}LOG.info("Terminating cluster entrypoint process {} with exit code {}.",clusterEntrypointName,returnCode,throwable);System.exit(returnCode);}