19. Establishing the Connection between the TaskExecutor and the ResourceManager
- During startup, the TaskExecutor first uses the ResourceManagerLeaderRetriever to obtain the current ResourceManagerGateway (the proxy object used to communicate with the ResourceManager).
Overall flow: TaskExecutor registration with the ResourceManager
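Registration of the TaskExecutor with the ResourceManager is not a one-shot call. It is driven by the RetryingRegistration component, which is responsible for:
- sending the registration request (the invokeRegistration method);
- receiving the registration result;
- deciding, based on the result, whether to complete the registration or retry after a failure.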
/**
 * This method performs a registration attempt and triggers either a success notification or a
 * retry, depending on the result.
 */
@SuppressWarnings("unchecked")
private void register(final G gateway, final int attempt, final long timeoutMillis) {
    // eager check for canceling to avoid some unnecessary work
    if (canceled) {
        return;
    }

    try {
        log.debug("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis);
        // invoked through the ResourceManagerGateway obtained earlier
        CompletableFuture<RegistrationResponse> registrationFuture =
                invokeRegistration(gateway, fencingToken, timeoutMillis);

        // if the registration was successful, let the TaskExecutor know
        CompletableFuture<Void> registrationAcceptFuture =
                registrationFuture.thenAcceptAsync(
                        (RegistrationResponse result) -> {
                            if (!isCanceled()) {
                                if (result instanceof RegistrationResponse.Success) {
                                    log.debug("Registration with {} at {} was successful.", targetName, targetAddress);
                                    S success = (S) result;
                                    // completionFuture is the future returned by newRegistration.getFuture();
                                    // completing it hands the gateway and the success response to the caller
                                    completionFuture.complete(RetryingRegistrationResult.success(gateway, success));
                                } else if (result instanceof RegistrationResponse.Rejection) {
                                    log.debug("Registration with {} at {} was rejected.", targetName, targetAddress);
                                    R rejection = (R) result;
                                    completionFuture.complete(RetryingRegistrationResult.rejection(rejection));
                                } else {
                                    // registration failure
                                    if (result instanceof RegistrationResponse.Failure) {
                                        RegistrationResponse.Failure failure = (RegistrationResponse.Failure) result;
                                        log.info("Registration failure at {} occurred.", targetName, failure.getReason());
                                    } else {
                                        log.error("Received unknown response to registration attempt: {}", result);
                                    }

                                    log.info(
                                            "Pausing and re-attempting registration in {} ms",
                                            retryingRegistrationConfiguration.getRefusedDelayMillis());
                                    registerLater(
                                            gateway,
                                            1,
                                            retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis(),
                                            retryingRegistrationConfiguration.getRefusedDelayMillis());
                                }
                            }
                        },
                        rpcService.getScheduledExecutor());

        // upon failure, retry
        registrationAcceptFuture.whenCompleteAsync(
                (Void v, Throwable failure) -> {
                    if (failure != null && !isCanceled()) {
                        if (ExceptionUtils.stripCompletionException(failure) instanceof TimeoutException) {
                            // we simply have not received a response in time. maybe the timeout was
                            // very low (initial fast registration attempts), maybe the target endpoint
                            // is currently down.
                            if (log.isDebugEnabled()) {
                                log.debug(
                                        "Registration at {} ({}) attempt {} timed out after {} ms",
                                        targetName, targetAddress, attempt, timeoutMillis);
                            }

                            long newTimeoutMillis =
                                    Math.min(
                                            2 * timeoutMillis,
                                            retryingRegistrationConfiguration.getMaxRegistrationTimeoutMillis());
                            register(gateway, attempt + 1, newTimeoutMillis);
                        } else {
                            // a serious failure occurred. we still should not give up, but keep trying
                            log.error("Registration at {} failed due to an error", targetName, failure);
                            log.info(
                                    "Pausing and re-attempting registration in {} ms",
                                    retryingRegistrationConfiguration.getErrorDelayMillis());
                            registerLater(
                                    gateway,
                                    1,
                                    retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis(),
                                    retryingRegistrationConfiguration.getErrorDelayMillis());
                        }
                    }
                },
                rpcService.getScheduledExecutor());
    } catch (Throwable t) {
        completionFuture.completeExceptionally(t);
        cancel();
    }
}
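The retry policy in register() can be condensed into a small synchronous model. This is a hypothetical sketch, not Flink code. RetryPolicySketch, Attempt and the maxAttempts bound are illustrative names. The real RetryingRegistration runs attempts asynchronously on the RPC service's scheduled executor and keeps retrying until it is canceled. The pauses that registerLater would schedule are only recorded here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

/** Hypothetical, synchronous model of the retry policy in RetryingRegistration#register. */
class RetryPolicySketch {
    enum Response { SUCCESS, REJECTION, FAILURE }

    /** One registration attempt; throws TimeoutException when no reply arrives in time. */
    interface Attempt {
        Response invoke(long timeoutMs) throws Exception;
    }

    final List<Long> timeoutsUsed = new ArrayList<>(); // timeout used by each attempt
    final List<Long> pauses = new ArrayList<>();       // pauses that registerLater would schedule

    Response register(Attempt attempt, long initialTimeoutMs, long maxTimeoutMs,
                      long refusedDelayMs, long errorDelayMs, int maxAttempts) {
        long timeoutMs = initialTimeoutMs;
        for (int i = 0; i < maxAttempts; i++) {
            timeoutsUsed.add(timeoutMs);
            try {
                Response response = attempt.invoke(timeoutMs);
                if (response != Response.FAILURE) {
                    return response; // SUCCESS or REJECTION completes the registration
                }
                // FAILURE response: pause, then restart from the initial timeout
                pauses.add(refusedDelayMs);
                timeoutMs = initialTimeoutMs;
            } catch (TimeoutException e) {
                // no answer in time: retry at once with a doubled, capped timeout
                timeoutMs = Math.min(2 * timeoutMs, maxTimeoutMs);
            } catch (Exception e) {
                // unexpected error: pause longer, then restart from the initial timeout
                pauses.add(errorDelayMs);
                timeoutMs = initialTimeoutMs;
            }
        }
        // the sketch gives up to stay finite; Flink keeps retrying until canceled
        throw new IllegalStateException("no result after " + maxAttempts + " attempts");
    }
}
```

For example, with an initial timeout of 100 ms and a cap of 300 ms, three timeouts followed by a success use the timeouts 100, 200, 300 and 300 ms. A FAILURE response instead resets the timeout to its initial value after the refused delay.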
Core registration flow
1. The TaskExecutor registers with the ResourceManager
CompletableFuture<RegistrationResponse> registrationFuture =
        invokeRegistration(gateway, fencingToken, timeoutMillis);
Under the hood, invokeRegistration essentially calls:
@Override
protected CompletableFuture<RegistrationResponse> invokeRegistration(
        ResourceManagerGateway resourceManager, ResourceManagerId fencingToken, long timeoutMillis)
        throws Exception {
    Time timeout = Time.milliseconds(timeoutMillis);
    return resourceManager.registerTaskExecutor(taskExecutorRegistration, timeout);
}
- That is, the TaskExecutor calls the remote method registerTaskExecutor() through the ResourceManager's gateway object and sends its registration information.
- The taskExecutorRegistration object encapsulates the TaskExecutor's own resource information (address, ports, resource configuration, and so on):
public TaskExecutorRegistration(
        final String taskExecutorAddress,
        final ResourceID resourceId,
        final int dataPort,
        final int jmxPort,
        final HardwareDescription hardwareDescription,
        final TaskExecutorMemoryConfiguration memoryConfiguration,
        final ResourceProfile defaultSlotResourceProfile,
        final ResourceProfile totalResourceProfile,
        final String nodeId) {
    // address
    this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
    // resource ID
    this.resourceId = checkNotNull(resourceId);
    // ports
    this.dataPort = dataPort;
    this.jmxPort = jmxPort;
    this.hardwareDescription = checkNotNull(hardwareDescription);
    this.memoryConfiguration = checkNotNull(memoryConfiguration);
    this.defaultSlotResourceProfile = checkNotNull(defaultSlotResourceProfile);
    this.totalResourceProfile = checkNotNull(totalResourceProfile);
    this.nodeId = checkNotNull(nodeId);
}
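The gateway is an RPC proxy, so invokeRegistration() returns a future immediately. A call that gets no answer surfaces as a TimeoutException, which register() unwraps with ExceptionUtils.stripCompletionException. The sketch below imitates that with plain CompletableFuture. RmGatewaySketch and InvokeRegistrationSketch are hypothetical stand-ins, and orTimeout (Java 9+) takes the place of the RPC framework's own timeout handling.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for ResourceManagerGateway; a real gateway is an RPC proxy. */
interface RmGatewaySketch {
    CompletableFuture<String> registerTaskExecutor(String taskExecutorAddress);
}

class InvokeRegistrationSketch {
    /** Delegates to the gateway and bounds the wait by timeoutMillis, like invokeRegistration. */
    static CompletableFuture<String> invokeRegistration(
            RmGatewaySketch gateway, String taskExecutorAddress, long timeoutMillis) {
        return gateway.registerTaskExecutor(taskExecutorAddress)
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Unwraps a CompletionException, similar in spirit to ExceptionUtils.stripCompletionException. */
    static Throwable strip(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }
}
```

A gateway that never answers makes join() throw a CompletionException whose stripped cause is a TimeoutException. That is exactly the case register() handles by doubling the timeout.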
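2. The ResourceManager handles the registration request
- Establish a gateway connection to the TaskExecutor
getRpcService().connect(taskExecutorRegistration.getTaskExecutorAddress(), TaskExecutorGateway.class)
- Using the TaskExecutor's address, the ResourceManager opens a remote gateway connection (TaskExecutorGateway) to the TaskExecutor.
- Deduplication
- Guard against duplicate or stale registrations:
if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(resourceId))
- This ensures that only the latest connection request is processed.
- Run the internal registration logic
registerTaskExecutorInternal(taskExecutorGateway, taskExecutorRegistration)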
@Override
public CompletableFuture<RegistrationResponse> registerTaskExecutor(
        final TaskExecutorRegistration taskExecutorRegistration, final Time timeout) {

    // connect to the TaskExecutor's address to obtain a TaskExecutorGateway
    CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture =
            getRpcService()
                    .connect(taskExecutorRegistration.getTaskExecutorAddress(), TaskExecutorGateway.class);
    // remember this future in taskExecutorGatewayFutures to guard against duplicate registrations
    taskExecutorGatewayFutures.put(taskExecutorRegistration.getResourceId(), taskExecutorGatewayFuture);

    return taskExecutorGatewayFuture.handleAsync(
            (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
                final ResourceID resourceId = taskExecutorRegistration.getResourceId();
                // only act if this is still the latest connection attempt for this resource
                if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(resourceId)) {
                    // the connection attempt has completed, so it is no longer pending
                    taskExecutorGatewayFutures.remove(resourceId);
                    if (throwable != null) {
                        return new RegistrationResponse.Failure(throwable);
                    } else {
                        // the ResourceManager performs the actual registration here
                        return registerTaskExecutorInternal(taskExecutorGateway, taskExecutorRegistration);
                    }
                } else {
                    log.debug(
                            "Ignoring outdated TaskExecutorGateway connection for {}.",
                            resourceId.getStringWithMetadata());
                    return new RegistrationResponse.Failure(
                            new FlinkException("Decline outdated task executor registration."));
                }
            },
            getMainThreadExecutor());
}
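The identity check `taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(resourceId)` means that when the same TaskExecutor connects twice, only the most recent attempt can register. Below is a minimal sketch of that rule. It assumes string IDs and string gateways, and DedupSketch is a hypothetical name. It uses a synchronous handle() where the original uses handleAsync on the main-thread executor.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the "only the latest connection wins" rule in registerTaskExecutor. */
class DedupSketch {
    // resourceId -> pending connection future (stands in for taskExecutorGatewayFutures)
    final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Starts a registration; `connection` completes with a gateway once the RPC connection is up. */
    CompletableFuture<String> register(String resourceId, CompletableFuture<String> connection) {
        pending.put(resourceId, connection); // a newer attempt overwrites an older one
        return connection.handle((gateway, error) -> {
            // reference comparison: is this still the newest attempt for this resource?
            if (connection == pending.get(resourceId)) {
                pending.remove(resourceId);
                return error != null ? "FAILURE: " + error.getMessage() : "SUCCESS via " + gateway;
            }
            return "FAILURE: outdated registration";
        });
    }
}
```

If two connection attempts for the same resource ID overlap, the older one is declined even if it finishes first.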
3. The registerTaskExecutorInternal logic in detail
/**
 * Registers a new TaskExecutor.
 *
 * @param taskExecutorRegistration task executor registration parameters
 * @return RegistrationResponse
 */
private RegistrationResponse registerTaskExecutorInternal(
        TaskExecutorGateway taskExecutorGateway,
        TaskExecutorRegistration taskExecutorRegistration) {
    ResourceID taskExecutorResourceId = taskExecutorRegistration.getResourceId();
    WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
    if (oldRegistration != null) {
        // TODO :: suggest old taskExecutor to stop itself
        log.debug(
                "Replacing old registration of TaskExecutor {}.",
                taskExecutorResourceId.getStringWithMetadata());

        // remove old task manager registration from slot manager
        slotManager.unregisterTaskManager(
                oldRegistration.getInstanceID(),
                new ResourceManagerException(
                        String.format(
                                "TaskExecutor %s re-connected to the ResourceManager.",
                                taskExecutorResourceId.getStringWithMetadata())));
    }

    final Optional<WorkerType> newWorkerOptional =
            getWorkerNodeIfAcceptRegistration(taskExecutorResourceId);

    String taskExecutorAddress = taskExecutorRegistration.getTaskExecutorAddress();
    if (!newWorkerOptional.isPresent()) {
        log.warn(
                "Discard registration from TaskExecutor {} at ({}) because the framework did "
                        + "not recognize it",
                taskExecutorResourceId.getStringWithMetadata(),
                taskExecutorAddress);
        return new TaskExecutorRegistrationRejection(
                "The ResourceManager does not recognize this TaskExecutor.");
    } else {
        WorkerType newWorker = newWorkerOptional.get();
        WorkerRegistration<WorkerType> registration =
                new WorkerRegistration<>(
                        taskExecutorGateway,
                        newWorker,
                        taskExecutorRegistration.getDataPort(),
                        taskExecutorRegistration.getJmxPort(),
                        taskExecutorRegistration.getHardwareDescription(),
                        taskExecutorRegistration.getMemoryConfiguration(),
                        taskExecutorRegistration.getTotalResourceProfile(),
                        taskExecutorRegistration.getDefaultSlotResourceProfile(),
                        taskExecutorRegistration.getNodeId());

        log.info(
                "Registering TaskManager with ResourceID {} ({}) at ResourceManager",
                taskExecutorResourceId.getStringWithMetadata(),
                taskExecutorAddress);
        // record the registered TaskExecutor
        taskExecutors.put(taskExecutorResourceId, registration);

        // start heartbeat monitoring of the TaskExecutor
        taskManagerHeartbeatManager.monitorTarget(
                taskExecutorResourceId, new TaskExecutorHeartbeatSender(taskExecutorGateway));

        // tell the TaskExecutor that its registration succeeded
        return new TaskExecutorRegistrationSuccess(
                registration.getInstanceID(), resourceId, clusterInformation, latestTokens.get());
    }
}
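The bookkeeping in registerTaskExecutorInternal can be modeled with a few plain collections. This hypothetical sketch makes these substitutions:
- a set of recognized workers stands in for getWorkerNodeIfAcceptRegistration;
- a random UUID stands in for the InstanceID;
- adding to heartbeatTargets stands in for taskManagerHeartbeatManager.monitorTarget.

It is single-threaded because the real method runs on the ResourceManager's main thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

/** Hypothetical, single-threaded model of registerTaskExecutorInternal (not Flink code). */
class RegistryInternalSketch {
    private final Set<String> recognizedWorkers;                  // workers the framework would accept
    final Map<String, String> taskExecutors = new HashMap<>();    // resourceId -> instanceId
    final Set<String> heartbeatTargets = new HashSet<>();         // monitored resource IDs
    final List<String> unregisteredInstances = new ArrayList<>(); // old instances removed from the "SlotManager"

    RegistryInternalSketch(Set<String> recognizedWorkers) {
        this.recognizedWorkers = recognizedWorkers;
    }

    /** Returns the new instance ID on success, or empty when the registration is rejected. */
    Optional<String> register(String resourceId) {
        String old = taskExecutors.remove(resourceId);
        if (old != null) {
            unregisteredInstances.add(old); // a re-registration replaces the previous one
        }
        if (!recognizedWorkers.contains(resourceId)) {
            return Optional.empty(); // "The ResourceManager does not recognize this TaskExecutor."
        }
        String instanceId = UUID.randomUUID().toString();
        taskExecutors.put(resourceId, instanceId);
        heartbeatTargets.add(resourceId); // start monitoring heartbeats
        return Optional.of(instanceId);
    }
}
```

Registering the same TaskExecutor twice yields a fresh instance ID and drops the old one. An unknown resource ID is rejected.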
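- Key point: registering for heartbeat monitoring means putting an entry into a ConcurrentHashMap.
After the TaskExecutor has registered with the ResourceManager, it enters the heartbeat-monitoring phase. The core step is a call to HeartbeatManagerImpl.monitorTarget().
- The essential operation: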
heartbeatTargets.put(resourceID, heartbeatMonitor);
The map is a ConcurrentHashMap whose key is the TaskExecutor's ResourceID and whose value is the corresponding heartbeat monitor (HeartbeatMonitor):
private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;
Source code:
/** Map containing the heartbeat monitors associated with the respective resource ID. */
private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;

@Override
public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
    if (!stopped) {
        if (heartbeatTargets.containsKey(resourceID)) {
            log.debug(
                    "The target with resource ID {} is already been monitored.",
                    resourceID.getStringWithMetadata());
        } else {
            HeartbeatMonitor<O> heartbeatMonitor =
                    heartbeatMonitorFactory.createHeartbeatMonitor(
                            resourceID,
                            heartbeatTarget,
                            mainThreadExecutor,
                            heartbeatListener,
                            heartbeatTimeoutIntervalMs,
                            failedRpcRequestsUntilUnreachable);

            heartbeatTargets.put(resourceID, heartbeatMonitor);

            // check if we have stopped in the meantime (concurrent stop operation)
            if (stopped) {
                heartbeatMonitor.cancel();
                heartbeatTargets.remove(resourceID);
            }
        }
    }
}
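Below is a minimal model of the heartbeat map. It uses a manually supplied clock in place of the scheduled timeout that each HeartbeatMonitor arms in Flink. HeartbeatSketch and its methods are hypothetical. putIfAbsent folds the containsKey check and the put into one atomic call. The original does not need that because it runs on a single main thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of HeartbeatManagerImpl's target map, driven by an explicit clock. */
class HeartbeatSketch {
    private final long timeoutMs;
    // resourceId -> time of the last heartbeat (stands in for a HeartbeatMonitor)
    final Map<String, Long> targets = new ConcurrentHashMap<>();

    HeartbeatSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Returns false if the target was already being monitored. */
    boolean monitorTarget(String resourceId, long nowMs) {
        return targets.putIfAbsent(resourceId, nowMs) == null;
    }

    /** Records a heartbeat; ignored for targets that are not monitored. */
    void receiveHeartbeat(String resourceId, long nowMs) {
        targets.computeIfPresent(resourceId, (id, last) -> nowMs);
    }

    /** Targets whose last heartbeat is older than the timeout. */
    List<String> timedOut(long nowMs) {
        List<String> result = new ArrayList<>();
        targets.forEach((id, last) -> {
            if (nowMs - last > timeoutMs) {
                result.add(id);
            }
        });
        return result;
    }
}
```

A target that keeps sending heartbeats stays alive. One that stays silent for longer than the timeout is reported, which in Flink triggers the heartbeat listener's timeout handling.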
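4. Key step: the TaskExecutor confirms that registration succeeded
Once the ResourceManager returns a successful response, the TaskExecutor records its successful registration with the following code: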
completionFuture.complete(RetryingRegistrationResult.success(gateway, success));
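This call is an important milestone in the asynchronous registration flow:
- completionFuture is the control hub of the TaskExecutor's registration with the ResourceManager and represents the final result of the registration.
- When a RegistrationResponse.Success arrives, meaning the ResourceManager has accepted the TaskExecutor, this call marks the registration as successful.
- After a successful registration, the TaskExecutor goes on to:
  - report its slot resources;
  - start heartbeat monitoring with the ResourceManager and enter normal resource management and monitoring.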
5. The onRegistrationSuccess method of TaskExecutorToResourceManagerConnection
- As described above, the "final confirmation" of the registration depends on completionFuture.complete(). Once registration completes, the rest of the callback chain is driven by TaskExecutorToResourceManagerConnection, specifically by the createNewRegistration() method it inherits from its base class RegisteredRpcConnection:
private RetryingRegistration<F, G, S, R> createNewRegistration() {
    RetryingRegistration<F, G, S, R> newRegistration = checkNotNull(generateRegistration());

    CompletableFuture<RetryingRegistration.RetryingRegistrationResult<G, S, R>> future =
            newRegistration.getFuture();

    future.whenCompleteAsync(
            (RetryingRegistration.RetryingRegistrationResult<G, S, R> result, Throwable failure) -> {
                if (failure != null) {
                    if (failure instanceof CancellationException) {
                        // we ignore cancellation exceptions because they originate from
                        // cancelling the RetryingRegistration
                        log.debug("Retrying registration towards {} was cancelled.", targetAddress);
                    } else {
                        // this future should only ever fail if there is a bug, not if the
                        // registration is declined
                        onRegistrationFailure(failure);
                    }
                } else {
                    if (result.isSuccess()) {
                        // on success: keep the gateway and invoke onRegistrationSuccess
                        targetGateway = result.getGateway();
                        onRegistrationSuccess(result.getSuccess());
                    } else if (result.isRejection()) {
                        onRegistrationRejection(result.getRejection());
                    } else {
                        throw new IllegalArgumentException(
                                String.format("Unknown retrying registration response: %s.", result));
                    }
                }
            },
            executor);

    return newRegistration;
}
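The routing in createNewRegistration can be reproduced with plain CompletableFuture. In this hypothetical sketch, Result is a simplified stand-in for RetryingRegistrationResult, and the handlers only record what would be called. A cancelled future reaches the callback as a CancellationException, which is why the original can simply ignore that case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical model of how createNewRegistration routes the registration result. */
class RegistrationDispatchSketch {
    /** Simplified stand-in for RetryingRegistrationResult: exactly one field is non-null. */
    static final class Result {
        final String success;
        final String rejection;

        private Result(String success, String rejection) {
            this.success = success;
            this.rejection = rejection;
        }

        static Result success(String s) { return new Result(s, null); }

        static Result rejection(String r) { return new Result(null, r); }
    }

    final List<String> events = new ArrayList<>(); // which handler would have been called

    /** Attaches the callback, like future.whenCompleteAsync(..., executor) in the original. */
    void watch(CompletableFuture<Result> completionFuture, Executor executor) {
        completionFuture.whenCompleteAsync((result, failure) -> {
            if (failure != null) {
                // cancellation comes from cancelling the retrying registration and is ignored
                if (!(failure instanceof CancellationException)) {
                    events.add("onRegistrationFailure");
                }
            } else if (result.success != null) {
                events.add("onRegistrationSuccess:" + result.success);
            } else {
                events.add("onRegistrationRejection:" + result.rejection);
            }
        }, executor);
    }
}
```

Passing `Runnable::run` as the executor runs the callback in the completing thread. That keeps the example deterministic, whereas the original hands it to the connection's executor.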
- TaskExecutorToResourceManagerConnection's onRegistrationSuccess forwards the result to a ResourceManagerRegistrationListener, which is essentially a listener for the state of the ResourceManager connection. After a successful registration, the listener calls the core method:
private void establishResourceManagerConnection(
        ResourceManagerGateway resourceManagerGateway,
        ResourceID resourceManagerResourceId,
        InstanceID taskExecutorRegistrationId,
        ClusterInformation clusterInformation) {

    // report the slots to the ResourceManager
    final CompletableFuture<Acknowledge> slotReportResponseFuture =
            resourceManagerGateway.sendSlotReport(
                    getResourceID(),
                    taskExecutorRegistrationId,
                    taskSlotTable.createSlotReport(getResourceID()),
                    Time.fromDuration(taskManagerConfiguration.getRpcTimeout()));

    slotReportResponseFuture.whenCompleteAsync(
            (acknowledge, throwable) -> {
                if (throwable != null) {
                    reconnectToResourceManager(
                            new TaskManagerException(
                                    "Failed to send initial slot report to ResourceManager.", throwable));
                }
            },
            getMainThreadExecutor());

    // monitor the resource manager as heartbeat target
    resourceManagerHeartbeatManager.monitorTarget(
            resourceManagerResourceId, new ResourceManagerHeartbeatReceiver(resourceManagerGateway));

    // set the propagated blob server address
    final InetSocketAddress blobServerAddress =
            new InetSocketAddress(
                    clusterInformation.getBlobServerHostname(), clusterInformation.getBlobServerPort());

    taskExecutorBlobService.setBlobServerAddress(blobServerAddress);

    // record the established connection
    establishedResourceManagerConnection =
            new EstablishedResourceManagerConnection(
                    resourceManagerGateway, resourceManagerResourceId, taskExecutorRegistrationId);

    // stop the registration timeout
    stopRegistrationTimeout();
}
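establishResourceManagerConnection does not wait for the slot report to be acknowledged. The connection is recorded right away, and a failed report later triggers a reconnect. Below is a hypothetical sketch of that ordering. SlotReportSketch is an illustrative name, and it uses a synchronous whenComplete where the original uses whenCompleteAsync on the main-thread executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of the non-blocking slot report in establishResourceManagerConnection. */
class SlotReportSketch {
    final List<String> events = new ArrayList<>();

    /** slotReportAck stands in for the future returned by sendSlotReport(...). */
    void establish(CompletableFuture<Void> slotReportAck) {
        events.add("slotReportSent");
        slotReportAck.whenComplete((ack, throwable) -> {
            if (throwable != null) {
                events.add("reconnectToResourceManager"); // only on a failed report
            }
        });
        // heartbeat monitoring, BlobServer address, connection record, stopRegistrationTimeout()
        events.add("connectionEstablished");
    }
}
```

The connection is already marked as established when the acknowledgement is still pending. A later failure only appends the reconnect step.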
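6. Stopping the registration timeout
- In short: startRegistrationTimeout adds a timeout guard to the registration process so that it cannot wait forever. Once registration succeeds, stopRegistrationTimeout invalidates that timeout task, ending the registration flow safely.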
private void stopRegistrationTimeout() {
    currentRegistrationTimeoutId = null;
}
private void startRegistrationTimeout() {
    final Duration maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();

    if (maxRegistrationDuration != null) {
        final UUID newRegistrationTimeoutId = UUID.randomUUID();
        currentRegistrationTimeoutId = newRegistrationTimeoutId;
        scheduleRunAsync(() -> registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);
    }
}
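The UUID acts as a token. stopRegistrationTimeout() does not cancel the scheduled task; it only clears currentRegistrationTimeoutId. When the task eventually runs, it finds a mismatch and does nothing. Below is a minimal sketch of that pattern. It assumes a single-thread ScheduledExecutorService in place of Flink's scheduleRunAsync. RegistrationTimeoutSketch is hypothetical, and it only counts the timeout where the real registrationTimeout handler reports a fatal error.

```java
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the token-based registration timeout (not Flink code). */
class RegistrationTimeoutSketch {
    private final ScheduledExecutorService mainThread = Executors.newSingleThreadScheduledExecutor();
    private volatile UUID currentRegistrationTimeoutId;
    final AtomicInteger timeoutsFired = new AtomicInteger();

    void startRegistrationTimeout(long maxRegistrationMs) {
        UUID id = UUID.randomUUID();
        currentRegistrationTimeoutId = id; // a newer timeout invalidates any older one
        mainThread.schedule(() -> registrationTimeout(id), maxRegistrationMs, TimeUnit.MILLISECONDS);
    }

    void stopRegistrationTimeout() {
        currentRegistrationTimeoutId = null; // the scheduled task stays queued but becomes a no-op
    }

    private void registrationTimeout(UUID id) {
        if (id.equals(currentRegistrationTimeoutId)) {
            timeoutsFired.incrementAndGet(); // registration did not finish in time
        }
    }

    /** Lets already-scheduled tasks run (default policy after shutdown), then waits for them. */
    void shutdownAndWait() {
        mainThread.shutdown();
        try {
            mainThread.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Clearing a field instead of cancelling a future keeps stopRegistrationTimeout() trivial. It is safe because the check and the clear both happen on the same main thread in the original.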
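Summary
The complete registration flow between the TaskExecutor and the ResourceManager
When a TaskExecutor starts, it connects to the ResourceManager and registers with it. The process combines asynchronous calls, retries and heartbeats to keep the system stable and reliable. It can be summarized in the following steps: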
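- Registration is initiated through RetryingRegistration (with retries)
  - RetryingRegistration drives the registration and provides automatic retries and timeout handling;
  - invokeRegistration() sends the registration request to the ResourceManager;
  - on success, completionFuture.complete(...) notifies the upper layer that registration has finished.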
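- The ResourceManager receives and processes the registration request
  - On receiving the request, it connects back to the TaskExecutor via RpcService.connect(...) to establish RPC communication;
  - if the registration is valid, registerTaskExecutorInternal() performs the internal bookkeeping:
    - records the TaskExecutor's registration information;
    - removes any previous registration of the same TaskExecutor from the SlotManager (the new TaskExecutor's slots reach the SlotManager later, through its slot report);
    - starts heartbeat monitoring of the TaskExecutor;
    - returns a TaskExecutorRegistrationSuccess response.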
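- The TaskExecutor receives the success response and enters its working state
  - After receiving RegistrationResponse.Success, it calls completionFuture.complete(...) to mark the registration as successful;
  - it then continues with the following steps:
    - reports its slots to the ResourceManager;
    - starts heartbeat monitoring of the ResourceManager;
    - completes the hand-off to the scheduling system and begins taking part in job scheduling;
    - cancels the registration timeout set earlier so that timeout handling is not triggered by mistake.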