当前位置: 首页 > news >正文

16.TaskExecutor启动

TaskExecutor

  • TaskExecutor 是 Flink 中实际执行任务的组件,负责与 ResourceManager 通信、初始化自身资源并上报,接收客户端提交的 JobGraph 并进行执行。

    本文重点分析 TaskExecutor 的启动与初始化流程。

TaskManagerRunner

  • Standalone 模式下,TaskManagerRunner 作为 TaskExecutor 的启动入口,核心执行流程为 runTaskManagerProcessSecurely 方法,最终通过 runTaskManager 方法正式启动。

    public static int runTaskManager(Configuration configuration, PluginManager pluginManager)throws Exception {final TaskManagerRunner taskManagerRunner;try {//初始化一个 taskManagertaskManagerRunner =new TaskManagerRunner(configuration,pluginManager,//TaskExecutorService适配器,可以理解在TaskExecutor和TaskManagerRunner之间的设配器TaskManagerRunner::createTaskExecutorService);//调用 taskManager.start 方法taskManagerRunner.start();} catch (Exception exception) {throw new FlinkException("Failed to start the TaskManagerRunner.", exception);}try {return taskManagerRunner.getTerminationFuture().get().getExitCode();} catch (Throwable t) {throw new FlinkException("Unexpected failure during runtime of TaskManagerRunner.",ExceptionUtils.stripExecutionException(t));}}public void start() throws Exception {synchronized (lock) {//这一步是初始化服务的前期服务,保活rpc通信等startTaskManagerRunnerServices();//这一步才是启动taskExecutortaskExecutorService.start();}}//核心方法:该方法完成了资源初始化与通信组件搭建
    private void startTaskManagerRunnerServices() throws Exception {synchronized (lock) {//导入pekkoRpcrpcSystem = RpcSystem.load(configuration);//线程池this.executor =Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(),new ExecutorThreadFactory("taskmanager-future"));//高可用highAvailabilityServices =HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration,executor,AddressResolution.NO_ADDRESS_RESOLUTION,rpcSystem,this);//JMX的监控JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));//正式开启一个rpc通信rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);//获取该资源名称this.resourceId =getTaskManagerResourceID(configuration, rpcService.getAddress(), rpcService.getPort());this.workingDirectory =ClusterEntrypointUtils.createTaskManagerWorkingDirectory(configuration, resourceId);LOG.info("Using working directory: {}", workingDirectory);//心跳服务HeartbeatServices heartbeatServices =HeartbeatServices.fromConfiguration(configuration);metricRegistry =new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration,rpcSystem.getMaximumMessageSizeInBytes(configuration)),ReporterSetup.fromConfiguration(configuration, pluginManager),TraceReporterSetup.fromConfiguration(configuration, pluginManager));final RpcService metricQueryServiceRpcService =MetricUtils.startRemoteMetricsRpcService(configuration,rpcService.getAddress(),configuration.get(TaskManagerOptions.BIND_HOST),rpcSystem);metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap());blobCacheService =BlobUtils.createBlobCacheService(configuration,Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),highAvailabilityServices.createBlobStore(),null);final ExternalResourceInfoProvider externalResourceInfoProvider =ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(configuration, pluginManager);final DelegationTokenReceiverRepository delegationTokenReceiverRepository =new DelegationTokenReceiverRepository(configuration, pluginManager);//工厂类,正式创建一个 taskExecutorServicetaskExecutorService =taskExecutorServiceFactory.createTaskExecutor(this.configuration,this.resourceId.unwrap(),rpcService,highAvailabilityServices,heartbeatServices,metricRegistry,blobCacheService,false,externalResourceInfoProvider,workingDirectory.unwrap(),this,delegationTokenReceiverRepository);handleUnexpectedTaskExecutorServiceTermination();MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture.thenAccept(ignored -> {}));}}
    

startTaskManagerRunnerServices

  • 该方法作为 TaskExecutor启动的核心方法:

    • RPC 服务搭建(基于 Pekko
    • 高可用机制、高性能线程池、心跳机制、指标监控等的初始化
    • 资源目录与 ID 配置
    • 创建并启动核心的 TaskExecutor
  • 理论上,可以从 createRpcService(configuration, highAvailabilityServices, rpcSystem) 进一步分析,但实质上该过程与 JobManager 类似,都是将组件作为 RpcEndpoint 挂载到 Pekko Actor 系统上,因此细节可以略过。

  • 由于 TaskExecutor 作为一个标准的 RpcEndpoint,在启动过程中也会回调其 onStart 方法,后续逻辑由该方法展开。

TaskExecutor

  • TaskExecutor 是 Flink 中专门负责执行任务的核心组件。由于继承自 RpcEndpoint,它本身具备远程通信能力。同时,TaskExecutor 不涉及 fencingToken 机制,因此在通信过程中无需关注“新旧”状态的问题(即无需处理领导权交替的影响)。

    启动流程从 TaskExecutoronStart 方法开始。下面就从其 onStart 方法入手,分析 TaskExecutor 的启动过程。

 @Overridepublic void onStart() throws Exception {try {//正式启动 taskExecutorstartTaskExecutorServices();} catch (Throwable t) {final TaskManagerException exception =new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), t);onFatalError(exception);throw exception;}startRegistrationTimeout();}private void startTaskExecutorServices() throws Exception {try {// 启动 ResouceManagerresourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());// 初始化 taskSlot,需要报告给ResourceManagertaskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());// jobLeader,专门负责jobMasterjobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());// 4️⃣ 文件缓存fileCache =new FileCache(taskManagerConfiguration.getTmpDirectories(),taskExecutorBlobService.getPermanentBlobService());// 5️⃣ 加载本地 slot 分配快照tryLoadLocalAllocationSnapshots();} catch (Exception e) {handleStartTaskExecutorServicesException(e);}}

总结:

  • TaskExecutor 启动过程中,核心的初始化步骤主要集中在以下三个组件的启动上:
    • resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener())
      启动 ResourceManager 领导者监听,用于动态发现 ResourceManager,并建立注册关系。
    • taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor())
      启动 Slot 管理与上报机制,负责本地资源槽的管理与状态同步。
    • jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl())
      启动 Job 领导者发现与注册机制,用于与各个 JobMaster 建立会话与通信。
  • 上述三部分构成了 TaskExecutor 启动过程的核心逻辑
  • 接下来的分析,将围绕这三大模块进一步拆解:TaskExecutorResourceManagerJobMaster 的交互机制
http://www.lryc.cn/news/594485.html

相关文章:

  • 基于pyside6的通用机器人遥控控制界面
  • Windows批量修改文件属性方法
  • Spring Boot 第一天知识汇总
  • 【51单片机仿真复位电阻电容参数】2022-5-17
  • IsaacLab学习记录(四)
  • Linux文件系统三要素:块划分、分区管理与inode结构解析
  • [CVPR]DVFL-Net:用于时空动作识别的轻量级蒸馏视频调焦网络
  • Python知识点2-if语句
  • FreeRTOS学习笔记之内存管理
  • Raz解决问题:You are offline.
  • [Linux]进程 / PID
  • 【开源项目】基于RuoYi-Vue-Plus的开源进销存管理系统
  • Spring Boot 配置文件解析
  • USB技术发展史:从1.0到USB4的演进之路
  • Matplotlib Contourf 标注字体详细设置
  • Spring之AOP面向切面编程详解
  • 【数据结构】双向循环链表的实现
  • MyBatis从浅入深
  • day24——Java高级技术深度解析:单元测试、反射、注解与动态代理
  • 高性能熔断限流实现:Spring Cloud Gateway 在电商系统的实战优化
  • `SearchTransportService` 是 **协调节点与数据节点之间“搜索子请求”通信的运输层**
  • 4种快速创建SpringBoot项目的方法
  • Claude Code 逆向工程分析,探索最新Agent设计
  • JavaScript 中Object、Array 和 String的常用方法
  • 金融工程、金融与经济学知识点
  • 数据结构与算法汇总
  • 连接语言大模型(LLM)服务进行对话
  • GaussDB select into和insert into的用法
  • 机器学习基础:从数据到智能的入门指南
  • python生成密钥