16. TaskExecutor Startup
TaskExecutor
-
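`TaskExecutor` is the Flink component that actually executes tasks. It communicates with the `ResourceManager`, initializes its own resources and reports them, and executes the tasks that the `JobMaster` deploys to it. This article focuses on how the `TaskExecutor` is started and initialized.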
TaskManagerRunner
-
In Standalone mode, `TaskManagerRunner` is the entry point for launching a `TaskExecutor`. Its main flow runs through `runTaskManagerProcessSecurely`, which ultimately starts the TaskManager via `runTaskManager`:

```java
public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    final TaskManagerRunner taskManagerRunner;

    try {
        // Create the TaskManagerRunner
        taskManagerRunner =
                new TaskManagerRunner(
                        configuration,
                        pluginManager,
                        // Factory for the TaskExecutorService, which acts as an adapter
                        // between TaskManagerRunner and TaskExecutor
                        TaskManagerRunner::createTaskExecutorService);
        // Call TaskManagerRunner.start()
        taskManagerRunner.start();
    } catch (Exception exception) {
        throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
    }

    try {
        return taskManagerRunner.getTerminationFuture().get().getExitCode();
    } catch (Throwable t) {
        throw new FlinkException(
                "Unexpected failure during runtime of TaskManagerRunner.",
                ExceptionUtils.stripExecutionException(t));
    }
}

public void start() throws Exception {
    synchronized (lock) {
        // First start the supporting services (RPC, HA, heartbeats, metrics, ...)
        startTaskManagerRunnerServices();
        // Only this step actually starts the TaskExecutor
        taskExecutorService.start();
    }
}

// Core method: initializes resources and sets up the communication components
private void startTaskManagerRunnerServices() throws Exception {
    synchronized (lock) {
        // Load the RPC system (the Pekko-based implementation)
        rpcSystem = RpcSystem.load(configuration);

        // Thread pool
        this.executor =
                Executors.newScheduledThreadPool(
                        Hardware.getNumberCPUCores(),
                        new ExecutorThreadFactory("taskmanager-future"));

        // High availability services
        highAvailabilityServices =
                HighAvailabilityServicesUtils.createHighAvailabilityServices(
                        configuration,
                        executor,
                        AddressResolution.NO_ADDRESS_RESOLUTION,
                        rpcSystem,
                        this);

        // JMX monitoring
        JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));

        // Create the RPC service
        rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);

        // Determine the resource ID of this TaskManager
        this.resourceId =
                getTaskManagerResourceID(
                        configuration, rpcService.getAddress(), rpcService.getPort());

        this.workingDirectory =
                ClusterEntrypointUtils.createTaskManagerWorkingDirectory(
                        configuration, resourceId);

        LOG.info("Using working directory: {}", workingDirectory);

        // Heartbeat services
        HeartbeatServices heartbeatServices =
                HeartbeatServices.fromConfiguration(configuration);

        metricRegistry =
                new MetricRegistryImpl(
                        MetricRegistryConfiguration.fromConfiguration(
                                configuration,
                                rpcSystem.getMaximumMessageSizeInBytes(configuration)),
                        ReporterSetup.fromConfiguration(configuration, pluginManager),
                        TraceReporterSetup.fromConfiguration(configuration, pluginManager));

        final RpcService metricQueryServiceRpcService =
                MetricUtils.startRemoteMetricsRpcService(
                        configuration,
                        rpcService.getAddress(),
                        configuration.get(TaskManagerOptions.BIND_HOST),
                        rpcSystem);
        metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap());

        blobCacheService =
                BlobUtils.createBlobCacheService(
                        configuration,
                        Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
                        highAvailabilityServices.createBlobStore(),
                        null);

        final ExternalResourceInfoProvider externalResourceInfoProvider =
                ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
                        configuration, pluginManager);

        final DelegationTokenReceiverRepository delegationTokenReceiverRepository =
                new DelegationTokenReceiverRepository(configuration, pluginManager);

        // Factory call that actually creates the TaskExecutorService
        taskExecutorService =
                taskExecutorServiceFactory.createTaskExecutor(
                        this.configuration,
                        this.resourceId.unwrap(),
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        metricRegistry,
                        blobCacheService,
                        false,
                        externalResourceInfoProvider,
                        workingDirectory.unwrap(),
                        this,
                        delegationTokenReceiverRepository);

        handleUnexpectedTaskExecutorServiceTermination();

        MemoryLogger.startIfConfigured(
                LOG, configuration, terminationFuture.thenAccept(ignored -> {}));
    }
}
```
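The control flow of `runTaskManager` and `start()` can be reduced to a small model: under a lock, the runner first starts its supporting services, then creates the TaskExecutor service through a factory and starts it, while the caller blocks on a termination future to obtain the exit code. This is a hedged sketch, not Flink code. `MiniTaskManagerRunner`, `SimpleTaskExecutorService`, `RunnerDemo` and the string-based startup log are all hypothetical, and the real termination future completes with a result object that carries the exit code rather than a plain `Integer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for a TaskExecutorService: it can be started and
// completes a termination future (carrying an exit code) when it stops.
interface SimpleTaskExecutorService {
    void start();

    CompletableFuture<Integer> getTerminationFuture();
}

// Hypothetical, heavily simplified model of the runner's startup control flow.
class MiniTaskManagerRunner {
    private final Object lock = new Object();
    private final Function<List<String>, SimpleTaskExecutorService> serviceFactory;
    // Records the order of startup steps so the sequence can be inspected.
    final List<String> startupLog = new ArrayList<>();
    private SimpleTaskExecutorService taskExecutorService;

    MiniTaskManagerRunner(Function<List<String>, SimpleTaskExecutorService> serviceFactory) {
        this.serviceFactory = serviceFactory;
    }

    void start() {
        synchronized (lock) {
            startRunnerServices();       // supporting services first
            taskExecutorService.start(); // then the TaskExecutor itself
        }
    }

    private void startRunnerServices() {
        startupLog.add("rpc");
        startupLog.add("ha");
        startupLog.add("heartbeat");
        startupLog.add("metrics");
        // The factory runs only after the services it depends on exist.
        taskExecutorService = serviceFactory.apply(startupLog);
    }

    int awaitExitCode() {
        // Blocks until the service terminates, as runTaskManager does.
        return taskExecutorService.getTerminationFuture().join();
    }
}

class RunnerDemo {
    // Hypothetical factory, similar in role to TaskManagerRunner::createTaskExecutorService.
    static SimpleTaskExecutorService createService(List<String> log) {
        CompletableFuture<Integer> termination = new CompletableFuture<>();
        return new SimpleTaskExecutorService() {
            @Override
            public void start() {
                log.add("taskExecutor");
                termination.complete(0); // pretend the service ran and exited cleanly
            }

            @Override
            public CompletableFuture<Integer> getTerminationFuture() {
                return termination;
            }
        };
    }

    static int run(MiniTaskManagerRunner runner) {
        runner.start();
        return runner.awaitExitCode();
    }

    public static void main(String[] args) {
        MiniTaskManagerRunner runner = new MiniTaskManagerRunner(RunnerDemo::createService);
        int exitCode = run(runner);
        // prints: [rpc, ha, heartbeat, metrics, taskExecutor] exit=0
        System.out.println(runner.startupLog + " exit=" + exitCode);
    }
}
```

Passing the factory in as a function mirrors the design shown above: the runner controls when the service is created, and the service is built only after the RPC, HA, heartbeat and metrics services it needs already exist.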
startTaskManagerRunnerServices
-
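This method is the core of `TaskExecutor` startup. It covers:

- Setting up the RPC service (based on Pekko)
- Initializing high availability, the thread pool, heartbeats, metrics monitoring and related services
- Configuring the resource ID and the working directory
- Creating the core `TaskExecutor` (wrapped in a `TaskExecutorService`), which `start()` then launches

- In principle, one could dig further into `createRpcService(configuration, highAvailabilityServices, rpcSystem)`, but the process is essentially the same as for the JobManager: components implemented as `RpcEndpoint` are hosted on the Pekko actor system, so the details are skipped here.
- Because `TaskExecutor` is a standard `RpcEndpoint`, its `onStart` method is called back when it is started, and the rest of the startup logic unfolds from that method.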
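The callback model can be sketched in plain Java. The sketch assumes the model that `onStart()` runs on the endpoint's single "main thread", the same thread that later processes its messages, so the endpoint's own state needs no locking. `MiniEndpoint`, `MiniTaskExecutor`, `tell` and the thread name are hypothetical; this is not Flink's `RpcEndpoint` API, which dispatches through the Pekko actor instead of an `ExecutorService`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical endpoint: start() does not run onStart() on the caller's thread;
// it hands the callback to the endpoint's single main thread.
abstract class MiniEndpoint {
    // One thread per endpoint: callbacks and messages run here in submission order.
    private final ExecutorService mainThread =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "endpoint-main-thread"));

    final CompletableFuture<Void> start() {
        return CompletableFuture.runAsync(
                () -> {
                    try {
                        onStart();
                    } catch (Exception e) {
                        throw new RuntimeException("onStart failed", e);
                    }
                },
                mainThread);
    }

    // Later "RPC messages" are processed one at a time on the same thread.
    final CompletableFuture<Void> tell(Runnable message) {
        return CompletableFuture.runAsync(message, mainThread);
    }

    final void stop() {
        mainThread.shutdown();
    }

    protected abstract void onStart() throws Exception;
}

class MiniTaskExecutor extends MiniEndpoint {
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    @Override
    protected void onStart() {
        // In Flink's TaskExecutor, this is where startTaskExecutorServices() is called.
        events.add("onStart on " + Thread.currentThread().getName());
    }
}

class EndpointDemo {
    public static void main(String[] args) {
        MiniTaskExecutor taskExecutor = new MiniTaskExecutor();
        taskExecutor.start().join();
        taskExecutor
                .tell(() -> taskExecutor.events.add("message on " + Thread.currentThread().getName()))
                .join();
        taskExecutor.stop();
        // prints: [onStart on endpoint-main-thread, message on endpoint-main-thread]
        System.out.println(taskExecutor.events);
    }
}
```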
TaskExecutor
-
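`TaskExecutor` is the core Flink component dedicated to executing tasks. Because it extends `RpcEndpoint`, it has remote communication capability built in. `TaskExecutor` also does not take part in the `fencingToken` mechanism, so its RPC handling never has to distinguish "stale" from "current" leader state (that is, it does not have to deal with the effects of leadership changes on itself). Startup begins in the `onStart` method of `TaskExecutor`, so the analysis below starts there.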
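To make the fencing-token remark concrete, here is a minimal model. In Flink, leader-elected components such as the `ResourceManager` and `JobMaster` are fenced endpoints, while `TaskExecutor` is a plain `RpcEndpoint`. The sketch assumes a simplified rule: a fenced endpoint accepts a message only if it carries the token of the current leader session, and a plain endpoint accepts every message. `FencedMessage`, `MiniFencedEndpoint` and `MiniPlainEndpoint` are hypothetical; real Flink answers a mismatched token with an error rather than returning `false`.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical message carrying the fencing token of the leader session it targets.
final class FencedMessage {
    final UUID fencingToken;
    final String payload;

    FencedMessage(UUID fencingToken, String payload) {
        this.fencingToken = fencingToken;
        this.payload = payload;
    }
}

// Hypothetical fenced endpoint: only the current leader session's token is accepted.
class MiniFencedEndpoint {
    private UUID currentToken; // null while this endpoint is not the leader

    void grantLeadership(UUID newToken) {
        currentToken = newToken;
    }

    void revokeLeadership() {
        currentToken = null;
    }

    boolean accept(FencedMessage message) {
        // Messages addressed to an older leader session carry a stale token and are rejected.
        return currentToken != null && Objects.equals(currentToken, message.fencingToken);
    }
}

// Hypothetical plain endpoint (the TaskExecutor's situation): no token check at all.
class MiniPlainEndpoint {
    private int processed;

    boolean accept(String payload) {
        processed++;
        return true;
    }

    int processed() {
        return processed;
    }
}

class FencingDemo {
    public static void main(String[] args) {
        MiniFencedEndpoint resourceManager = new MiniFencedEndpoint();
        UUID oldSession = UUID.randomUUID();
        UUID newSession = UUID.randomUUID();
        resourceManager.grantLeadership(oldSession);
        resourceManager.grantLeadership(newSession); // leadership moved to a new session
        System.out.println(resourceManager.accept(new FencedMessage(oldSession, "stale")));
        System.out.println(resourceManager.accept(new FencedMessage(newSession, "current")));
        System.out.println(new MiniPlainEndpoint().accept("any message"));
    }
}
```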
```java
@Override
public void onStart() throws Exception {
    try {
        // Start the TaskExecutor's services
        startTaskExecutorServices();
    } catch (Throwable t) {
        final TaskManagerException exception =
                new TaskManagerException(
                        String.format("Could not start the TaskExecutor %s", getAddress()), t);
        onFatalError(exception);
        throw exception;
    }

    startRegistrationTimeout();
}

private void startTaskExecutorServices() throws Exception {
    try {
        // 1. Start ResourceManager leader retrieval
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

        // 2. Start the TaskSlotTable; slot status must be reported to the ResourceManager
        taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());

        // 3. Start the JobLeaderService, which is responsible for the JobMasters
        jobLeaderService.start(
                getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

        // 4. File cache
        fileCache =
                new FileCache(
                        taskManagerConfiguration.getTmpDirectories(),
                        taskExecutorBlobService.getPermanentBlobService());

        // 5. Load local slot allocation snapshots
        tryLoadLocalAllocationSnapshots();
    } catch (Exception e) {
        handleStartTaskExecutorServicesException(e);
    }
}
```
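The error handling in `onStart()` follows a simple pattern: start the sub-services in a fixed order, and if any of them fails, wrap the cause together with the endpoint address, report it as a fatal error, and rethrow so the endpoint does not keep running half-initialized. A minimal sketch of that pattern follows; `MiniOnStart`, `StartupFailure`, the `Runnable` services and the `fatalErrorHandler` callback are hypothetical, and the sketch leaves out the cleanup that Flink performs in `handleStartTaskExecutorServicesException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical unchecked exception playing the role of TaskManagerException.
class StartupFailure extends RuntimeException {
    StartupFailure(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical model of the onStart() pattern: ordered startup plus fatal-error reporting.
class MiniOnStart {
    private final String address;
    private final List<Runnable> services;
    private final Consumer<Throwable> fatalErrorHandler;

    MiniOnStart(String address, List<Runnable> services, Consumer<Throwable> fatalErrorHandler) {
        this.address = address;
        this.services = services;
        this.fatalErrorHandler = fatalErrorHandler;
    }

    void onStart() {
        try {
            // e.g. leader retrieval, slot table, job leader service, file cache, ...
            for (Runnable service : services) {
                service.run();
            }
        } catch (Throwable t) {
            StartupFailure failure =
                    new StartupFailure(
                            String.format("Could not start the TaskExecutor %s", address), t);
            fatalErrorHandler.accept(failure); // report first ...
            throw failure;                     // ... then fail the start itself
        }
        // Only reached when every service started; the real onStart() then calls
        // startRegistrationTimeout().
    }
}

class OnStartDemo {
    public static void main(String[] args) {
        List<Runnable> services = new ArrayList<>();
        services.add(() -> System.out.println("leader retrieval started"));
        services.add(() -> {
            throw new IllegalStateException("slot table failed");
        });
        services.add(() -> System.out.println("never reached"));

        MiniOnStart endpoint =
                new MiniOnStart("tm-1", services, t -> System.out.println("fatal: " + t.getMessage()));
        try {
            endpoint.onStart();
        } catch (StartupFailure e) {
            System.out.println("cause: " + e.getCause().getMessage());
        }
    }
}
```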
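Summary:

During `TaskExecutor` startup, the core initialization work is concentrated in starting three components:

- `resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener())` starts ResourceManager leader retrieval, which discovers the current `ResourceManager` dynamically and establishes the registration with it.
- `taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor())` starts slot management and reporting, which manages the local resource slots and keeps their state in sync.
- `jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl())` starts JobMaster leader discovery and registration, which the TaskExecutor uses to establish sessions and communicate with each `JobMaster`.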
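- Together, these three parts form the core of the `TaskExecutor` startup logic.
- The following analysis breaks these three modules down further, focusing on how the `TaskExecutor` interacts with the `ResourceManager` and the `JobMaster`.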
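As a preview of the first module, here is a hedged model of what a listener such as `ResourceManagerLeaderListener` has to do when the leader changes: drop the connection to the previous leader (if any) and register with the newly announced one, remembering the leader session ID. The `LeaderListener` interface, `MiniResourceManagerConnector` and the string "actions" are hypothetical simplifications, not Flink's `LeaderRetrievalListener` API; a `null` address stands for "no leader currently known".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical listener shape: notified whenever a new leader (or no leader) is known.
interface LeaderListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Hypothetical TaskExecutor-side reaction to ResourceManager leader changes.
class MiniResourceManagerConnector implements LeaderListener {
    final List<String> actions = new ArrayList<>();
    private String leaderAddress;
    private UUID leaderSessionId;

    @Override
    public void notifyLeaderAddress(String newAddress, UUID newSessionId) {
        if (leaderAddress != null) {
            // The old leader is no longer valid, so its connection is closed.
            actions.add("disconnect " + leaderAddress);
        }
        leaderAddress = newAddress;
        leaderSessionId = newSessionId;
        if (newAddress != null) {
            // A real TaskExecutor would register asynchronously and, once accepted,
            // report its slots; the session ID identifies this leader for later calls.
            actions.add("register with " + newAddress);
        }
    }

    String currentLeader() {
        return leaderAddress;
    }

    UUID currentSessionId() {
        return leaderSessionId;
    }
}

class LeaderDemo {
    public static void main(String[] args) {
        MiniResourceManagerConnector connector = new MiniResourceManagerConnector();
        connector.notifyLeaderAddress("rm-1", UUID.randomUUID());
        connector.notifyLeaderAddress("rm-2", UUID.randomUUID()); // failover to a new leader
        connector.notifyLeaderAddress(null, null);                // leader lost
        // prints: [register with rm-1, disconnect rm-1, register with rm-2, disconnect rm-2]
        System.out.println(connector.actions);
    }
}
```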