当前位置: 首页 > news >正文

Flink源码之TaskManager启动流程

从启动命令flink-daemon.sh可以看出TaskManger入口类为org.apache.flink.runtime.taskexecutor.TaskManagerRunner

TaskManagerRunner::main
TaskManagerRunner::runTaskManagerProcessSecurely
TaskManagerRunner::runTaskManager //构造TaskManagerRunner并调用start()方法
TaskManagerRunner::new //核心

在TaskManagerRunner构造函数中,可以看出与JobManger类似,也是先构造出一些公共服务:

highAvailabilityServices//用于获取JobManger的地址
rpcService //将TaskExecutor包装为AkkaActor提供RPC服务
heartbeatServices //心跳服务,与JobManger通信
metricRegistry //metric服务,提供metric注册和查询
blobCacheService //缓存Blob

这些服务在构造TaskExecutor时作为构造函数参数传入

构造TaskExecutor前会先构造TaskManagerServices辅助TaskExecutor实现其核心功能

TaskManagerRunner::createTaskExecutorService
TaskManagerRunner::startTaskManager // 构造MetricGroup和相关服务
TaskManagerServices.fromConfiguration//读取TaskManger的配置信息启动TaskManager相关服务
TaskExecutor::new  //核心

启动TaskEexector后会与ResouceManager建立连接,将自身信息注册到RM后发送Slot报告给RM,具体调用链路如下:

TaskManagerRunner::start
TaskExecutorToServiceAdapter::start
TaskExecutor::start
TaskExecutor::onStart
TaskExecutor::startTaskExecutorServices //获取ResourceManager地址后与ResourceManager建立连接,发送Slot报告
ResourceManagerLeaderListener::notifyLeaderAddress
TaskExecutor::notifyOfNewResourceManagerLeader
TaskExecutor::reconnectToResourceManager
TaskExecutor::tryConnectToResourceManager
TaskExecutor::connectToResourceManager
TaskExecutorToResourceManagerConnection::start
RegisteredRpcConnection::start
RegisteredRpcConnection::createNewRegistration
TaskExecutorToResourceManagerConnection::generateRegistration
RetryingRegistration::startRegistration //与resourcemanager建立连接
RetryingRegistration::register
ResourceManagerRegistration::invokeRegistration //向ResourceManager注册TaskExecutorRegistration信息
ResourceManagerGateway.registerTaskExecutor
TaskExecutorToResourceManagerConnection::onRegistrationSuccess
ResourceManagerRegistrationListener::onRegistrationSuccess
TaskExecutor::establishResourceManagerConnection ResourceManagerGateway.sendSlotReport //发送自身slot信息给ResourceManagerHeartbeatManagerImpl::monitorTarget//与RM建立心跳连接,当接到来自RM的心跳请求时,就会将SlotReport发送给RM作为心跳回应

TaskExecutor提供了以下两个核心方法:

 //RM将Slot分配给JobMaster请求TM将具体Slot信息发送给JobMasterCompletableFuture<Acknowledge> requestSlot(SlotID slotId,JobID jobId,AllocationID allocationId,ResourceProfile resourceProfile,String targetAddress,ResourceManagerId resourceManagerId,@RpcTimeout Time timeout);//执行JobMaster提交的物理Task       
CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, @RpcTimeout Time timeout);    

TaskManager中管理Slot的实现类TaskSlotTableImpl,该实例记录了Slot的分配信息。

在这里插入图片描述

HeartBeat

在TaskExecutor构造函数中有两个HeartbeatManager,实现类都是HeartbeatManagerImpl,此类是接受心跳请求,发送心跳响应:

ResourceManagerHeartbeatManager //响应RM的心跳请求,心跳响应中带上SlotReport
JobManagerHeartbeatManager  //响应JobMaster的心跳请求, 心跳响应中带上AccumulatorReport

调用HeartbeatManagerImpl.monitorTarget(ResourceID resourceID, HeartbeatTarget heartbeatTarget) 与目标对象建立心跳连接。

HeartbeatManager还有个实现类是HeartbeatManagerSenderImpl,用于主动向监控目标发送心跳请求,比如在ResourceManager中创建的就是HeartbeatManagerSenderImpl,TaskManager启动时向ResourceManager注册后,RM就会调用HeartbeatManagerSenderImpl.monitor监控TM, 并定时向TM的HeartbeatManagerImpl发送心跳请求。同样,在JobMaster中创建的也是HeartbeatManagerSenderImpl,JobMaster定时向执行当前Job的TM发送心跳请求,TM响应与该Job相关信息。

综上,TM启动后向RM注册,与TM通过心跳信息同步Slot分配状况,接受RM的Slot分配请求向JobMaster提供Slot后,就可以接受JobMaster 执行具体的物理Task了。

http://www.lryc.cn/news/123801.html

相关文章:

  • 加入微软MCPP有什么优势?
  • leetcode做题笔记78子集
  • Skywalking-9.6.0系列之本地源码编译并启动
  • proteus结合keil-arm编译器构建STM32单片机项目进行仿真
  • 第五十三天
  • gorm基本操作
  • 华为OD机试 - 排队游戏(Java JS Python)
  • 滚动条样式更改
  • 掌握Python的X篇_33_MATLAB的替代组合NumPy+SciPy+Matplotlib
  • Python解决-力扣002-两数相加
  • nginx基于源码安装的方式对静态页面、虚拟主机(IP、端口、域名)和日志文件进行配置
  • [FPAG开发]使用Vivado创建第一个程序
  • 使用 Python 在 NLP 中进行文本预处理
  • [足式机器人]Part3机构运动微分几何学分析与综合Ch03-1 空间约束曲线与约束曲面微分几何学——【读书笔记】
  • pytest框架快速进阶篇-pytest前置和pytest后置,skipif跳过用例
  • Python 基础语法 | 常量表达式,变量,注释,输入输出
  • SQL | 分组数据
  • 软件测试技术之如何编写测试用例(6)
  • 论文阅读——Adversarial Eigen Attack on Black-Box Models
  • 自然语言处理从入门到应用——LangChain:记忆(Memory)-[自定义对话记忆与自定义记忆类]
  • 【C/C++】STL queue 非线程安全接口,危险!
  • 执行Lua脚本后一直查询不到Redis中的数据(附带问题详细排查过程,一波三折)
  • [高光谱]PyTorch使用CNN对高光谱图像进行分类
  • jmeter获取mysql数据
  • Dedecms V110最新版RCE---Tricks
  • CTFshow 限时活动 红包挑战7、红包挑战8
  • Redis使用Lua脚本和Redisson来保证库存扣减中的原子性和一致性
  • 【从零开始学Kaggle竞赛】泰坦尼克之灾
  • 输出无重复的3位数和计算无人机飞行坐标
  • muduo 29 异步日志