Flink 1.19.0 Source Code Deep Dive 6: JobGraph Generation (Part 2)
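The previous article, "Flink 1.19.0 Source Code Deep Dive 5: JobGraph Generation (Part 1)", covered how JobGraph generation recursively builds operator chains and creates the chain-head JobVertex nodes. This article continues with the complete source for creating JobEdges, creating IntermediateDataSets, serializing operator chains, and the client's job submission request.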
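1. The core method of JobGraph generation
StreamingJobGraphGenerator.createJobGraph() is the core method of JobGraph generation and contains every key step. It first builds the operator chains, chaining together the nodes that can be chained and creating a JobVertex for each chain head. It then creates JobEdges for the non-chainable outputs to connect upstream and downstream JobVertices, serializes the operator information into the JobVertices, and finally configures the JobGraph's SlotSharingGroups, checkpointing, savepoint restore settings and ExecutionConfig to complete the JobGraph.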
[Figure: source code diagram]
StreamingJobGraphGenerator.createJobGraph() source:
```java
private JobGraph createJobGraph() {
    // Validate checkpointing and configure the JobType and the dynamic flag
    preValidate();
    jobGraph.setJobType(streamGraph.getJobType());
    jobGraph.setDynamic(streamGraph.isDynamic());

    jobGraph.enableApproximateLocalRecovery(
            streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    // (Creates the hash that identifies each node.)
    Map<Integer, byte[]> hashes =
            defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    // Chain together the nodes that can be chained
    setChaining(hashes, legacyHashes);

    if (jobGraph.isDynamic()) {
        setVertexParallelismsForDynamicGraphIfNecessary();
    }

    // Note that we set all the non-chainable outputs configuration here because the
    // "setVertexParallelismsForDynamicGraphIfNecessary" may affect the parallelism of job
    // vertices and partition-reuse
    final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs =
            new HashMap<>();
    // Register every non-chainable output for serialization
    setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs);
    // Walk each JobVertex's non-chainable outputs and create the JobEdges
    setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);

    // Set the physical edges: also serialize each JobVertex's input edges into its StreamConfig
    setPhysicalEdges();

    markSupportingConcurrentExecutionAttempts();

    validateHybridShuffleExecuteInBatchMode();

    // Assign each JobVertex to its SlotSharingGroup
    setSlotSharingAndCoLocation();

    setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

    // Configure checkpointing
    configureCheckpointing();

    // Configure the savepoint restore settings
    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

    final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
            JobGraphUtils.prepareUserArtifactEntries(
                    streamGraph.getUserArtifacts().stream()
                            .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                    jobGraph.getJobID());

    for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
            distributedCacheEntries.entrySet()) {
        jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
    }

    // Configure the ExecutionConfig
    // set the ExecutionConfig last when it has been finalized
    try {
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    } catch (IOException e) {
        throw new IllegalConfigurationException(
                "Could not serialize the ExecutionConfig."
                        + "This indicates that non-serializable types (like custom serializers) were registered");
    }

    jobGraph.setJobConfiguration(streamGraph.getJobConfiguration());

    addVertexIndexPrefixInVertexName();

    setVertexDescription();

    // For each chain-head node, serialize its operator chain into the node's configuration
    // Wait for the serialization of operator coordinators and stream config.
    try {
        FutureUtils.combineAll(
                        vertexConfigs.values().stream()
                                .map(config ->
                                        config.triggerSerializationAndReturnFuture(
                                                serializationExecutor))
                                .collect(Collectors.toList()))
                .get();

        waitForSerializationFuturesAndUpdateJobVertices();
    } catch (Exception e) {
        throw new FlinkRuntimeException("Error in serialization.", e);
    }

    if (!streamGraph.getJobStatusHooks().isEmpty()) {
        jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
    }

    // Return the finished JobGraph
    return jobGraph;
}
```
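The hashes computed at the top of createJobGraph() are what keep operator IDs stable across submissions of an unchanged job. A minimal sketch of the idea, assuming a hypothetical `ToyNodeHasher`: a user-provided uid alone determines the hash; otherwise the hash is derived from the node's position in a deterministic traversal plus the hashes of its inputs. Flink's own hasher uses a 128-bit Murmur3 hash and mixes in further details (such as the node's chainable outputs); this sketch swaps in the JDK's SHA-256 and omits those details.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified node hasher (not Flink's StreamGraphHasherV2).
final class ToyNodeHasher {
    private ToyNodeHasher() {}

    /**
     * @param userUid        uid set by the user, or null if none was set
     * @param traversalIndex position of the node in a deterministic traversal
     * @param upstreamHashes hashes already computed for the node's inputs
     */
    static byte[] hash(String userUid, int traversalIndex, List<byte[]> upstreamHashes) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            if (userUid != null) {
                // A user-specified uid alone determines the hash.
                digest.update(userUid.getBytes(StandardCharsets.UTF_8));
            } else {
                // Otherwise: traversal position plus all upstream hashes.
                digest.update(Integer.toString(traversalIndex).getBytes(StandardCharsets.UTF_8));
                for (byte[] upstream : upstreamHashes) {
                    digest.update(upstream);
                }
            }
            return digest.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    static boolean sameHash(byte[] a, byte[] b) {
        return Arrays.equals(a, b);
    }
}
```

Because only the traversal position and the upstream hashes feed the auto-generated hash, resubmitting the same topology yields the same bytes, while inserting a new operator upstream shifts positions and changes the hashes; that is why setting an explicit uid is the more robust choice.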
2. Creating JobEdges and IntermediateDataSets
JobEdges and IntermediateDataSets are created by iterating over all non-chainable StreamEdges of every chain-head JobVertex and turning each of those edges into an IntermediateDataSet and a JobEdge.
The call setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs) leads into StreamingJobGraphGenerator.setAllVertexNonChainedOutputsConfigs(), which is where the source for JobEdge and IntermediateDataSet creation begins.
[Figure: source code diagram]
StreamingJobGraphGenerator.setAllVertexNonChainedOutputsConfigs() first iterates over all JobVertices.
StreamingJobGraphGenerator.setAllVertexNonChainedOutputsConfigs() source:
```java
private void setAllVertexNonChainedOutputsConfigs(
        final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
    // Iterate over every JobVertex and create its JobEdges
    jobVertices
            .keySet()
            .forEach(
                    startNodeId ->
                            setVertexNonChainedOutputsConfig(
                                    startNodeId,
                                    vertexConfigs.get(startNodeId),
                                    chainInfos.get(startNodeId).getTransitiveOutEdges(),
                                    opIntermediateOutputs));
}
```
For each JobVertex, it iterates over the vertex's non-chainable StreamEdges and calls connect() to create the edges.
StreamingJobGraphGenerator.setVertexNonChainedOutputsConfig() source:
```java
private void setVertexNonChainedOutputsConfig(
        Integer startNodeId,
        StreamConfig config,
        List<StreamEdge> transitiveOutEdges,
        final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
    // Walk all non-chainable StreamEdges of this JobVertex and call connect() for each
    LinkedHashSet<NonChainedOutput> transitiveOutputs = new LinkedHashSet<>();
    for (StreamEdge edge : transitiveOutEdges) {
        NonChainedOutput output = opIntermediateOutputs.get(edge.getSourceId()).get(edge);
        transitiveOutputs.add(output);
        // Create the physical edge
        connect(startNodeId, edge, output);
    }
}
```
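The detail worth noticing is that the physical edge always starts at the chain head (startNodeId), even when the StreamEdge's source is an operator further down the chain, because getTransitiveOutEdges() collects the non-chainable out edges of every operator in the chain. A minimal sketch of that loop, using hypothetical `ToyStreamEdge` and `ToyVertexEdges` types in place of Flink's classes:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for StreamEdge: only source and target operator ids.
record ToyStreamEdge(int sourceId, int targetId) {}

final class ToyVertexEdges {
    private ToyVertexEdges() {}

    /**
     * For each chain head, walk its transitive non-chainable out edges and record a
     * physical connection from the chain head (not from edge.sourceId()) to the target.
     */
    static List<String> connectAll(Map<Integer, List<ToyStreamEdge>> transitiveOutEdgesByHead) {
        List<String> physicalEdgesInOrder = new ArrayList<>();
        transitiveOutEdgesByHead.forEach(
                (headId, edges) -> {
                    for (ToyStreamEdge edge : edges) {
                        physicalEdgesInOrder.add(headId + " -> " + edge.targetId());
                    }
                });
        return physicalEdgesInOrder;
    }
}
```

For example, if operator 2 is chained into chain head 1 and has a non-chainable edge to operator 3, the sketch records "1 -> 3", not "2 -> 3".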
StreamingJobGraphGenerator.connect() looks up the current (chain-head) JobVertex and the downstream JobVertex, obtains the output's partitioner and shuffle mode (ResultPartitionType), and creates the JobEdge as POINTWISE or ALL_TO_ALL depending on the partitioner's connection type.
StreamingJobGraphGenerator.connect() source:
```java
private void connect(Integer headOfChain, StreamEdge edge, NonChainedOutput output) {
    // Record this edge in physicalEdgesInOrder
    physicalEdgesInOrder.add(edge);

    Integer downStreamVertexID = edge.getTargetId();

    // The chain-head JobVertex of this chain
    JobVertex headVertex = jobVertices.get(headOfChain);
    // The downstream JobVertex this edge connects to
    JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);

    // Increment the downstream JobVertex's number of network inputs by one
    StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

    downStreamConfig.setNumberOfNetworkInputs(downStreamConfig.getNumberOfNetworkInputs() + 1);

    // Get the partitioner and the shuffle mode (result partition type)
    StreamPartitioner<?> partitioner = output.getPartitioner();
    ResultPartitionType resultPartitionType = output.getPartitionType();

    checkBufferTimeout(resultPartitionType, edge);

    // Create the JobEdge according to the partitioner's connection type
    JobEdge jobEdge;
    if (partitioner.isPointwise()) {
        jobEdge =
                downStreamVertex.connectNewDataSetAsInput(
                        headVertex,
                        DistributionPattern.POINTWISE,
                        resultPartitionType,
                        output.getDataSetId(),
                        partitioner.isBroadcast());
    } else {
        jobEdge =
                downStreamVertex.connectNewDataSetAsInput(
                        headVertex,
                        DistributionPattern.ALL_TO_ALL,
                        resultPartitionType,
                        output.getDataSetId(),
                        partitioner.isBroadcast());
    }

    // Configure the JobEdge
    // set strategy name so that web interface can show it.
    jobEdge.setShipStrategyName(partitioner.toString());
    jobEdge.setForward(partitioner instanceof ForwardPartitioner);
    jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());
    jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());

    if (LOG.isDebugEnabled()) {
        LOG.debug(
                "CONNECTED: {} - {} -> {}",
                partitioner.getClass().getSimpleName(),
                headOfChain,
                downStreamVertexID);
    }
}
```
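connect() only picks one of two DistributionPatterns; how many subtask-to-subtask connections that implies is worked out later, when the ExecutionGraph is built. A simplified counting model, assuming upstream parallelism m and downstream parallelism n: ALL_TO_ALL links every upstream subtask to every downstream subtask (m * n connections), while POINTWISE links each subtask on the larger side to exactly one subtask on the smaller side, giving max(m, n) connections. The enum and class names are hypothetical.

```java
// Hypothetical mirror of the two DistributionPattern values chosen in connect().
enum ToyPattern { POINTWISE, ALL_TO_ALL }

final class SubtaskWiring {
    private SubtaskWiring() {}

    // Mirrors: partitioner.isPointwise() ? POINTWISE : ALL_TO_ALL
    static ToyPattern patternFor(boolean partitionerIsPointwise) {
        return partitionerIsPointwise ? ToyPattern.POINTWISE : ToyPattern.ALL_TO_ALL;
    }

    /** Subtask-to-subtask connections for upstream parallelism m and downstream parallelism n. */
    static long connections(ToyPattern pattern, int m, int n) {
        if (m <= 0 || n <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        return switch (pattern) {
            // every upstream subtask feeds every downstream subtask
            case ALL_TO_ALL -> (long) m * n;
            // each subtask on the larger side is linked to exactly one on the smaller side
            case POINTWISE -> Math.max(m, n);
        };
    }
}
```

With m = 4 and n = 3, ALL_TO_ALL yields 12 connections; with m = 4 and n = 2, POINTWISE yields 4, which is why forward/rescale-style edges are much cheaper to wire than hash or rebalance edges.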
JobVertex.connectNewDataSetAsInput() is the method that actually creates the JobEdge: it obtains (or creates) the IntermediateDataSet produced by the upstream JobVertex, creates a JobEdge, and uses the JobEdge to connect that IntermediateDataSet to the current (downstream) JobVertex.
JobVertex.connectNewDataSetAsInput() source:
```java
public JobEdge connectNewDataSetAsInput(
        JobVertex input,
        DistributionPattern distPattern,
        ResultPartitionType partitionType,
        IntermediateDataSetID intermediateDataSetId,
        boolean isBroadcast) {
    // Get or create the IntermediateDataSet produced by the upstream (input) JobVertex
    IntermediateDataSet dataSet =
            input.getOrCreateResultDataSet(intermediateDataSetId, partitionType);

    // Create the JobEdge
    JobEdge edge = new JobEdge(dataSet, this, distPattern, isBroadcast);
    // Wire the JobEdge to this JobVertex and to the IntermediateDataSet
    this.inputs.add(edge);
    dataSet.addConsumer(edge);
    return edge;
}
```
The IntermediateDataSet itself is created in JobVertex.getOrCreateResultDataSet(), which connectNewDataSetAsInput() calls on the upstream (input) JobVertex.
JobVertex.getOrCreateResultDataSet() source:
```java
public IntermediateDataSet getOrCreateResultDataSet(
        IntermediateDataSetID id, ResultPartitionType partitionType) {
    return this.results.computeIfAbsent(
            // Create the IntermediateDataSet if none exists yet for this id
            id, key -> new IntermediateDataSet(id, partitionType, this));
}
```
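Because getOrCreateResultDataSet() uses computeIfAbsent keyed by the IntermediateDataSetID, two JobEdges created with the same data set ID share one IntermediateDataSet, which then has two consumers, while a new ID produces a new data set on the same upstream vertex. A minimal sketch of this wiring with hypothetical `ToyJobVertex`, `ToyDataSet` and `ToyJobEdge` classes (string IDs instead of IntermediateDataSetID; distribution pattern and partition type omitted):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for JobVertex: produces data sets and consumes edges.
final class ToyJobVertex {
    final String name;
    final Map<String, ToyDataSet> results = new LinkedHashMap<>();
    final List<ToyJobEdge> inputs = new ArrayList<>();

    ToyJobVertex(String name) {
        this.name = name;
    }

    // Same shape as getOrCreateResultDataSet(): reuse the data set if the id is known.
    ToyDataSet getOrCreateResultDataSet(String dataSetId) {
        return results.computeIfAbsent(dataSetId, id -> new ToyDataSet(id, this));
    }

    // Same shape as connectNewDataSetAsInput(): called on the downstream vertex.
    ToyJobEdge connectNewDataSetAsInput(ToyJobVertex input, String dataSetId) {
        ToyDataSet dataSet = input.getOrCreateResultDataSet(dataSetId);
        ToyJobEdge edge = new ToyJobEdge(dataSet, this);
        this.inputs.add(edge);
        dataSet.consumers.add(edge);
        return edge;
    }
}

// Hypothetical stand-in for IntermediateDataSet.
final class ToyDataSet {
    final String id;
    final ToyJobVertex producer;
    final List<ToyJobEdge> consumers = new ArrayList<>();

    ToyDataSet(String id, ToyJobVertex producer) {
        this.id = id;
        this.producer = producer;
    }
}

// Hypothetical stand-in for JobEdge: from a data set to a consuming vertex.
record ToyJobEdge(ToyDataSet source, ToyJobVertex target) {}
```

The resulting shape is JobVertex (producer) -> IntermediateDataSet -> JobEdge -> JobVertex (consumer), which is exactly the bipartite structure of the finished JobGraph.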
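3. Serializing operator logic, operator chains, chain outputs and intra-chain edges
The JobGraph is Flink's logical computation graph. It must carry the operators' computation logic, because it is sent from the Flink client to the Flink cluster on Yarn for distributed execution. Therefore, before the JobGraph is sent, each JobVertex must serialize its computation logic, its operator chain, its chain outputs and the edges inside its chain.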
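Inside StreamingJobGraphGenerator.createChain()
First, while StreamingJobGraphGenerator.createChain() recursively visits each node to build the operator chains, it puts the node's StreamOperatorFactory (which encapsulates the computation logic) and its chainable edges (ChainedOutputs) into toBeSerializedConfigObjects, the map that holds everything to be serialized; the actual serialization happens later.
[Figure: source code diagram]
StreamingJobGraphGenerator.createChain() source:
```java
private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {
    // Recursively build the operator chain...
    // ...
    // This method contains the code that registers the StreamOperatorFactory for serialization
    setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources());

    // Register the chainable edges for serialization
    setOperatorChainedOutputsConfig(config, chainableOutputs);
    // ...
}
```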
Serializing the operator computation logic
In StreamingJobGraphGenerator.setOperatorConfig(), the call config.setStreamOperatorFactory(vertex.getOperatorFactory()) registers the StreamOperatorFactory for serialization.
StreamingJobGraphGenerator.setOperatorConfig() source:
```java
private void setOperatorConfig(
        Integer vertexId, StreamConfig config, Map<Integer, ChainedSourceInfo> chainedSources) {
    // ...
    // Register the StreamOperatorFactory for serialization
    config.setStreamOperatorFactory(vertex.getOperatorFactory());
    // ...
}
```
StreamConfig.setStreamOperatorFactory() registers the StreamOperatorFactory for serialization: it puts the factory and its class into the map toBeSerializedConfigObjects, and the actual serialization happens later.
StreamConfig.setStreamOperatorFactory() source:
```java
public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
    if (factory != null) {
        // Register the operator factory (and its class) for serialization
        toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
        toBeSerializedConfigObjects.put(SERIALIZED_UDF_CLASS, factory.getClass());
    }
}
```
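Nothing is serialized at registration time: setStreamOperatorFactory() only records the object, and the bytes are produced later, when createJobGraph() triggers serializeAllConfigs() on serializationExecutor. A minimal sketch of this two-phase pattern, assuming a hypothetical `ToyStreamConfig` whose configuration is a plain `Map<String, byte[]>` and whose keys are placeholder strings rather than Flink's constants:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StreamConfig's deferred serialization.
final class ToyStreamConfig {
    // Placeholder keys, not Flink's real constant values.
    static final String SERIALIZED_UDF = "SERIALIZED_UDF";
    static final String SERIALIZED_UDF_CLASS = "SERIALIZED_UDF_CLASS";

    // Phase 1: objects registered for serialization (cf. toBeSerializedConfigObjects).
    private final Map<String, Object> toBeSerializedConfigObjects = new HashMap<>();
    // Phase 2 output: the configuration only ever holds bytes.
    final Map<String, byte[]> config = new HashMap<>();

    void setStreamOperatorFactory(Object factory) {
        if (factory != null) {
            // Only remember the object; no serialization happens here.
            toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
            toBeSerializedConfigObjects.put(SERIALIZED_UDF_CLASS, factory.getClass());
        }
    }

    void serializeAllConfigs() {
        toBeSerializedConfigObjects.forEach(
                (key, object) -> {
                    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                        out.writeObject(object);
                    } catch (IOException e) {
                        throw new UncheckedIOException("Could not serialize object for key " + key, e);
                    }
                    config.put(key, bytes.toByteArray());
                });
    }
}
```

Deferring the work this way lets the potentially expensive serialization of many vertices run later and in parallel, instead of inside the recursive chain-building pass.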
Serializing the operator's chainable edges
In StreamingJobGraphGenerator.createChain(), the call setOperatorChainedOutputsConfig(config, chainableOutputs) registers the node's chainable edges (ChainedOutputs) for serialization.
StreamingJobGraphGenerator.setOperatorChainedOutputsConfig() source:
```java
private void setOperatorChainedOutputsConfig(
        StreamConfig config, List<StreamEdge> chainableOutputs) {
    // Create a serializer for every side-output edge (edges that carry an OutputTag)
    for (StreamEdge edge : chainableOutputs) {
        if (edge.getOutputTag() != null) {
            config.setTypeSerializerSideOut(
                    edge.getOutputTag(),
                    edge.getOutputTag()
                            .getTypeInfo()
                            .createSerializer(
                                    streamGraph.getExecutionConfig().getSerializerConfig()));
        }
    }
    // Register the chainable edges for serialization
    config.setChainedOutputs(chainableOutputs);
}
```
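Only edges that carry an OutputTag get a side-output serializer, and the serializer is registered per tag rather than per edge, so several edges with the same tag share one entry. A minimal sketch with hypothetical types, where the element Class stands in for the TypeSerializer that Flink creates from the tag's TypeInformation:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical edge: outputTag == null means the edge carries the main output.
record ToyOutEdge(int targetId, String outputTag, Class<?> sideOutputType) {}

final class SideOutputSerializers {
    private SideOutputSerializers() {}

    /** One entry per side-output tag; the Class stands in for a TypeSerializer. */
    static Map<String, Class<?>> register(List<ToyOutEdge> edges) {
        Map<String, Class<?>> serializerByTag = new LinkedHashMap<>();
        for (ToyOutEdge edge : edges) {
            if (edge.outputTag() != null) {
                // Edges with the same tag overwrite the same entry.
                serializerByTag.put(edge.outputTag(), edge.sideOutputType());
            }
        }
        return serializerByTag;
    }
}
```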
StreamConfig.setChainedOutputs() registers the node's chainable edges (ChainedOutputs) for serialization: it puts them into the map toBeSerializedConfigObjects, and the actual serialization happens later.
StreamConfig.setChainedOutputs() source:
```java
public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
    // Register the chainable edges for serialization
    toBeSerializedConfigObjects.put(CHAINED_OUTPUTS, chainedOutputs);
}
```
Inside StreamingJobGraphGenerator.createJobGraph()
StreamingJobGraphGenerator.createJobGraph() is the main method for building the JobGraph. While creating the JobGraph, it registers the non-chainable outputs of all nodes for serialization, serializes each operator chain into the configuration of its chain-head JobVertex, and serializes the entries each JobVertex registered in toBeSerializedConfigObjects, which completes the final serialization of the JobGraph.
[Figure: source code diagram]
StreamingJobGraphGenerator.createJobGraph() source:
```java
private JobGraph createJobGraph() {
    // ...
    // Create the operator chains and the chain-head JobVertex of each chain
    setChaining(hashes, legacyHashes);
    // Register all non-chainable outputs for serialization
    setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs);
    // ...
    // Serialize each vertex's configuration and operator chain (the final serialization step)
    // Wait for the serialization of operator coordinators and stream config.
    try {
        FutureUtils.combineAll(
                        vertexConfigs.values().stream()
                                .map(config ->
                                        config.triggerSerializationAndReturnFuture(
                                                serializationExecutor))
                                .collect(Collectors.toList()))
                .get();

        waitForSerializationFuturesAndUpdateJobVertices();
    } catch (Exception e) {
        throw new FlinkRuntimeException("Error in serialization.", e);
    }
    // ...
    // Return the finished JobGraph
    return jobGraph;
}
```
Serializing the non-chainable outputs:
In StreamingJobGraphGenerator.createJobGraph(), the call setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs) registers every non-chainable output in the JobGraph for serialization. StreamingJobGraphGenerator.setAllOperatorNonChainedOutputsConfigs() iterates over all non-chainable outputs and writes each one into the to-be-serialized configuration of the node that produces it (its upstream node).
StreamingJobGraphGenerator.setAllOperatorNonChainedOutputsConfigs() source:
```java
private void setAllOperatorNonChainedOutputsConfigs(
        final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
    // set non chainable output config
    // Iterate over the non-chainable outputs of every operator
    opNonChainableOutputsCache.forEach(
            (vertexId, nonChainableOutputs) -> {
                Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
                        opIntermediateOutputs.computeIfAbsent(
                                vertexId, ignored -> new HashMap<>());
                // Write them into the configuration of the producing (upstream) node
                setOperatorNonChainedOutputsConfig(
                        vertexId,
                        vertexConfigs.get(vertexId),
                        nonChainableOutputs,
                        outputsConsumedByEdge);
            });
}
```
setOperatorNonChainedOutputsConfig() creates the side-output serializers and writes the (deduplicated) non-chainable outputs into the upstream node's to-be-serialized configuration.
StreamingJobGraphGenerator.setOperatorNonChainedOutputsConfig() source:
```java
private void setOperatorNonChainedOutputsConfig(
        Integer vertexId,
        StreamConfig config,
        List<StreamEdge> nonChainableOutputs,
        Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge) {
    // Create a serializer for each side output
    // iterate edges, find sideOutput edges create and save serializers for each outputTag type
    for (StreamEdge edge : nonChainableOutputs) {
        if (edge.getOutputTag() != null) {
            config.setTypeSerializerSideOut(
                    edge.getOutputTag(),
                    edge.getOutputTag()
                            .getTypeInfo()
                            .createSerializer(
                                    streamGraph.getExecutionConfig().getSerializerConfig()));
        }
    }

    // Deduplicate (possibly reuse) the outputs and register them in the producing node's config
    List<NonChainedOutput> deduplicatedOutputs =
            mayReuseNonChainedOutputs(vertexId, nonChainableOutputs, outputsConsumedByEdge);
    config.setNumberOfOutputs(deduplicatedOutputs.size());
    config.setOperatorNonChainedOutputs(deduplicatedOutputs);
}
```
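mayReuseNonChainedOutputs() is why the method uses deduplicatedOutputs.size() rather than the number of edges: several edges may be served by one NonChainedOutput, and therefore by one data set. The sketch below only illustrates the idea; the exact reuse criteria live in Flink's StreamingJobGraphGenerator and are not reproduced here. It assumes that only blocking partitions can be shared and that partitioner, consumer parallelism and output tag must match. All types are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical partition types: only BLOCKING is treated as reusable here.
enum ToyPartitionType { PIPELINED, BLOCKING }

// Hypothetical edge: only the fields the assumed reuse check looks at.
record ToyConsumerEdge(int targetId, String partitioner, int consumerParallelism,
        String outputTag, ToyPartitionType partitionType) {}

// Hypothetical stand-in for NonChainedOutput.
record ToyOutput(int index, String partitioner, int consumerParallelism,
        String outputTag, ToyPartitionType partitionType) {}

final class OutputReuse {
    private OutputReuse() {}

    static List<ToyOutput> mayReuse(List<ToyConsumerEdge> edges,
            Map<ToyConsumerEdge, ToyOutput> outputsConsumedByEdge) {
        List<ToyOutput> outputs = new ArrayList<>();
        for (ToyConsumerEdge edge : edges) {
            ToyOutput reusable = null;
            // Assumption: a pipelined partition is consumed once, so never shared.
            if (edge.partitionType() == ToyPartitionType.BLOCKING) {
                for (ToyOutput candidate : outputs) {
                    if (candidate.partitionType() == edge.partitionType()
                            && candidate.consumerParallelism() == edge.consumerParallelism()
                            && Objects.equals(candidate.partitioner(), edge.partitioner())
                            && Objects.equals(candidate.outputTag(), edge.outputTag())) {
                        reusable = candidate;
                        break;
                    }
                }
            }
            if (reusable == null) {
                reusable = new ToyOutput(outputs.size(), edge.partitioner(),
                        edge.consumerParallelism(), edge.outputTag(), edge.partitionType());
                outputs.add(reusable);
            }
            // Every edge maps to an output, shared or not (cf. outputsConsumedByEdge).
            outputsConsumedByEdge.put(edge, reusable);
        }
        return outputs;
    }
}
```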
StreamConfig.setOperatorNonChainedOutputs() registers the non-chainable outputs (NonChainedOutput) for serialization: it puts them into the map toBeSerializedConfigObjects, and the actual serialization happens later.
StreamConfig.setOperatorNonChainedOutputs() source:
```java
public void setOperatorNonChainedOutputs(List<NonChainedOutput> nonChainedOutputs) {
    // Register the non-chained outputs for serialization
    toBeSerializedConfigObjects.put(OP_NONCHAINED_OUTPUTS, nonChainedOutputs);
}
```
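Serializing the operator chains and running the actual serialization
In StreamingJobGraphGenerator.createJobGraph(), every JobVertex in the JobGraph is visited, and the operator chain and configuration of each chain-head node are serialized.
```java
// For each chain-head node, serialize its operator chain into the node's configuration
FutureUtils.combineAll(
                vertexConfigs.values().stream()
                        .map(config ->
                                config.triggerSerializationAndReturnFuture(serializationExecutor))
                        .collect(Collectors.toList()))
        .get();

waitForSerializationFuturesAndUpdateJobVertices();
```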
In StreamConfig.triggerSerializationAndReturnFuture(), once the futures of all chained task configs have completed, serializeAllConfigs() is called first to serialize the JobVertex's own configuration, and then the JobVertex's operator chain (the map of chained StreamConfigs) is written into its configuration.
StreamConfig.triggerSerializationAndReturnFuture() source:
```java
public CompletableFuture<StreamConfig> triggerSerializationAndReturnFuture(
        Executor ioExecutor) {
    FutureUtils.combineAll(chainedTaskFutures.values())
            .thenAcceptAsync(
                    chainedConfigs -> {
                        try {
                            // Serialize all the objects to config.
                            // (Serializes this node's StreamConfig entries.)
                            serializeAllConfigs();
                            // Write the operator chain's configs into this node's config
                            InstantiationUtil.writeObjectToConfig(
                                    chainedConfigs.stream()
                                            .collect(
                                                    Collectors.toMap(
                                                            StreamConfig::getVertexID,
                                                            Function.identity())),
                                    this.config,
                                    CHAINED_TASK_CONFIG);
                            serializationFuture.complete(this);
                        } catch (Throwable throwable) {
                            serializationFuture.completeExceptionally(throwable);
                        }
                    },
                    ioExecutor);
    return serializationFuture;
}
```
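The shape of triggerSerializationAndReturnFuture() is a fan-in: wait for the futures of all chained task configs, then serialize on the I/O executor and complete a separate future, which createJobGraph() in turn waits on for all vertices. A minimal sketch with the JDK's CompletableFuture (CompletableFuture.allOf standing in for FutureUtils.combineAll) and hypothetical names. Unlike the excerpt, it uses whenCompleteAsync so that a failed input future also fails the serialization future; with thenAcceptAsync alone, such a failure would leave serializationFuture uncompleted.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical stand-in for a chain-head StreamConfig.
final class ToyVertexConfig {
    private final String name;
    // Futures of the chained operators' configs (cf. chainedTaskFutures).
    private final List<CompletableFuture<String>> chainedTaskFutures;
    private final CompletableFuture<String> serializationFuture = new CompletableFuture<>();

    ToyVertexConfig(String name, List<CompletableFuture<String>> chainedTaskFutures) {
        this.name = name;
        this.chainedTaskFutures = chainedTaskFutures;
    }

    CompletableFuture<String> triggerSerializationAndReturnFuture(Executor ioExecutor) {
        CompletableFuture.allOf(chainedTaskFutures.toArray(new CompletableFuture<?>[0]))
                .whenCompleteAsync(
                        (ignored, error) -> {
                            if (error != null) {
                                serializationFuture.completeExceptionally(error);
                                return;
                            }
                            try {
                                // "Serialize" the head config together with its chained configs.
                                String chained = chainedTaskFutures.stream()
                                        .map(CompletableFuture::join)
                                        .collect(Collectors.joining(","));
                                serializationFuture.complete(name + "[" + chained + "]");
                            } catch (Throwable t) {
                                serializationFuture.completeExceptionally(t);
                            }
                        },
                        ioExecutor);
        return serializationFuture;
    }

    /** Fan-in over all vertices, like combineAll(...).get() in createJobGraph(). */
    static List<String> serializeAll(List<ToyVertexConfig> configs, Executor ioExecutor) {
        List<CompletableFuture<String>> futures = configs.stream()
                .map(c -> c.triggerSerializationAndReturnFuture(ioExecutor))
                .collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}
```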
StreamConfig.serializeAllConfigs() walks the toBeSerializedConfigObjects map entry by entry and serializes each entry. This is where the objects discussed above are actually serialized: the StreamOperatorFactory that encapsulates the node's computation logic, the chainable edges (ChainedOutputs) and the non-chainable outputs (NonChainedOutput).
StreamConfig.serializeAllConfigs() source:
```java
public void serializeAllConfigs() {
    toBeSerializedConfigObjects.forEach(
            (key, object) -> {
                try {
                    // Serialize each registered object into the configuration
                    InstantiationUtil.writeObjectToConfig(object, this.config, key);
                } catch (IOException e) {
                    throw new StreamTaskException(
                            String.format("Could not serialize object for key %s.", key), e);
                }
            });
}
```
InstantiationUtil.writeObjectToConfig() performs the actual serialization: it serializes the object into a byte array and writes the bytes into the JobVertex's configuration.
InstantiationUtil.writeObjectToConfig() source:
```java
public static void writeObjectToConfig(Object o, Configuration config, String key)
        throws IOException {
    // Serialize the object
    byte[] bytes = serializeObject(o);
    // Write the serialized bytes into the configuration
    config.setBytes(key, bytes);
}
```
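InstantiationUtil.serializeObject() is based on standard Java serialization, so every registered object (operator factory, chained outputs, non-chained outputs) must be Serializable; otherwise writing it fails with an IOException, which serializeAllConfigs() turns into the StreamTaskException shown above. A minimal round-trip sketch with the JDK's ObjectOutputStream/ObjectInputStream, assuming a plain `Map<String, byte[]>` in place of Flink's Configuration and a hypothetical `ToyConfigBytes` helper:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: write/read objects as byte arrays under a config key.
final class ToyConfigBytes {
    private ToyConfigBytes() {}

    static void writeObjectToConfig(Object o, Map<String, byte[]> config, String key)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            // Throws NotSerializableException if o (or anything it references) is not Serializable
            out.writeObject(o);
        }
        config.put(key, bytes.toByteArray());
    }

    static Object readObjectFromConfig(Map<String, byte[]> config, String key)
            throws IOException, ClassNotFoundException {
        byte[] bytes = config.get(key);
        if (bytes == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }
}
```

On the cluster side the same bytes are turned back into objects, which is also why the user-code classes must be available there (shipped with the job jar).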
With this, JobGraph generation is complete. The complete JobGraph structure:
[Figure: complete JobGraph structure]
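4. The client submits the job to the cluster
After the JobGraph has been generated, the Flink CliFrontend client uses a ClusterClient to upload the JobGraph, the jar and the artifacts to the cluster and to send a submission request to the Flink cluster on Yarn. This completes client-side scheduling; cluster-side scheduling and execution follow.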
[Figure: source code diagram]
Back in AbstractSessionClusterExecutor.execute(): after the JobGraph has been generated, Flink obtains a RestClusterClient (the concrete ClusterClient implementation) and submits the JobGraph to the cluster.
AbstractSessionClusterExecutor.execute() source:
```java
public CompletableFuture<JobClient> execute(
        @Nonnull final Pipeline pipeline,
        @Nonnull final Configuration configuration,
        @Nonnull final ClassLoader userCodeClassloader)
        throws Exception {
    // Generate the JobGraph
    final JobGraph jobGraph =
            PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
    // ...
    // Obtain the ClusterClient used to talk to the cluster
    ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
    // The client submits the JobGraph to the cluster
    clusterClient.submitJob(jobGraph)
    // ...
}
```
RestClusterClient.submitJob() performs the actual job submission: it creates a JobGraph file and writes the JobGraph to it locally, attaches the local JobGraph file, the jars and the artifacts to the request as file uploads, builds the JobSubmit request body and sends the request to the Flink cluster.
RestClusterClient.submitJob() source:
```java
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
    // ...
    // Create the temporary JobGraph file (flink-jobgraph-<jobId>...bin)
    java.nio.file.Path jobGraphFile =
            Files.createTempFile("flink-jobgraph-" + jobGraph.getJobID(), ".bin");
    // ...
    // Write the JobGraph into the local file
    try (ObjectOutputStream objectOut =
            new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
        objectOut.writeObject(jobGraph);
    }
    // ...
    // Collect the local JobGraph file, jars and artifacts as uploads for the request
    filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
    filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
    filesToUpload.add(
            new FileUpload(
                    Paths.get(artifactFilePath.getPath()), RestConstants.CONTENT_TYPE_BINARY));
    // ...
    // Build the JobSubmit request body
    final JobSubmitRequestBody requestBody =
            new JobSubmitRequestBody(
                    jobGraphFile.getFileName().toString(), jarFileNames, artifactFileNames);
    // ...
    // Send the JobSubmit request to the cluster
    sendRetriableRequest(
            JobSubmitHeaders.getInstance(),
            EmptyMessageParameters.getInstance(),
            requestAndFileUploads.f0,
            requestAndFileUploads.f1,
            isConnectionProblemOrServiceUnavailable(),
            (receiver, error) -> {
                // Error handling
            });
    // ...
    // Delete the locally generated JobGraph file
    Files.delete(jobGraphFile);
}
```
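The local half of submitJob() is plain Java serialization into a temporary file. A minimal sketch, assuming any Serializable object stands in for the JobGraph and a hypothetical `ToyJobGraphFile` helper; in Flink the file is then attached to the REST request as a FileUpload and deleted afterwards, as in the excerpt, and the receiving side reads it back roughly as read() does here.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;

// Hypothetical helper mirroring the local file handling in submitJob().
final class ToyJobGraphFile {
    private ToyJobGraphFile() {}

    /** Writes the object to a temp file named like flink-jobgraph-<jobId>...bin. */
    static Path write(Serializable jobGraph, String jobId) throws IOException {
        Path file = Files.createTempFile("flink-jobgraph-" + jobId, ".bin");
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(jobGraph);
        }
        return file;
    }

    /** Reads the object back from the file. */
    static Object read(Path file) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            return in.readObject();
        }
    }
}
```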
RestClusterClient.sendRetriableRequest() sends the request: in the end the RestClient sends it to the REST endpoint of the Flink cluster on Yarn, which completes client-side scheduling and starts cluster-side scheduling.
RestClusterClient.sendRetriableRequest() source:
```java
private <
                M extends MessageHeaders<R, P, U>,
                U extends MessageParameters,
                R extends RequestBody,
                P extends ResponseBody>
        CompletableFuture<P> sendRetriableRequest(
                M messageHeaders,
                U messageParameters,
                R request,
                Collection<FileUpload> filesToUpload,
                Predicate<Throwable> retryPredicate,
                BiConsumer<String, Throwable> consumer) {
    // ...
    restClient.sendRequest(
            webMonitorBaseUrl.getHost(),
            webMonitorBaseUrl.getPort(),
            headers,
            messageParameters,
            request,
            filesToUpload);
}
```
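retryPredicate decides which failures are worth retrying; in submitJob() it is isConnectionProblemOrServiceUnavailable(). A minimal synchronous sketch of predicate-driven retries with a hypothetical `Retry.withRetries` helper; Flink's client retries asynchronously and waits between attempts, which this sketch does not model.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical helper: retry an attempt while the predicate accepts the failure.
final class Retry {
    private Retry() {}

    static <T> T withRetries(Callable<T> attempt, int maxAttempts,
            Predicate<Throwable> retryPredicate) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                return attempt.call();
            } catch (Exception e) {
                // Give up on the last attempt or when the failure is not retryable.
                if (i == maxAttempts || !retryPredicate.test(e)) {
                    throw e;
                }
            }
        }
        throw new IllegalStateException("unreachable");
    }
}
```

For example, with a predicate that accepts only connection errors, a request that fails twice with a ConnectException and then succeeds is attempted three times, while a failure the predicate rejects is rethrown after the first attempt.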
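At this point, JobGraph generation and Flink's client-side scheduling are complete. The complete code walkthrough and the complete JobGraph structure are shown below:
[Figure: complete code walkthrough]
[Figure: complete JobGraph structure]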
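5. Conclusion
This article and the previous one, "Flink 1.19.0 Source Code Deep Dive 5: JobGraph Generation (Part 1)", together cover the complete source of JobGraph generation. After generating the JobGraph, the Flink client submits the scheduling request together with the program jar and its dependencies to the Flink cluster on Yarn, completing client-side scheduling. The next article starts on cluster-side scheduling and walks through the source of ExecutionGraph generation and Task execution.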