当前位置: 首页 > news >正文

15.dispatcherRunner启动

dispatcherRunner

职责
Flink 中,dispatcherRunner 主要负责作业提交、作业分发以及创建 JobMaster。它还集成了高可用机制HA),保证在 failover 场景下作业可以恢复。

创建入口
通过 DispatcherRunnerFactory.createDispatcherRunner() 构建。不同部署模式(如 standaloneYarnKubernetes)会用不同的 factory 实现(如 DefaultDispatcherRunnerFactory)。

DefaultDispatcherRunnerFactory

作用
专门用于生成 DefaultDispatcherRunner,该类同时继承了 DispatcherRunnerLeaderContender,也就是说它是个具备高可用选举能力的 dispatcher。

Leader 选举与启动

  1. leaderElection.startLeaderElection(this)
    start() 方法启动选举。
  2. grantLeadership(UUID leaderSessionID)
    获得 leader 权限时触发,启动新的 DispatcherLeaderProcess。
    这里内部会构建:
    • SessionDispatcherLeaderProcess (典型模式下)
    • 实际调用 AbstractDispatcherLeaderProcess 管理作业恢复、JobGraph 生成等。

源码

dispatcherRunner =dispatcherRunnerFactory.createDispatcherRunner(//standalone模式下,直接获得leader组件。就是给container赋默认uuid值highAvailabilityServices.getDispatcherLeaderElection(),fatalErrorHandler,//高可用模式下作业的恢复。等后续高可用模式并且作业提交的时候,可以看下。目前启动用不上。new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),ioExecutor,//需要rpc通信rpcService,partialDispatcherServices);

dispatcherRunner 的运行机制

  • resourceManager 类似:
    • 都作为集群启动的核心组件
    • 都具备高可用选举机制。
    • 获得 leader 后启动内部流程(如资源管理、作业调度等)。
  • 但区别是:
    • dispatcher 专注于作业管理(提交、恢复、分发到 jobMaster)。
    • resourceManager 负责资源调度Slot 管理、TaskExecutor 注册等)。

入口源码

@Overridepublic DispatcherRunner createDispatcherRunner(LeaderElection leaderElection,FatalErrorHandler fatalErrorHandler,JobPersistenceComponentFactory jobPersistenceComponentFactory,Executor ioExecutor,RpcService rpcService,PartialDispatcherServices partialDispatcherServices)throws Exception {final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =dispatcherLeaderProcessFactoryFactory.createFactory(jobPersistenceComponentFactory,ioExecutor,rpcService,partialDispatcherServices,fatalErrorHandler);return DefaultDispatcherRunner.create(leaderElection, fatalErrorHandler, dispatcherLeaderProcessFactory);}@Overridepublic void grantLeadership(UUID leaderSessionID) {runActionIfRunning(() -> {LOG.info("{} was granted leadership with leader id {}. Creating new {}.",getClass().getSimpleName(),leaderSessionID,DispatcherLeaderProcess.class.getSimpleName());//这里就和前面的resourceManager一致。再提一下,主要启动 SessionDispatcherLeaderProcess。因为dispatcher还包含了作业恢复等操作,所以还有一个AbstractDispatcherLeaderProcess用于dispatcherLeader的启动。这个类启动之前会先启动对应的jobGraph等startNewDispatcherLeaderProcess(leaderSessionID);});}

总结

目前,JobManager 相关的组件中,已分析了 ResourceManagerDispatcherMetrics 等核心部分。剩余未讲解的包括 MetricsReportBlobServerHaServicesJobMaster 等组件。以下是这部分的简要总结与梳理:

MetricsReport

  • 作用
    负责将集群和作业的各类指标(如 CPU 使用率、内存占用、Task 运行状态等)主动上报到外部系统(如 Prometheus、JMX)。
    与 Dispatcher 和 ResourceManager 内部的 Metrics 模块不同,MetricsReport 属于指标数据的输出侧,通过配置的 reporter 实现对外发布。
  • 特点
    • 内部采集:通过 JMX Bean、系统监控等采集 CPU、内存等信息。
    • 对外上报:采用定时主动推送机制,而非 RPC 被动查询(与 Akka 交互的内部监控不同)。

BlobServer

  • 作用
    提供作业资源的分发与缓存。Flink 作业提交过程中所依赖的 JAR 包、配置文件等资源会上传到 BlobServer,并存储在本地目录下,供 TaskExecutor 下载使用。
  • 本质
    就是一个简化版的文件服务,为作业资源提供上传、下载、缓存能力。

HaServices

  • 作用
    提供高可用服务,包括:
    • Leader 选举(如 Dispatcher、ResourceManager 等的 HA 切换)。
    • 心跳管理(与 TaskExecutor 保持连接与状态监控)。
    • JobGraph 持久化(用于作业恢复)。
  • 说明
    虽然 HaServices 是在 JobManager 中初始化的,但它不仅服务于 JobManager 组件本身,也用于与 TaskExecutor 保持集群通信及作业状态一致性。
  • 后续
    HaServices 更适合在 TaskExecutor 相关部分展开细讲,例如如何参与心跳机制及失联检测。

JobMaster

  • 作用
    JobMaster 是 Flink 中每个作业的核心调度器,由 Dispatcher 在作业提交时动态创建。
    负责:
    • 管理作业生命周期;
    • 调度任务到各 TaskExecutor;
    • 监控任务运行状态;
    • 汇报作业进度及失败信息。
  • 关键点
    • 一个作业对应一个独立的 JobMaster 实例;
    • 与 Dispatcher、ResourceManager 通过 RPC 保持通信;
    • 管理 Slot 分配与 Task 启动,调度逻辑完全在 JobMaster 内部控制。

总结整理(模块职责)

组件作用说明
MetricsReport指标采集与对外推送主动推送(如 Prometheus、JMX)
BlobServer作业资源分发与缓存本地文件服务,供 TaskExecutor 下载
HaServices高可用与通信服务提供选举、心跳、作业持久化支持
JobMaster作业调度与管理每个作业一个,负责调度与状态管理

补充:Future 阻塞机制

Flink 框架整体采用 CompletableFuture 来管理异步线程和流程控制。不同于传统基于 NIOIO 构建的服务端程序(通常在主线程中依靠一个无限循环来保持服务常驻),Flink 并不依赖 while(true) 结构维持主流程。

returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();

这里的 .get() 方法会阻塞当前主线程,直到整个集群终止(正常关闭或发生致命异常)。这使得主线程可以简洁地等待集群结束,而不需要显式的死循环来维持进程存活。

以下是关键源码片段:

public static void  runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();try {clusterEntrypoint.startCluster();} catch (ClusterEntrypointException e) {LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),e);System.exit(STARTUP_FAILURE_RETURN_CODE);}int returnCode;Throwable throwable = null;try {//这一步保证服务端不退出returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();} catch (Throwable e) {throwable = ExceptionUtils.stripExecutionException(e);returnCode = RUNTIME_FAILURE_RETURN_CODE;}LOG.info("Terminating cluster entrypoint process {} with exit code {}.",clusterEntrypointName,returnCode,throwable);System.exit(returnCode);}
http://www.lryc.cn/news/589914.html

相关文章:

  • 上线!《指标 + AI 数智应用白皮书》聚焦智能问数与分析等AI高阶应用,深入剖析四大行业落地实践
  • Python数据分析全流程指南:从数据采集到可视化呈现的实战解析
  • 修改pipenv默认安装路径
  • 李宏毅2025《机器学习》第七讲-推理模型:从原理、流派到未来挑战
  • Python面向对象编程(OOP)详解:通俗易懂的全面指南
  • C++面向对象创建打印算术表达式树
  • IIS-网站报500.19错误代码0x8007000d问题解决
  • 代码随想录算法训练营十七天|二叉树part07
  • LeafletJS 入门:构建你的第一个交互式地图
  • 【无标题】LighthouseGS:面向全景式移动拍摄的室内结构感知三维高斯泼溅
  • Day36 Java方法和流程控制练习 计算器
  • 微软AutoGen:多智能体协作的工业级解决方案
  • ESP32——快速入门
  • 外接硬盘写入速度很慢?Windows 写入缓存功能开启教程!
  • 知识点3:python-sdk 核心概念(prompt、image、context)
  • 项目学习笔记 display从none切换成block
  • 尚庭公寓-------图片上传接口
  • 2025年工会考试题库及答案
  • alpineLinux修改包管理为国内源
  • 详解SPFA算法-单源最短路径求解
  • 陆面、生态、水文模拟与多源遥感数据同化的实践技术应用
  • 【图灵完备】算数运算
  • sktime - 时间序列机器学习统一接口
  • 控制Vue对话框显示隐藏
  • C++设计模式之创建型模式
  • 【机器学习】数据理解:数据导入、数据审查与数据可视化
  • 数据降维方法:PCA
  • 集训Day02笔记总结(关于一些OJ题目的)
  • 第四章 OB SQL调优
  • Taro.eventCenter 用法详解与实战