
Understanding the Flink RPC Mechanism in Depth (Part 4): Source Code Analysis of the Component Startup Flow

Preface

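Three articles on Flink RPC have been published so far. They cover the underlying communication system Akka/Pekko, dynamic proxies as the way RPC is implemented, and the Flink RPC components: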

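Understanding the Flink RPC Mechanism in Depth (Part 1): Akka/Pekko (CSDN blog)

Understanding the Flink RPC Mechanism in Depth (Part 2): Dynamic Proxies (CSDN blog)

Understanding the Flink RPC Mechanism in Depth (Part 3): Components and Their Interactions (CSDN blog)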

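This article walks through the source code to verify the material above, connect the pieces and reinforce it, giving a deeper understanding of how Flink RPC is implemented. It covers how the TaskManager starts up and registers with the ResourceManager. Since Flink 1.12 the TaskManager has been renamed TaskExecutor, so both names may appear in this article. Treat them as the same component.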

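TaskManager Startup: Source Code Analysis

Entry Class

Taking Flink's Standalone mode as an example, the TaskManager's entry class can be found in flink-daemon.sh in the bin directory of the Flink installation: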

. "$bin"/config.sh

case $DAEMON in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
    ;;

    (*)
        echo "Unknown daemon '${DAEMON}'. $USAGE."
        exit 1
    ;;
esac

This shows the entry class of each component in Standalone mode. The TaskManager's entry class is TaskManagerRunner. Because it is an entry class, it has a main method:

    public static void main(String[] args) throws Exception {
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();

        if (maxOpenFileHandles != -1L) {
            LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
        } else {
            LOG.info("Cannot determine the maximum number of open file descriptors");
        }

        // start the TaskManager process securely
        runTaskManagerProcessSecurely(args);
    }

From here, execution passes through a chain of TaskManagerRunner methods and eventually reaches the static method runTaskManager:

    public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
            throws Exception {
        final TaskManagerRunner taskManagerRunner;

        try {
            // everything up to here was static calls; now the TaskManagerRunner object is created
            taskManagerRunner =
                    new TaskManagerRunner(
                            configuration,
                            pluginManager,
                            TaskManagerRunner::createTaskExecutorService);
            // start creating the TaskManager
            taskManagerRunner.start();
        } catch (Exception exception) {
            throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
        }

        try {
            return taskManagerRunner.getTerminationFuture().get().getExitCode();
        } catch (Throwable t) {
            throw new FlinkException(
                    "Unexpected failure during runtime of TaskManagerRunner.",
                    ExceptionUtils.stripExecutionException(t));
        }
    }

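Until this point, only static methods of TaskManagerRunner have been called, for preliminary work such as logging and security checks. Only here is a TaskManagerRunner object actually instantiated, and its start method is called to create the TaskManager: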

    // taskManagerRunner.start()
    public void start() throws Exception {
        synchronized (lock) {
            startTaskManagerRunnerServices();
            taskExecutorService.start();
        }
    }
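Together, runTaskManager and start() follow a simple pattern: construct the runner, start it under a lock, then block on a termination future to get the process exit code. The sketch below illustrates that pattern with plain JDK classes. It is a simplified analogue, not Flink code. `MiniRunner`, `Result` and the exit codes 0/1 are hypothetical, and `join()` replaces `get()` only so the sketch needs no checked exceptions.

```java
import java.util.concurrent.CompletableFuture;

public class RunnerSketch {

    // Hypothetical stand-in for the runner's result type: only carries an exit code.
    public enum Result {
        SUCCESS(0),
        FAILURE(1);

        public final int exitCode;

        Result(int exitCode) {
            this.exitCode = exitCode;
        }
    }

    // Hypothetical stand-in for TaskManagerRunner.
    public static final class MiniRunner {
        private final Object lock = new Object();
        private final CompletableFuture<Result> terminationFuture = new CompletableFuture<>();
        private boolean started;

        public void start() {
            synchronized (lock) {
                // In Flink, the RPC services and the TaskExecutor are created and started here.
                started = true;
            }
        }

        public boolean isStarted() {
            synchronized (lock) {
                return started;
            }
        }

        // Completes the termination future, e.g. from a shutdown path on another thread.
        public void shutDown(Result result) {
            terminationFuture.complete(result);
        }

        public CompletableFuture<Result> getTerminationFuture() {
            return terminationFuture;
        }
    }

    // Same shape as runTaskManager: start, then block until the runner terminates.
    public static int runTaskManager(MiniRunner runner) {
        runner.start();
        return runner.getTerminationFuture().join().exitCode;
    }

    public static void main(String[] args) {
        MiniRunner runner = new MiniRunner();
        // Simulate a shutdown triggered from another thread.
        new Thread(() -> runner.shutDown(Result.SUCCESS)).start();
        System.out.println("exit code: " + runTaskManager(runner));
    }
}
```

Blocking the calling thread on the future keeps the JVM's main thread alive for as long as the TaskManager runs. The exit code is whatever the shutdown path reports.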

Creating the RpcService and the TaskExecutor

taskManagerRunner.start() makes two method calls:

  • startTaskManagerRunnerServices()
    private void startTaskManagerRunnerServices() throws Exception {
        synchronized (lock) {
            rpcSystem = RpcSystem.load(configuration);

            // non-RPC code omitted

            JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));

            // create the RpcService
            rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);

            // non-RPC code omitted

            // create the TaskExecutor
            taskExecutorService =
                    taskExecutorServiceFactory.createTaskExecutor(
                            this.configuration,
                            this.resourceId.unwrap(),
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            metricRegistry,
                            blobCacheService,
                            false,
                            externalResourceInfoProvider,
                            workingDirectory.unwrap(),
                            this,
                            delegationTokenReceiverRepository);
        }
    }

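This method first calls createRpcService, which creates the ActorSystem and initializes the RpcService.

Initializing the RpcServer and PekkoInvocationHandler

Next, the TaskExecutor is created. TaskExecutor extends RpcEndpoint, and the RpcEndpoint constructor initializes the RpcServer: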

    /**
     * Initializes the RPC endpoint.
     *
     * @param rpcService The RPC server that dispatches calls to this RPC endpoint.
     * @param endpointId Unique identifier for this endpoint
     */
    protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
        this.rpcService = checkNotNull(rpcService, "rpcService");
        this.endpointId = checkNotNull(endpointId, "endpointId");

        // startServer creates the RpcServer; internally it
        // 1. creates the actor used for communication, PekkoRpcActor
        // 2. creates a dynamic proxy backed by PekkoInvocationHandler and assigns it to rpcServer
        this.rpcServer = rpcService.startServer(this);
        this.resourceRegistry = new CloseableRegistry();

        this.mainThreadExecutor =
                new MainThreadExecutor(rpcServer, this::validateRunsInMainThread, endpointId);
        registerResource(this.mainThreadExecutor);
    }
  • taskExecutorService.start()

This calls the TaskExecutor's start method, which is inherited from the parent class RpcEndpoint:

    /**
     * Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc
     * endpoint is ready to process remote procedure calls.
     */
    public final void start() {
        rpcServer.start();
    }

rpcServer.start(), implemented by PekkoInvocationHandler, looks like this:

    public void start() {
        // rpcEndpoint here is the ActorRef of the PekkoRpcActor
        rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
    }

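In other words, once the TaskManager has been initialized, it sends itself a START control message through Akka/Pekko. More precisely, every class that extends RpcEndpoint sends itself such a message after it has been initialized.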
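The constructor-plus-start sequence works like this: startServer returns a dynamic proxy, and calling start() on that proxy becomes a START message addressed to the endpoint's own actor. This can be mimicked with a JDK `Proxy` and a queue standing in for the actor's mailbox. The following is a hypothetical, stripped-down sketch. `MiniRpcService`, `MiniRpcServer`, `MiniEndpoint` and the `Control` enum are illustrative names, and the queue replaces a real Pekko `ActorRef`.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class EndpointStartSketch {

    // Hypothetical control messages, modelled on ControlMessages.START / STOP.
    public enum Control { START, STOP }

    // Hypothetical, trimmed-down analogue of RpcServer.
    public interface MiniRpcServer {
        void start();

        void stop();
    }

    // Stand-in for RpcService#startServer: returns a JDK dynamic proxy whose
    // handler turns method calls into messages for the endpoint's "actor".
    public static final class MiniRpcService {
        public MiniRpcServer startServer(MiniEndpoint endpoint) {
            Queue<Object> mailbox = endpoint.mailbox; // plays the role of the ActorRef
            InvocationHandler handler = (proxy, method, args) -> {
                switch (method.getName()) {
                    case "start":
                        mailbox.offer(Control.START); // like tell(START, noSender())
                        return null;
                    case "stop":
                        mailbox.offer(Control.STOP);
                        return null;
                    default:
                        throw new UnsupportedOperationException(method.getName());
                }
            };
            return (MiniRpcServer) Proxy.newProxyInstance(
                    MiniRpcServer.class.getClassLoader(),
                    new Class<?>[] {MiniRpcServer.class},
                    handler);
        }
    }

    // Stand-in for RpcEndpoint: the constructor obtains its server from the RpcService.
    public static class MiniEndpoint {
        // Field initializers run before the constructor body, so startServer can use this.
        public final Queue<Object> mailbox = new ConcurrentLinkedQueue<>();
        public final MiniRpcServer rpcServer;

        public MiniEndpoint(MiniRpcService rpcService) {
            this.rpcServer = rpcService.startServer(this);
        }

        // Like RpcEndpoint#start: only tells the server that the endpoint is ready.
        public final void start() {
            rpcServer.start();
        }
    }

    public static void main(String[] args) {
        MiniEndpoint endpoint = new MiniEndpoint(new MiniRpcService());
        endpoint.start();
        System.out.println("proxy: " + Proxy.isProxyClass(endpoint.rpcServer.getClass()));
        System.out.println("mailbox: " + endpoint.mailbox);
    }
}
```

The endpoint never touches the mailbox directly. It only talks to the proxy, which is the same indirection that lets Flink swap in a remote actor without changing the endpoint code.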

Because START is sent as a Pekko message, it is handled by the message-receiving logic in PekkoRpcActor#createReceive, the actor behind the TaskExecutor:

    // constructor
    PekkoRpcActor(
            final T rpcEndpoint,
            final CompletableFuture<Boolean> terminationFuture,
            final int version,
            final long maximumFramesize,
            final boolean forceSerialization,
            final ClassLoader flinkClassLoader) {
        // other code omitted
        // when PekkoRpcActor is initialized, state is set to StoppedState.STOPPED
        this.state = StoppedState.STOPPED;
    }

    // receiving messages
    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                // handshake messages
                .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
                // control messages
                .match(ControlMessages.class, this::handleControlMessage)
                // any message other than the two types above
                .matchAny(this::handleMessage)
                .build();
    }

    // logic for handling control messages
    private void handleControlMessage(ControlMessages controlMessage) {
        try {
            switch (controlMessage) {
                case START:
                    state = state.start(this, flinkClassLoader);
                    break;
                case STOP:
                    state = state.stop();
                    break;
                case TERMINATE:
                    state = state.terminate(this, flinkClassLoader);
                    break;
                default:
                    handleUnknownControlMessage(controlMessage);
            }
        } catch (Exception e) {
            this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(e);
            throw e;
        }
    }

When PekkoRpcActor is initialized, it sets its state field to StoppedState.STOPPED.

On receiving ControlMessages.START, handleControlMessage takes the case START branch. Because state is StoppedState.STOPPED at this point, the call goes to the start method of the StoppedState enum:

    public State start(PekkoRpcActor<?> pekkoRpcActor, ClassLoader flinkClassLoader) {
        pekkoRpcActor.mainThreadValidator.enterMainThread();

        try {
            runWithContextClassLoader(
                    () -> pekkoRpcActor.rpcEndpoint.internalCallOnStart(), flinkClassLoader);
        } catch (Throwable throwable) {
            pekkoRpcActor.stop(
                    RpcEndpointTerminationResult.failure(
                            new RpcException(
                                    String.format(
                                            "Could not start RpcEndpoint %s.",
                                            pekkoRpcActor.rpcEndpoint.getEndpointId()),
                                    throwable)));
        } finally {
            pekkoRpcActor.mainThreadValidator.exitMainThread();
        }

        return StartedState.STARTED;
    }
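The dispatch in createReceive and the STOPPED to STARTED transition can be modelled as a small enum-based state machine. The sketch below uses hypothetical names (`MiniActor`, `MiniEndpoint`, `ControlMessage`) and simplifies error handling. Where Flink stops the actor after a failed start, the sketch only records the exception. Non-control messages are simply discarded until the actor has started, which is a simplification chosen for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class ActorStateSketch {

    public enum ControlMessage { START, STOP }

    // Hypothetical, simplified version of the actor's State hierarchy.
    public interface State {
        State start(MiniActor actor);

        State stop();
    }

    // Like StoppedState.STOPPED: starting calls the endpoint's start hook.
    public enum StoppedState implements State {
        STOPPED;

        @Override
        public State start(MiniActor actor) {
            try {
                actor.endpoint.internalCallOnStart();
            } catch (RuntimeException e) {
                actor.failure = e; // Flink would stop the actor with a failure result here
            }
            return StartedState.STARTED;
        }

        @Override
        public State stop() {
            return STOPPED;
        }
    }

    // Like StartedState.STARTED: a second START changes nothing.
    public enum StartedState implements State {
        STARTED;

        @Override
        public State start(MiniActor actor) {
            return STARTED;
        }

        @Override
        public State stop() {
            return StoppedState.STOPPED;
        }
    }

    // Hypothetical stand-in for RpcEndpoint with a template-method start hook.
    public static class MiniEndpoint {
        public final List<String> events = new ArrayList<>();

        final void internalCallOnStart() {
            events.add("internalCallOnStart");
            onStart();
        }

        protected void onStart() {
            events.add("onStart");
        }
    }

    // Hypothetical stand-in for PekkoRpcActor: begins STOPPED, dispatches by message type.
    public static final class MiniActor {
        final MiniEndpoint endpoint;
        public State state = StoppedState.STOPPED;
        RuntimeException failure;

        public MiniActor(MiniEndpoint endpoint) {
            this.endpoint = endpoint;
        }

        // Rough analogue of createReceive(): control messages vs. everything else.
        public void receive(Object message) {
            if (message instanceof ControlMessage) {
                handleControlMessage((ControlMessage) message);
            } else if (state == StartedState.STARTED) {
                endpoint.events.add("rpc:" + message);
            } else {
                endpoint.events.add("dropped:" + message); // sketch: discard until started
            }
        }

        private void handleControlMessage(ControlMessage message) {
            switch (message) {
                case START:
                    state = state.start(this);
                    break;
                case STOP:
                    state = state.stop();
                    break;
            }
        }
    }
}
```

Each state decides what START and STOP mean for itself, so handleControlMessage never needs to check the current state explicitly. This is the same reason the Flink code can simply write `state = state.start(...)`.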

The key line is pekkoRpcActor.rpcEndpoint.internalCallOnStart(), which calls back into a method defined by RpcEndpoint:

    public final void internalCallOnStart() throws Exception {
        validateRunsInMainThread();
        isRunning = true;
        onStart();
    }

    protected void onStart() throws Exception {}
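internalCallOnStart begins with validateRunsInMainThread(), and StoppedState#start wraps the call in mainThreadValidator.enterMainThread()/exitMainThread(). The following is a minimal sketch of that idea, assuming an `AtomicReference<Thread>` that marks the thread currently acting as the endpoint's main thread. The class and method names are hypothetical. Also, this sketch always throws on a violation, whereas Flink's own check may only be active when JVM assertions are enabled.

```java
import java.util.concurrent.atomic.AtomicReference;

public class MainThreadValidationSketch {

    // Hypothetical endpoint that remembers which thread currently acts as its main thread.
    public static class MiniEndpoint {
        public final AtomicReference<Thread> currentMainThread = new AtomicReference<>();
        private volatile boolean isRunning;

        // Like validateRunsInMainThread(): fail fast when called from any other thread.
        void validateRunsInMainThread() {
            if (currentMainThread.get() != Thread.currentThread()) {
                throw new IllegalStateException(
                        "Not running in the main thread: " + Thread.currentThread().getName());
            }
        }

        // Same three steps as internalCallOnStart(): validate, mark running, call the hook.
        public final void internalCallOnStart() {
            validateRunsInMainThread();
            isRunning = true;
            onStart();
        }

        protected void onStart() {}

        public boolean isRunning() {
            return isRunning;
        }
    }

    // Like enterMainThread()/exitMainThread() around the call in StoppedState#start.
    public static void runAsMainThread(MiniEndpoint endpoint, Runnable action) {
        Thread current = Thread.currentThread();
        if (!endpoint.currentMainThread.compareAndSet(null, current)) {
            throw new IllegalStateException("Another thread is already the main thread");
        }
        try {
            action.run();
        } finally {
            endpoint.currentMainThread.compareAndSet(current, null);
        }
    }
}
```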

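The call chain is long, but in plain terms it works as follows. Every Flink component that needs to communicate extends RpcEndpoint. Before the component is constructed, an RpcService (which creates the ActorSystem) is created and passed in. The RpcEndpoint constructor keeps it as a field and calls rpcService.startServer(this), which creates the actor and the proxy. start() then makes the endpoint send itself a START control message, and handling that message always ends in the component's own onStart method.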

TaskExecutor Registration with the ResourceManager

The registration with the ResourceManager starts in the onStart method:

    @Override
    public void onStart() throws Exception {
        try {
            // start registering with the ResourceManager
            startTaskExecutorServices();
        } catch (Throwable t) {
            final TaskManagerException exception =
                    new TaskManagerException(
                            String.format("Could not start the TaskExecutor %s", getAddress()), t);
            onFatalError(exception);
            throw exception;
        }

        startRegistrationTimeout();
    }

    private void startTaskExecutorServices() throws Exception {
        try {
            // start by connecting to the ResourceManager
            // ResourceManagerLeaderListener is where the actual registration is triggered
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
            // other code omitted
        } catch (Exception e) {
            handleStartTaskExecutorServicesException(e);
        }
    }

ResourceManagerLeaderListener is where the actual registration is kicked off:

    private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {

        @Override
        public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
            runAsync(
                    () ->
                            notifyOfNewResourceManagerLeader(
                                    leaderAddress,
                                    ResourceManagerId.fromUuidOrNull(leaderSessionID)));
        }

        @Override
        public void handleError(Exception exception) {
            onFatalError(exception);
        }
    }
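The leader retrieval service calls notifyLeaderAddress on its own thread. That is why the listener wraps the work in runAsync: the state change itself then runs on the endpoint's main thread. Below is a minimal sketch in which a single-threaded executor stands in for the endpoint's main thread. `MiniTaskExecutor`, `LeaderListener` and the thread name `main-thread` are hypothetical.

```java
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LeaderListenerSketch {

    // Same two callbacks as the LeaderRetrievalListener shown above (simplified).
    public interface LeaderListener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);

        void handleError(Exception exception);
    }

    // Hypothetical endpoint whose fields may only be changed on its main thread.
    public static final class MiniTaskExecutor implements AutoCloseable {
        private final ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        private final CountDownLatch updated = new CountDownLatch(1);
        public volatile String resourceManagerAddress;
        public volatile String updatedByThread;

        public final LeaderListener listener = new LeaderListener() {
            @Override
            public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
                // Called on the retrieval service's thread: hop to the main thread first.
                runAsync(() -> notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionId));
            }

            @Override
            public void handleError(Exception exception) {
                // Flink calls onFatalError here; the sketch just rethrows.
                throw new IllegalStateException("Leader retrieval failed", exception);
            }
        };

        // Like RpcEndpoint#runAsync: hand the work to the endpoint's main thread.
        void runAsync(Runnable runnable) {
            mainThread.execute(runnable);
        }

        private void notifyOfNewResourceManagerLeader(String address, UUID sessionId) {
            resourceManagerAddress = address + "@" + sessionId;
            updatedByThread = Thread.currentThread().getName();
            updated.countDown();
        }

        public boolean awaitUpdate(long seconds) {
            try {
                return updated.await(seconds, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        @Override
        public void close() {
            mainThread.shutdownNow();
        }
    }
}
```

Because every mutation goes through the same single thread, the endpoint's fields need no locks. That is the point of Flink's main-thread execution model.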

Next, inside notifyOfNewResourceManagerLeader:

    private void notifyOfNewResourceManagerLeader(
            String newLeaderAddress, ResourceManagerId newResourceManagerId) {
        // build the ResourceManager's address
        resourceManagerAddress =
                createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
        // try to (re)connect to the ResourceManager
        reconnectToResourceManager(
                new FlinkException(
                        String.format(
                                "ResourceManager leader changed to new address %s",
                                resourceManagerAddress)));
    }

Inside reconnectToResourceManager:

    private void reconnectToResourceManager(Exception cause) {
        // close the existing ResourceManager connection, if any
        closeResourceManagerConnection(cause);
        // (re)start the registration timeout
        startRegistrationTimeout();
        // try to connect to the ResourceManager
        tryConnectToResourceManager();
    }
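reconnectToResourceManager always runs the same three steps. It closes any existing connection, (re)arms the registration timeout, and connects only if a ResourceManager address is already known. The sketch below reproduces that ordering, using a `ScheduledExecutorService` for the timeout. It is a simplification under stated assumptions: in Flink an expired registration timeout is escalated to the TaskExecutor's fatal-error handling, whereas here it only sets a flag. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ReconnectSketch implements AutoCloseable {

    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final long registrationTimeoutMillis;
    private ScheduledFuture<?> registrationTimeout;

    public final List<String> log = new ArrayList<>(); // records the steps, for illustration
    public String resourceManagerAddress;
    public String connection;
    public volatile boolean timedOut;

    public ReconnectSketch(long registrationTimeoutMillis) {
        this.registrationTimeoutMillis = registrationTimeoutMillis;
    }

    // Same order as reconnectToResourceManager: close, (re)arm timeout, try to connect.
    public synchronized void reconnectToResourceManager(String cause) {
        closeResourceManagerConnection(cause);
        startRegistrationTimeout();
        tryConnectToResourceManager();
    }

    private void closeResourceManagerConnection(String cause) {
        if (connection != null) {
            log.add("closed " + connection + " because " + cause);
            connection = null;
        }
    }

    // Re-arming cancels any previously scheduled timeout.
    private void startRegistrationTimeout() {
        if (registrationTimeout != null) {
            registrationTimeout.cancel(false);
        }
        registrationTimeout = timer.schedule(
                () -> timedOut = true, registrationTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Only connects when the leader's address is already known.
    private void tryConnectToResourceManager() {
        if (resourceManagerAddress != null) {
            connection = "connection-to-" + resourceManagerAddress;
            log.add("connecting to " + resourceManagerAddress);
        }
    }

    // Once registration succeeds, the pending timeout no longer applies.
    public synchronized void onRegistrationSuccess() {
        if (registrationTimeout != null) {
            registrationTimeout.cancel(false);
        }
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```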

tryConnectToResourceManager();

    private void tryConnectToResourceManager() {
        if (resourceManagerAddress != null) {
            connectToResourceManager();
        }
    }

    private void connectToResourceManager() {
        assert (resourceManagerAddress != null);
        assert (establishedResourceManagerConnection == null);
        assert (resourceManagerConnection == null);

        log.info("Connecting to ResourceManager {}.", resourceManagerAddress);

        // package the TaskExecutor's information: address, hardware resources, memory configuration
        final TaskExecutorRegistration taskExecutorRegistration =
                new TaskExecutorRegistration(
                        getAddress(),
                        getResourceID(),
                        unresolvedTaskManagerLocation.getDataPort(),
                        JMXService.getPort().orElse(-1),
                        hardwareDescription,
                        memoryConfiguration,
                        taskManagerConfiguration.getDefaultSlotResourceProfile(),
                        taskManagerConfiguration.getTotalResourceProfile(),
                        unresolvedTaskManagerLocation.getNodeId());

        resourceManagerConnection =
                new TaskExecutorToResourceManagerConnection(
                        log,
                        getRpcService(),
                        taskManagerConfiguration.getRetryingRegistrationConfiguration(),
                        resourceManagerAddress.getAddress(),
                        resourceManagerAddress.getResourceManagerId(),
                        getMainThreadExecutor(),
                        new ResourceManagerRegistrationListener(),
                        taskExecutorRegistration);
        resourceManagerConnection.start();
    }

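connectToResourceManager packages the registration information into a TaskExecutorRegistration, creates a TaskExecutorToResourceManagerConnection and calls its start method, which is inherited from RegisteredRpcConnection: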

    public void start() {
        checkState(!closed, "The RPC connection is already closed");
        checkState(
                !isConnected() && pendingRegistration == null,
                "The RPC connection is already started");

        // create the registration together with its success/failure callbacks
        final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration();

        if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
            // start registering
            newRegistration.startRegistration();
        } else {
            // concurrent start operation
            newRegistration.cancel();
        }
    }
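The compareAndSet in start() ensures that only one RetryingRegistration is ever started, even if start() races with itself, and that any losing attempt is cancelled. Below is a sketch of the same guard using `AtomicReferenceFieldUpdater`. `Connection` and `Registration` are hypothetical. The sketch deliberately drops the preceding `checkState(... pendingRegistration == null ...)`, so that the cancel branch can be observed by calling start() twice in a row.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class ConnectionStartSketch {

    // Hypothetical stand-in for RetryingRegistration.
    public static final class Registration {
        public volatile boolean started;
        public volatile boolean cancelled;

        void startRegistration() {
            started = true;
        }

        void cancel() {
            cancelled = true;
        }
    }

    // Hypothetical stand-in for RegisteredRpcConnection.
    public static final class Connection {
        private static final AtomicReferenceFieldUpdater<Connection, Registration>
                REGISTRATION_UPDATER =
                        AtomicReferenceFieldUpdater.newUpdater(
                                Connection.class, Registration.class, "pendingRegistration");

        // Must be a volatile instance field for AtomicReferenceFieldUpdater.
        private volatile Registration pendingRegistration;

        // The checkState(pendingRegistration == null) guard is omitted on purpose here,
        // so the cancel branch is reachable by calling start() twice.
        public Registration start() {
            Registration newRegistration = new Registration();
            if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
                // Only the caller that installed its registration starts it.
                newRegistration.startRegistration();
            } else {
                // Someone else won the race: discard this attempt.
                newRegistration.cancel();
            }
            return newRegistration;
        }

        public Registration pendingRegistration() {
            return pendingRegistration;
        }
    }
}
```

A field updater gives the same atomicity as an `AtomicReference` field without allocating an extra object per connection. That is a common reason to choose it for frequently created objects.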

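RegisteredRpcConnection#start first creates the registration, wiring up the callbacks for registration success and failure, and then continues with startRegistration: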

    public void startRegistration() {
        // obtain the target's gateway, i.e. a dynamic proxy
        final CompletableFuture<G> rpcGatewayFuture;

        // the ResourceManager may run with leader election (HA), so the fenced branch is taken
        if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
            rpcGatewayFuture =
                    (CompletableFuture<G>)
                            rpcService.connect(
                                    targetAddress,
                                    fencingToken,
                                    targetType.asSubclass(FencedRpcGateway.class));
        } else {
            rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
        }

        // other code omitted
    }

    // rpcService.connect(...) ends up in PekkoRpcService#connectInternal
    private <C extends RpcGateway> CompletableFuture<C> connectInternal(
            final String address,
            final Class<C> clazz,
            Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
        checkState(!stopped, "RpcService is stopped");

        // unrelated code omitted (including the creation of actorRefFuture)

        // handshake to make sure the connection works
        final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = ...; // initializer omitted in this excerpt

        final CompletableFuture<C> gatewayFuture =
                actorRefFuture.thenCombineAsync(
                        handshakeFuture,
                        (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
                            InvocationHandler invocationHandler =
                                    invocationHandlerFactory.apply(actorRef);

                            ClassLoader classLoader = getClass().getClassLoader();

                            // the core part: creating the proxy
                            @SuppressWarnings("unchecked")
                            C proxy =
                                    (C)
                                            Proxy.newProxyInstance(
                                                    classLoader,
                                                    new Class<?>[] {clazz},
                                                    invocationHandler);

                            return proxy;
                        },
                        actorSystem.dispatcher());

        return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
    }
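connectInternal builds the proxy only after both the remote actor reference and the handshake have completed, which thenCombineAsync expresses directly. The sketch below mirrors that shape with plain futures. The `String` "actor reference", `HandshakeSuccess`, `GreetingGateway` and the toy invocation handler are all hypothetical stand-ins.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class ConnectSketch {

    // Hypothetical gateway interface standing in for a real RpcGateway.
    public interface GreetingGateway {
        String greet(String name);
    }

    // Hypothetical handshake reply, standing in for HandshakeSuccessMessage.
    public static final class HandshakeSuccess {}

    // Same shape as connectInternal: wait for BOTH the remote reference and the
    // handshake, then build a dynamic proxy for the requested gateway interface.
    public static <C> CompletableFuture<C> connect(
            CompletableFuture<String> actorRefFuture,
            CompletableFuture<HandshakeSuccess> handshakeFuture,
            Class<C> clazz,
            Executor dispatcher) {
        return actorRefFuture.thenCombineAsync(
                handshakeFuture,
                (String actorRef, HandshakeSuccess ignored) -> {
                    // Toy handler for one-argument methods: answers locally
                    // instead of messaging a remote actor.
                    Object proxy = Proxy.newProxyInstance(
                            clazz.getClassLoader(),
                            new Class<?>[] {clazz},
                            (p, method, args) -> {
                                if (method.getDeclaringClass() == Object.class) {
                                    throw new UnsupportedOperationException(method.getName());
                                }
                                return method.getName() + "(" + args[0] + ") via " + actorRef;
                            });
                    return clazz.cast(proxy);
                },
                dispatcher);
    }
}
```

If either input future completes exceptionally, the combined future fails as well. A failed handshake therefore never yields a usable gateway.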

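RpcService then returns the ResourceManager's gateway, which is a dynamic proxy. When the registration method (registerTaskExecutor on ResourceManagerGateway) is called on this proxy, the InvocationHandler wraps the call into a message and sends it through Pekko to the ResourceManager's PekkoRpcActor. The actor handles the message by invoking the same method on the ResourceManager itself, that is, the ResourceManager's registration method, and this registers the TaskManager.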
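This last step turns a call on the gateway proxy into a message that the remote actor executes against the real endpoint. It can be sketched in-process. The proxy packs the method name, parameter types and arguments into a message, and a single-threaded "actor" looks the method up reflectively and invokes it. This is only an analogue under stated assumptions: there is no network and no serialization, and `RegistrationGateway`, `ResourceManagerLike`, `Invocation` and `ReceiverActor` are hypothetical.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RemoteInvocationSketch {

    // Hypothetical gateway; plays the role of ResourceManagerGateway.
    public interface RegistrationGateway {
        CompletableFuture<String> registerTaskExecutor(String taskExecutorAddress);
    }

    // Hypothetical endpoint on the receiving side, playing the ResourceManager.
    public static final class ResourceManagerLike implements RegistrationGateway {
        public final List<String> registered = new ArrayList<>(); // mailbox thread only

        @Override
        public CompletableFuture<String> registerTaskExecutor(String taskExecutorAddress) {
            registered.add(taskExecutorAddress);
            return CompletableFuture.completedFuture("registered " + taskExecutorAddress);
        }
    }

    // Hypothetical message describing a method call: name, parameter types, arguments.
    public static final class Invocation {
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] args;

        Invocation(Method method, Object[] args) {
            this.methodName = method.getName();
            this.parameterTypes = method.getParameterTypes();
            this.args = args;
        }
    }

    // Receiving "actor": executes invocations one at a time on its own thread.
    public static final class ReceiverActor {
        private final Object endpoint;
        private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

        public ReceiverActor(Object endpoint) {
            this.endpoint = endpoint;
        }

        CompletableFuture<Object> tell(Invocation invocation) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Method target = endpoint.getClass()
                            .getMethod(invocation.methodName, invocation.parameterTypes);
                    return target.invoke(endpoint, invocation.args);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }, mailbox);
        }

        public void shutdown() {
            mailbox.shutdown();
        }
    }

    // Sending side: the proxy turns each gateway call into an Invocation message.
    @SuppressWarnings("unchecked")
    public static <G> G createProxy(Class<G> gatewayType, ReceiverActor actor) {
        return (G) Proxy.newProxyInstance(
                gatewayType.getClassLoader(),
                new Class<?>[] {gatewayType},
                (proxy, method, args) -> {
                    if (method.getDeclaringClass() == Object.class) {
                        throw new UnsupportedOperationException(method.getName());
                    }
                    // The target method returns a future itself; flatten to a single future.
                    return actor.tell(new Invocation(method, args))
                            .thenCompose(result -> (CompletableFuture<Object>) result);
                });
    }
}
```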

[Figure: TaskManager startup and registration flow]

