
19. Establishing the Connection Between TaskExecutor and ResourceManager


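  • During startup, TaskExecutor first learns the current ResourceManager leader through ResourceManagerLeaderRetriever and from it obtains the ResourceManagerGateway (the proxy object used to communicate with ResourceManager).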

Overall flow of TaskExecutor registration with ResourceManager

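  • Registration of TaskExecutor with ResourceManager is not a one-shot call. It is driven by the RetryingRegistration component, which:
    • sends the registration request (the invokeRegistration method)
    • receives the registration result
    • decides, based on the result, whether to complete the registration or to retry after a failure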
/**
 * This method performs a registration attempt and triggers either a success notification or a
 * retry, depending on the result.
 */
@SuppressWarnings("unchecked")
private void register(final G gateway, final int attempt, final long timeoutMillis) {
    // eager check for canceling to avoid some unnecessary work
    if (canceled) {
        return;
    }

    try {
        log.debug(
                "Registration at {} attempt {} (timeout={}ms)",
                targetName, attempt, timeoutMillis);
        // invokeRegistration is called on the resourceManagerGateway obtained earlier
        CompletableFuture<RegistrationResponse> registrationFuture =
                invokeRegistration(gateway, fencingToken, timeoutMillis);

        // if the registration was successful, let the TaskExecutor know
        CompletableFuture<Void> registrationAcceptFuture =
                registrationFuture.thenAcceptAsync(
                        (RegistrationResponse result) -> {
                            if (!isCanceled()) {
                                if (result instanceof RegistrationResponse.Success) {
                                    log.debug(
                                            "Registration with {} at {} was successful.",
                                            targetName, targetAddress);
                                    S success = (S) result;
                                    // completionFuture is the future handed out by
                                    // newRegistration.getFuture(); completing it here
                                    // returns the resourceManagerGateway wrapped in the result
                                    completionFuture.complete(
                                            RetryingRegistrationResult.success(gateway, success));
                                } else if (result instanceof RegistrationResponse.Rejection) {
                                    log.debug(
                                            "Registration with {} at {} was rejected.",
                                            targetName, targetAddress);
                                    R rejection = (R) result;
                                    completionFuture.complete(
                                            RetryingRegistrationResult.rejection(rejection));
                                } else {
                                    // registration failure
                                    if (result instanceof RegistrationResponse.Failure) {
                                        RegistrationResponse.Failure failure =
                                                (RegistrationResponse.Failure) result;
                                        log.info(
                                                "Registration failure at {} occurred.",
                                                targetName, failure.getReason());
                                    } else {
                                        log.error(
                                                "Received unknown response to registration attempt: {}",
                                                result);
                                    }

                                    log.info(
                                            "Pausing and re-attempting registration in {} ms",
                                            retryingRegistrationConfiguration.getRefusedDelayMillis());
                                    registerLater(
                                            gateway,
                                            1,
                                            retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis(),
                                            retryingRegistrationConfiguration.getRefusedDelayMillis());
                                }
                            }
                        },
                        rpcService.getScheduledExecutor());

        // upon failure, retry
        registrationAcceptFuture.whenCompleteAsync(
                (Void v, Throwable failure) -> {
                    if (failure != null && !isCanceled()) {
                        if (ExceptionUtils.stripCompletionException(failure)
                                instanceof TimeoutException) {
                            // we simply have not received a response in time. maybe the
                            // timeout was very low (initial fast registration attempts),
                            // maybe the target endpoint is currently down.
                            if (log.isDebugEnabled()) {
                                log.debug(
                                        "Registration at {} ({}) attempt {} timed out after {} ms",
                                        targetName, targetAddress, attempt, timeoutMillis);
                            }

                            long newTimeoutMillis =
                                    Math.min(
                                            2 * timeoutMillis,
                                            retryingRegistrationConfiguration.getMaxRegistrationTimeoutMillis());
                            register(gateway, attempt + 1, newTimeoutMillis);
                        } else {
                            // a serious failure occurred. we still should not give up, but
                            // keep trying
                            log.error(
                                    "Registration at {} failed due to an error",
                                    targetName, failure);
                            log.info(
                                    "Pausing and re-attempting registration in {} ms",
                                    retryingRegistrationConfiguration.getErrorDelayMillis());

                            registerLater(
                                    gateway,
                                    1,
                                    retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis(),
                                    retryingRegistrationConfiguration.getErrorDelayMillis());
                        }
                    }
                },
                rpcService.getScheduledExecutor());
    } catch (Throwable t) {
        completionFuture.completeExceptionally(t);
        cancel();
    }
}
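The timeout handling in register() is easier to see in isolation. The following is a minimal sketch, not Flink code: it reproduces only the `Math.min(2 * timeoutMillis, max)` rule from the timeout branch, assuming every attempt times out. The class name RegistrationBackoffSketch and the values 100 ms and 1000 ms are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink: isolates the timeout arithmetic of register(). */
final class RegistrationBackoffSketch {

    /** Timeout used by each of the first `attempts` attempts, assuming every attempt times out. */
    static List<Long> timeouts(long initialTimeoutMillis, long maxTimeoutMillis, int attempts) {
        List<Long> result = new ArrayList<>();
        long timeout = initialTimeoutMillis;
        for (int i = 0; i < attempts; i++) {
            result.add(timeout);
            // Same rule as the timeout branch above: double the timeout, capped at the maximum.
            timeout = Math.min(2 * timeout, maxTimeoutMillis);
        }
        return result;
    }

    public static void main(String[] args) {
        // Illustrative values only, not Flink defaults.
        System.out.println(timeouts(100L, 1000L, 6)); // [100, 200, 400, 800, 1000, 1000]
    }
}
```

Note that only the timeout branch grows the timeout. On a refused registration or a non-timeout error, the code above instead waits for the configured delay and restarts at attempt 1 with the initial timeout.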

Core registration flow

I. TaskExecutor registers with ResourceManager
CompletableFuture<RegistrationResponse> registrationFuture =
        invokeRegistration(gateway, fencingToken, timeoutMillis);

The invokeRegistration method in effect calls:

@Override
protected CompletableFuture<RegistrationResponse> invokeRegistration(
        ResourceManagerGateway resourceManager,
        ResourceManagerId fencingToken,
        long timeoutMillis)
        throws Exception {
    Time timeout = Time.milliseconds(timeoutMillis);
    return resourceManager.registerTaskExecutor(taskExecutorRegistration, timeout);
}
  • In other words, it uses the ResourceManager gateway object to call the remote method registerTaskExecutor() and send the TaskExecutor's registration information.

  • The taskExecutorRegistration object encapsulates the TaskExecutor's own resource information (address, ports, resource configuration, and so on).

    public TaskExecutorRegistration(
            final String taskExecutorAddress,
            final ResourceID resourceId,
            final int dataPort,
            final int jmxPort,
            final HardwareDescription hardwareDescription,
            final TaskExecutorMemoryConfiguration memoryConfiguration,
            final ResourceProfile defaultSlotResourceProfile,
            final ResourceProfile totalResourceProfile,
            final String nodeId) {
        // address
        this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
        // resource ID
        this.resourceId = checkNotNull(resourceId);
        // ports
        this.dataPort = dataPort;
        this.jmxPort = jmxPort;
        this.hardwareDescription = checkNotNull(hardwareDescription);
        this.memoryConfiguration = checkNotNull(memoryConfiguration);
        this.defaultSlotResourceProfile = checkNotNull(defaultSlotResourceProfile);
        this.totalResourceProfile = checkNotNull(totalResourceProfile);
        this.nodeId = checkNotNull(nodeId);
    }
    
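II. ResourceManager handles the registration request
  1. Establish the TaskExecutor gateway connection
getRpcService().connect(taskExecutorRegistration.getTaskExecutorAddress(), TaskExecutorGateway.class)
  • ResourceManager uses the TaskExecutor's address to open a remote gateway connection to it (TaskExecutorGateway).
  2. Guard against duplicates
  • Prevent duplicate or outdated registrations:
if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(resourceId))
  • This ensures that only the most recent connection request is processed.
  3. Run the internal registration logic
registerTaskExecutorInternal(taskExecutorGateway, taskExecutorRegistration)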
@Override
public CompletableFuture<RegistrationResponse> registerTaskExecutor(
        final TaskExecutorRegistration taskExecutorRegistration, final Time timeout) {

    // connect to the TaskExecutor's address to obtain its gateway
    CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture =
            getRpcService()
                    .connect(
                            taskExecutorRegistration.getTaskExecutorAddress(),
                            TaskExecutorGateway.class);
    // remember this future in taskExecutorGatewayFutures to guard against duplicate registration
    taskExecutorGatewayFutures.put(
            taskExecutorRegistration.getResourceId(), taskExecutorGatewayFuture);

    return taskExecutorGatewayFuture.handleAsync(
            (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
                final ResourceID resourceId = taskExecutorRegistration.getResourceId();
                // make sure this is still the latest connection attempt for this resource
                if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(resourceId)) {
                    // the connection attempt has completed
                    taskExecutorGatewayFutures.remove(resourceId);
                    if (throwable != null) {
                        return new RegistrationResponse.Failure(throwable);
                    } else {
                        // the ResourceManager performs the actual registration here
                        return registerTaskExecutorInternal(
                                taskExecutorGateway, taskExecutorRegistration);
                    }
                } else {
                    log.debug(
                            "Ignoring outdated TaskExecutorGateway connection for {}.",
                            resourceId.getStringWithMetadata());
                    return new RegistrationResponse.Failure(
                            new FlinkException("Decline outdated task executor registration."));
                }
            },
            getMainThreadExecutor());
}
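The identity check on the stored future is the whole duplicate guard, so it may help to see it run outside Flink. The sketch below is an illustration under simplifying assumptions: gateways and responses are plain strings, the class OutdatedConnectionGuardSketch and its method names are hypothetical, and everything runs on one thread (Flink gets the same effect by running the callback on the main thread executor).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the ResourceManager side; gateways are plain strings here. */
final class OutdatedConnectionGuardSketch {

    // Latest pending connection attempt per resource id. A plain HashMap suffices because
    // this sketch is single-threaded.
    private final Map<String, CompletableFuture<String>> pendingGateways = new HashMap<>();

    CompletableFuture<String> register(String resourceId, CompletableFuture<String> gatewayFuture) {
        // A newer attempt for the same resource id overwrites the older one.
        pendingGateways.put(resourceId, gatewayFuture);
        return gatewayFuture.handle(
                (gateway, error) -> {
                    // Identity comparison: only the most recently stored future may proceed.
                    if (gatewayFuture == pendingGateways.get(resourceId)) {
                        pendingGateways.remove(resourceId);
                        return error != null ? "FAILURE" : "REGISTERED via " + gateway;
                    }
                    return "DECLINED (outdated)";
                });
    }

    /** Two attempts for the same id; the older connection completes first. */
    static List<String> demo() {
        OutdatedConnectionGuardSketch rm = new OutdatedConnectionGuardSketch();
        CompletableFuture<String> older = new CompletableFuture<>();
        CompletableFuture<String> newer = new CompletableFuture<>();
        CompletableFuture<String> olderResult = rm.register("tm-1", older);
        CompletableFuture<String> newerResult = rm.register("tm-1", newer);
        older.complete("gateway-A");
        newer.complete("gateway-B");
        return List.of(olderResult.join(), newerResult.join());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [DECLINED (outdated), REGISTERED via gateway-B]
    }
}
```

Because the map holds only the latest future per resource ID, an older attempt is declined whether it completes before or after the newer one.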
III. What registerTaskExecutorInternal does
/**
 * Registers a new TaskExecutor.
 *
 * @param taskExecutorRegistration task executor registration parameters
 * @return RegistrationResponse
 */
private RegistrationResponse registerTaskExecutorInternal(
        TaskExecutorGateway taskExecutorGateway,
        TaskExecutorRegistration taskExecutorRegistration) {
    ResourceID taskExecutorResourceId = taskExecutorRegistration.getResourceId();
    WorkerRegistration<WorkerType> oldRegistration =
            taskExecutors.remove(taskExecutorResourceId);
    if (oldRegistration != null) {
        // TODO :: suggest old taskExecutor to stop itself
        log.debug(
                "Replacing old registration of TaskExecutor {}.",
                taskExecutorResourceId.getStringWithMetadata());

        // remove old task manager registration from slot manager
        slotManager.unregisterTaskManager(
                oldRegistration.getInstanceID(),
                new ResourceManagerException(
                        String.format(
                                "TaskExecutor %s re-connected to the ResourceManager.",
                                taskExecutorResourceId.getStringWithMetadata())));
    }

    final Optional<WorkerType> newWorkerOptional =
            getWorkerNodeIfAcceptRegistration(taskExecutorResourceId);

    String taskExecutorAddress = taskExecutorRegistration.getTaskExecutorAddress();
    if (!newWorkerOptional.isPresent()) {
        log.warn(
                "Discard registration from TaskExecutor {} at ({}) because the framework did "
                        + "not recognize it",
                taskExecutorResourceId.getStringWithMetadata(),
                taskExecutorAddress);
        return new TaskExecutorRegistrationRejection(
                "The ResourceManager does not recognize this TaskExecutor.");
    } else {
        WorkerType newWorker = newWorkerOptional.get();
        WorkerRegistration<WorkerType> registration =
                new WorkerRegistration<>(
                        taskExecutorGateway,
                        newWorker,
                        taskExecutorRegistration.getDataPort(),
                        taskExecutorRegistration.getJmxPort(),
                        taskExecutorRegistration.getHardwareDescription(),
                        taskExecutorRegistration.getMemoryConfiguration(),
                        taskExecutorRegistration.getTotalResourceProfile(),
                        taskExecutorRegistration.getDefaultSlotResourceProfile(),
                        taskExecutorRegistration.getNodeId());

        log.info(
                "Registering TaskManager with ResourceID {} ({}) at ResourceManager",
                taskExecutorResourceId.getStringWithMetadata(),
                taskExecutorAddress);
        // record the TaskExecutor in the map of registered task executors
        taskExecutors.put(taskExecutorResourceId, registration);

        // add the TaskExecutor to the heartbeat service
        taskManagerHeartbeatManager.monitorTarget(
                taskExecutorResourceId, new TaskExecutorHeartbeatSender(taskExecutorGateway));

        // tell the TaskExecutor that registration succeeded
        return new TaskExecutorRegistrationSuccess(
                registration.getInstanceID(),
                resourceId,
                clusterInformation,
                latestTokens.get());
    }
}
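  • Key point: registering a target for heartbeat monitoring boils down to adding an entry to a ConcurrentHashMap.

  • Once the TaskExecutor has registered at the ResourceManager, it enters the heartbeat monitoring phase. The core steps are:

    • HeartbeatManagerImpl.monitorTarget() is called.
    • In essence it performs:
    heartbeatTargets.put(resourceID, heartbeatMonitor);


    That is, the TaskExecutor's ResourceID is the key and the corresponding heartbeat monitor object (HeartbeatMonitor) is the value, stored in a ConcurrentHashMap:

    private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;


    Source code: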

        /** Map containing the heartbeat monitors associated with the respective resource ID. */
        private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;

        @Override
        public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
            if (!stopped) {
                if (heartbeatTargets.containsKey(resourceID)) {
                    log.debug(
                            "The target with resource ID {} is already been monitored.",
                            resourceID.getStringWithMetadata());
                } else {
                    HeartbeatMonitor<O> heartbeatMonitor =
                            heartbeatMonitorFactory.createHeartbeatMonitor(
                                    resourceID,
                                    heartbeatTarget,
                                    mainThreadExecutor,
                                    heartbeatListener,
                                    heartbeatTimeoutIntervalMs,
                                    failedRpcRequestsUntilUnreachable);

                    heartbeatTargets.put(resourceID, heartbeatMonitor);

                    // check if we have stopped in the meantime (concurrent stop operation)
                    if (stopped) {
                        heartbeatMonitor.cancel();

                        heartbeatTargets.remove(resourceID);
                    }
                }
            }
        }
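The control flow of monitorTarget() can be tried out with a stripped-down model. This is a sketch under stated assumptions rather than Flink's implementation: the monitor is just a string, no timeout is scheduled, and the class HeartbeatTargetsSketch with its boolean return value is hypothetical and exists only to make the behaviour observable.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of the monitorTarget() bookkeeping; monitors are plain strings. */
final class HeartbeatTargetsSketch {

    private final ConcurrentHashMap<String, String> heartbeatTargets = new ConcurrentHashMap<>();
    private volatile boolean stopped = false;

    /** Returns true only if a new monitor was stored for the given resource id. */
    boolean monitorTarget(String resourceId, String monitor) {
        if (stopped) {
            return false;
        }
        if (heartbeatTargets.containsKey(resourceId)) {
            // Already monitored: keep the existing monitor, as the Flink code does.
            return false;
        }
        heartbeatTargets.put(resourceId, monitor);
        // Re-check in case stop() ran concurrently between the first check and the put.
        if (stopped) {
            heartbeatTargets.remove(resourceId);
            return false;
        }
        return true;
    }

    void stop() {
        stopped = true;
        heartbeatTargets.clear();
    }

    int size() {
        return heartbeatTargets.size();
    }

    public static void main(String[] args) {
        HeartbeatTargetsSketch manager = new HeartbeatTargetsSketch();
        System.out.println(manager.monitorTarget("tm-1", "monitor-1")); // true
        System.out.println(manager.monitorTarget("tm-1", "monitor-2")); // false
        manager.stop();
        System.out.println(manager.monitorTarget("tm-2", "monitor-3")); // false
    }
}
```

The second call for the same ID is a no-op, which is why a repeated registration does not replace a heartbeat monitor that is already running.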
    
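IV. How the TaskExecutor confirms that registration succeeded

When the ResourceManager returns a successful registration response, the TaskExecutor side records the success with:

completionFuture.complete(RetryingRegistrationResult.success(gateway, success));

This is a key milestone in the asynchronous registration flow. Specifically:

  • completionFuture is the control point of the registration flow between TaskExecutor and ResourceManager; it represents the final outcome of the registration.
  • When RegistrationResponse.Success is received, meaning the ResourceManager has accepted the TaskExecutor, this call marks the registration as successful.
  • After a successful registration, the TaskExecutor goes on to:
    • report its slot resources;
    • start the heartbeat service with ResourceManager and enter regular resource management and monitoring.
V. The onRegistrationSuccess method of TaskExecutorToResourceManagerConnection
  • As described above, the "final confirmation" of the TaskExecutor's registration with ResourceManager depends on completionFuture.complete(). Once registration completes, the callback chain is driven by TaskExecutorToResourceManagerConnection, specifically by the createNewRegistration() method (defined in its base class, RegisteredRpcConnection):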

    private RetryingRegistration<F, G, S, R> createNewRegistration() {
        RetryingRegistration<F, G, S, R> newRegistration = checkNotNull(generateRegistration());

        CompletableFuture<RetryingRegistration.RetryingRegistrationResult<G, S, R>> future =
                newRegistration.getFuture();

        future.whenCompleteAsync(
                (RetryingRegistration.RetryingRegistrationResult<G, S, R> result,
                        Throwable failure) -> {
                    if (failure != null) {
                        if (failure instanceof CancellationException) {
                            // we ignore cancellation exceptions because they originate from
                            // cancelling the RetryingRegistration
                            log.debug(
                                    "Retrying registration towards {} was cancelled.",
                                    targetAddress);
                        } else {
                            // this future should only ever fail if there is a bug, not if the
                            // registration is declined
                            onRegistrationFailure(failure);
                        }
                    } else {
                        if (result.isSuccess()) {
                            // this is the step that matters: keep the gateway and
                            // invoke the success callback
                            targetGateway = result.getGateway();
                            onRegistrationSuccess(result.getSuccess());
                        } else if (result.isRejection()) {
                            onRegistrationRejection(result.getRejection());
                        } else {
                            throw new IllegalArgumentException(
                                    String.format(
                                            "Unknown retrying registration response: %s.", result));
                        }
                    }
                },
                executor);

        return newRegistration;
    }
    
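    • ResourceManagerRegistrationListener is essentially a listener for the state of the ResourceManager connection. After a successful registration, the following core method is invoked through it: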

      private void establishResourceManagerConnection(
              ResourceManagerGateway resourceManagerGateway,
              ResourceID resourceManagerResourceId,
              InstanceID taskExecutorRegistrationId,
              ClusterInformation clusterInformation) {

          // report slot information
          final CompletableFuture<Acknowledge> slotReportResponseFuture =
                  resourceManagerGateway.sendSlotReport(
                          getResourceID(),
                          taskExecutorRegistrationId,
                          taskSlotTable.createSlotReport(getResourceID()),
                          Time.fromDuration(taskManagerConfiguration.getRpcTimeout()));

          slotReportResponseFuture.whenCompleteAsync(
                  (acknowledge, throwable) -> {
                      if (throwable != null) {
                          reconnectToResourceManager(
                                  new TaskManagerException(
                                          "Failed to send initial slot report to ResourceManager.",
                                          throwable));
                      }
                  },
                  getMainThreadExecutor());

          // monitor the resource manager as heartbeat target
          resourceManagerHeartbeatManager.monitorTarget(
                  resourceManagerResourceId,
                  new ResourceManagerHeartbeatReceiver(resourceManagerGateway));

          // set the propagated blob server address
          final InetSocketAddress blobServerAddress =
                  new InetSocketAddress(
                          clusterInformation.getBlobServerHostname(),
                          clusterInformation.getBlobServerPort());

          taskExecutorBlobService.setBlobServerAddress(blobServerAddress);

          // record the established connection
          establishedResourceManagerConnection =
                  new EstablishedResourceManagerConnection(
                          resourceManagerGateway,
                          resourceManagerResourceId,
                          taskExecutorRegistrationId);

          // stop the registration timeout task
          stopRegistrationTimeout();
      }
      
VI. Stopping the registration timeout
  • In short: startRegistrationTimeout puts a timeout guard on the registration process so that it cannot wait forever; stopRegistrationTimeout invalidates that timeout task once registration succeeds, cleanly ending the registration flow.
private void stopRegistrationTimeout() {
    currentRegistrationTimeoutId = null;
}

private void startRegistrationTimeout() {
    final Duration maxRegistrationDuration =
            taskManagerConfiguration.getMaxRegistrationDuration();

    if (maxRegistrationDuration != null) {
        final UUID newRegistrationTimeoutId = UUID.randomUUID();
        currentRegistrationTimeoutId = newRegistrationTimeoutId;
        scheduleRunAsync(
                () -> registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);
    }
}
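Setting currentRegistrationTimeoutId to null never cancels the scheduled task. It only makes the task harmless when it eventually fires. The sketch below illustrates that pattern under an explicit assumption: the body of registrationTimeout() is not shown in this article, so the sketch assumes it compares the ID it was scheduled with against the current ID and acts only on a match. The class RegistrationTimeoutSketch and the timedOut flag are hypothetical, and the scheduled task is simulated by calling registrationTimeout() directly.

```java
import java.util.UUID;

/** Hypothetical model of the timeout-id pattern; firing is simulated by a direct call. */
final class RegistrationTimeoutSketch {

    private UUID currentRegistrationTimeoutId;
    private boolean timedOut = false;

    /** Arms the guard and returns the id that the scheduled task would capture. */
    UUID startRegistrationTimeout() {
        UUID newRegistrationTimeoutId = UUID.randomUUID();
        currentRegistrationTimeoutId = newRegistrationTimeoutId;
        return newRegistrationTimeoutId;
    }

    /** Disarms the guard; an already scheduled task stays scheduled but becomes a no-op. */
    void stopRegistrationTimeout() {
        currentRegistrationTimeoutId = null;
    }

    /** Assumed behaviour: act only if the captured id is still the current one. */
    void registrationTimeout(UUID registrationTimeoutId) {
        if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) {
            timedOut = true;
        }
    }

    boolean isTimedOut() {
        return timedOut;
    }

    public static void main(String[] args) {
        RegistrationTimeoutSketch taskExecutor = new RegistrationTimeoutSketch();
        UUID captured = taskExecutor.startRegistrationTimeout();
        taskExecutor.stopRegistrationTimeout();    // registration succeeded in time
        taskExecutor.registrationTimeout(captured); // the stale task fires later
        System.out.println(taskExecutor.isTimedOut()); // false
    }
}
```

A fresh UUID per call also means that a timeout task left over from an earlier registration attempt cannot trigger against a later one.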

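Summary

The full picture of TaskExecutor registration with ResourceManager

When a TaskExecutor starts, it connects to the ResourceManager and registers. The process combines asynchronous calls, retries, and heartbeats to keep the system stable and reliable. It can be summarized in the following steps:

  1. Registration is initiated through RetryingRegistration (with retries)
  • RetryingRegistration drives the registration flow and provides automatic retries and timeout handling;
  • invokeRegistration() sends the registration request to ResourceManager;
  • on success, completionFuture.complete(...) notifies the caller that registration has finished.

  2. ResourceManager receives and handles the registration request
  • On receiving the request, it establishes RPC communication with the TaskExecutor via RpcService.connect(...);
  • If the registration is valid, it calls registerTaskExecutorInternal() to complete the internal bookkeeping:
    • remove any previous registration of the same TaskExecutor, including its entry in the SlotManager;
    • record the TaskExecutor's registration information;
    • start heartbeat monitoring;
    • return a TaskExecutorRegistrationSuccess response.

  3. TaskExecutor receives the success response and starts working
  • On receiving RegistrationResponse.Success, completionFuture.complete(...) marks the registration as successful;
  • The TaskExecutor then:
    • reports its slots to ResourceManager (sendSlotReport);
    • starts heartbeat monitoring of the ResourceManager;
    • is connected to the scheduling system and can take part in job scheduling;
    • cancels the registration timeout guard set earlier so that it cannot fire spuriously.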