当前位置: 首页 > news >正文

Spark 运行流程核心组件(一)作业提交

1、Job启动流程

在这里插入图片描述

1、Client触发 SparkContext 初始化

2、SparkContextMaster 注册应用

3、Master 调度 Worker 启动 Executor

4、Worker 进程启动 Executor

5、DAGScheduler 将作业分解为 Stage

6、TaskScheduler 分配 TaskExecutor

2、核心组件

组件职责
SparkContext应用入口,协调各组件,管理应用生命周期。
DAGScheduler将 Job 拆分为 Stage,构建 DAG,提交 TaskSet 给 TaskScheduler。
TaskScheduler调度 Task 到 Executor,处理故障重试。
CoarseGrainedSchedulerBackend与集群管理器交互,申请资源,管理 Executor。
ExternalClusterManager抽象层,适配不同集群(Standalone/YARN/Mesos)。
Master & WorkerStandalone 模式下管理集群资源(Master 分配资源,Worker 启动 Executor)。
Executor在 Worker 上运行,执行 Task,管理内存/磁盘。
CoarseGrainedExecutorBackendExecutor 的通信代理,接收 Task,返回状态/结果。
Task计算单元(ShuffleMapTask / ResultTask)。
ShuffleManager管理 Shuffle 数据读写(如 SortShuffleManager)。

3、工作流程

1、SparkContext

负责资源申请、任务提交、与集群管理器通信。

调用runJob方法,将RDD操作传递给DAGScheduler

2、DAGScheduler

将Job拆分为Stage(DAG),处理Shuffle依赖,提交TaskSet给TaskScheduler。

1、DAGSchedulerEvent

/* 作业生命周期事件 */
JobSubmitted //新作业提交时触发
JobCancelled //单个作业被取消
JobGroupCancelled //作业组整体取消
JobTagCancelled //按标签批量取消作业
AllJobsCancelled //取消所有运行中的作业/* 阶段执行事件 */
MapStageSubmitted //Shuffle Map阶段提交
StageCancelled //单个阶段取消
StageFailed //阶段执行失败
ResubmitFailedStages //自动重试失败阶段 ,默认4次/* 任务调度事件 */
TaskSetFailed //整个任务集失败,默认4次
SpeculativeTaskSubmitted //启动推测执行任务
UnschedulableTaskSetAdded //任务集进入待调度队列
UnschedulableTaskSetRemoved //任务集离开待调度队列/* Shuffle 优化事件 */
RegisterMergeStatuses //注册Shuffle合并状态
ShuffleMergeFinalized //Shuffle合并完成
ShufflePushCompleted //Shuffle数据推送完成/* 资源管理事件 */
ExecutorAdded //新Executor注册成功
ExecutorLost //Executor异常丢失
WorkerRemoved //工作节点移除/* 执行过程事件 */
BeginEvent //任务集开始执行 
GettingResultEvent //驱动程序主动获取任务结果
CompletionEvent //作业/阶段完成

2、stage拆分流程

*ResultStage (执行作的最后一个阶段)、ShuffleMapStage (shuffle映射输出文件)*

  1. 用户行动操作触发submitJob,发送JobSubmitted事件。
  2. handleJobSubmitted处理事件,调用createResultStage创建ResultStage。
  3. createResultStage调用getOrCreateParentStages获取父Stage,父Stage的创建会递归进行。
  4. 在创建父Stage的过程中,遇到宽依赖则创建ShuffleMapStage,并递归创建其父Stage。
  5. 当所有父Stage都创建完成后,回到handleJobSubmitted,调用submitStage提交ResultStage。
  6. submitStage检查父Stage是否完成,如果有未完成的父Stage,则递归提交父Stage;否则,提交当前Stage(调用submitMissingTasks)。
  7. submitMissingTasks为Stage创建任务(ShuffleMapTask或ResultTask),并提交给TaskScheduler执行。

3、宽窄依赖切分

private def stageDependsOn(stage: Stage, target: Stage): Boolean = {if (stage == target) {return true}// DFS遍历RDD依赖树val visitedRdds = new HashSet[RDD[_]]// We are manually maintaining a stack here to prevent StackOverflowError// caused by recursively visitingval waitingForVisit = new ListBuffer[RDD[_]]waitingForVisit += stage.rdddef visit(rdd: RDD[_]): Unit = {if (!visitedRdds(rdd)) {visitedRdds += rddfor (dep <- rdd.dependencies) {dep match {// 宽依赖:创建新的ShuffleMapStagecase shufDep: ShuffleDependency[_, _, _] =>val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)if (!mapStage.isAvailable) {waitingForVisit.prepend(mapStage.rdd)}  // Otherwise there's no need to follow the dependency back// 窄依赖:继续回溯case narrowDep: NarrowDependency[_] =>waitingForVisit.prepend(narrowDep.rdd)}}}}while (waitingForVisit.nonEmpty) {visit(waitingForVisit.remove(0))}visitedRdds.contains(target.rdd)}

3、TaskScheduler

接收TaskSet,按调度策略(FIFO/FAIR)将Task分配给Executor。

1、执行流程

1、DAGScheduler 调用 taskScheduler.submitTasks() 后,任务进入 TaskScheduler 调度阶段

2、任务提交submitTasks

// TaskSetManager管理任务集
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// 添加到调度池
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
// 触发资源分配
backend.reviveOffers()

3、资源分配 (Driver)

// CoarseGrainedSchedulerBackend.scala
override def reviveOffers(): Unit = {driverEndpoint.send(ReviveOffers)  // 向DriverEndpoint发送消息
}// DriverEndpoint处理
case ReviveOffers =>makeOffers()  // 触发资源分配

4、资源分配核心

private def makeOffers(): Unit = {// Make sure no executor is killed while some task is launching on itval taskDescs = withLock {// 1. 获取所有可用Executor资源val activeExecutors = executorDataMap.filter { case (id, _) => isExecutorActive(id) }val workOffers = activeExecutors.map {case (id, executorData) => buildWorkerOffer(id, executorData)}.toIndexedSeq// 2. 调用任务调度器分配任务scheduler.resourceOffers(workOffers, true)}// 3. 启动分配的任务if (taskDescs.nonEmpty) {launchTasks(taskDescs)}
}

5、任务启动

// CoarseGrainedSchedulerBackend
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {for (task <- tasks.flatten) {// 1. 序列化任务val serializedTask = TaskDescription.encode(task)// 2. 检查任务大小if (serializedTask.limit() >= maxRpcMessageSize) {Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>try {var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)taskSetMgr.abort(msg)} catch {case e: Exception => logError("Exception in error callback", e)}}}else {val executorData = executorDataMap(task.executorId)// Do resources allocation here. The allocated resources will get released after the task// finishes.executorData.freeCores -= task.cpustask.resources.foreach { case (rName, addressAmounts) =>executorData.resourcesInfo(rName).acquire(addressAmounts)}logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +s"${executorData.executorHost}.")// 发送任务到ExecutorexecutorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))}}
}
http://www.lryc.cn/news/618251.html

相关文章:

  • 数据量暴涨时,抓取架构该如何应对?
  • 开发npm包【详细教程】
  • Bevy渲染引擎核心技术深度解析:架构、体积雾与Meshlet渲染
  • C++Linux八股
  • 08--深入解析C++ list:高效操作与实现原理
  • K8S 节点初始化一键脚本(禁用 SELinux + 关闭 swap + 开启 ipvs 亲测实用)
  • 微前端架构:原理、场景与实践案例
  • 前端JS处理时间,适用于聊天、操作记录等(包含刚刚、x分钟前、x小时前、x天前)
  • Windows已经安装了一个MySQL8,通过修改配置文件的端口号跑2个或多个Mysql服务方法,并注册为系统服务
  • lesson36:MySQL从入门到精通:全面掌握数据库操作与核心原理
  • 嵌入式系统学习Day17(文件编程)
  • 项目实战2——LAMP_LNMP实践
  • 智能化评估体系:数据生产、在线化与自动化的三重奏
  • 解锁 Appium Inspector:移动端 UI 自动化定位的利器
  • 【论文阅读】一种基于经典机器学习的肌电下肢意图检测方法,用于人机交互系统
  • Secure CRT做代理转发
  • 【element树组件】el-tree实现连接线及hover编辑效果
  • ip归属地批量查询脚本
  • 视频输入输出模块介绍和示例
  • 【Node.js从 0 到 1:入门实战与项目驱动】2.1 安装 Node.js 与 npm(Windows/macOS/Linux 系统的安装步骤)
  • history命令增强记录执行时间与登录IP
  • 线性代数 · 矩阵 | 最小多项式
  • 【debug 解决 记录】stm32 debug模式的时候可以运行,但是烧录没法执行
  • Mac如何安装telnet命令
  • 论答题pk小程序软件版权的
  • 家政小程序系统开发:推动家政行业数字化转型,共创美好未来
  • 校园快递小程序(腾讯地图API、二维码识别、Echarts图形化分析)
  • 基于开源AI大模型AI智能名片S2B2C商城小程序的母婴用品精准营销策略研究
  • 思科、华为、华三如何切换三层端口?
  • Web前端小游戏轮盘。