Spark Execution Flow and Core Components (3): Task Execution
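I. Launch Modes
1. Standalone
- Resource request: the Driver asks the Master for Executor resources.
- Executor launch: the Master schedules Workers to start Executors.
- Registration: each Executor registers directly with the Driver.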
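2. YARN
   1. The Driver asks the YARN ResourceManager (RM) for an ApplicationMaster (AM) container.
   2. The RM assigns a NodeManager (NM) to start the AM. In yarn-client mode the AM acts only as a resource broker and does not run user code.
   3. The AM registers with the RM.
   4. The AM requests Executor containers from the RM according to the application's resource requirements.
   5. The RM allocates containers on multiple NMs.
   6. Each NM starts an ExecutorBackend process.
   7. Registration: each Executor registers with the Driver. In yarn-cluster mode the Driver runs inside the AM; in yarn-client mode it runs in the client process.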
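II. Core Components Involved in Executor-Side Task Execution
- Driver-side components
  - CoarseGrainedSchedulerBackend: communicates with Executors
  - TaskSchedulerImpl: core task-scheduling logic
  - DAGScheduler: DAG scheduling and Stage management
  - BlockManagerMaster: coordinator for block managers
  - MapOutputTrackerMaster: tracks shuffle map outputs
- Executor-side components
  - CoarseGrainedExecutorBackend: the Executor's communication endpoint
  - Executor: task execution engine
  - TaskRunner: wraps a single task for execution on a pool thread
  - BlockManager: manages local data blocks
  - ShuffleManager: controls shuffle reads and writes
  - ExecutorSource: metrics source for monitoring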
III. Core Flow of Task Execution on the Executor
1. Task Reception and Initialization
CoarseGrainedExecutorBackend receives the task:

    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        // Decode the serialized TaskDescription sent by the Driver
        val taskDesc = TaskDescription.decode(data.value)
        logInfo(log"Got assigned task ${MDC(LogKeys.TASK_ID, taskDesc.taskId)}")
        executor.launchTask(this, taskDesc)
      }

Executor launches the task:

    def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
      val taskId = taskDescription.taskId
      val tr = createTaskRunner(context, taskDescription)
      runningTasks.put(taskId, tr)
      // A kill request may have been recorded before the task was launched
      val killMark = killMarks.get(taskId)
      if (killMark != null) {
        tr.kill(killMark._1, killMark._2)
        killMarks.remove(taskId)
      }
      threadPool.execute(tr)
      if (decommissioned) {
        log.error(s"Launching a task while in decommissioned state.")
      }
    }
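The kill-mark check in launchTask covers the case where a kill request reaches the Executor before the task it targets. The following is a minimal, self-contained sketch of that bookkeeping pattern, not Spark's implementation: `MiniExecutor`, `MiniTaskRunner`, and the behavior of `killTask` here are hypothetical simplifications chosen for illustration.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for TaskRunner: it only records whether it was killed.
final class MiniTaskRunner(val taskId: Long) extends Runnable {
  @volatile var killReason: Option[String] = None
  def kill(interruptThread: Boolean, reason: String): Unit = {
    killReason = Some(reason)
  }
  override def run(): Unit = () // a real runner would execute the task here
}

// Hypothetical stand-in for Executor, keeping only the two bookkeeping maps.
final class MiniExecutor {
  val runningTasks = new ConcurrentHashMap[Long, MiniTaskRunner]()
  // taskId -> (interruptThread, reason), recorded when a kill arrives early
  val killMarks = new ConcurrentHashMap[Long, (Boolean, String)]()

  def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {
    val tr = runningTasks.get(taskId)
    if (tr != null) tr.kill(interruptThread, reason)
    else killMarks.put(taskId, (interruptThread, reason)) // remember for later
  }

  def launchTask(taskId: Long): MiniTaskRunner = {
    val tr = new MiniTaskRunner(taskId)
    runningTasks.put(taskId, tr)
    val mark = killMarks.get(taskId)
    if (mark != null) { // the kill arrived first: apply it now
      tr.kill(mark._1, mark._2)
      killMarks.remove(taskId)
    }
    tr.run() // Spark hands the runner to a thread pool instead
    tr
  }
}
```

In this sketch, calling `killTask(7L, true, "stage cancelled")` before `launchTask(7L)` leaves the new runner marked as killed, and the mark is removed so it cannot affect a later task.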
2. Task Execution
Key steps in TaskRunner.run:

    // 1. Class loading and dependency management
    updateDependencies(
      taskDescription.artifacts.files,
      taskDescription.artifacts.jars,
      taskDescription.artifacts.archives,
      isolatedSession)
    // 2. Deserialize the task
    task = ser.deserialize[Task[Any]](
      taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
    // 3. Memory management
    val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
    task.setTaskMemoryManager(taskMemoryManager)
    // 4. Run the task
    val value = Utils.tryWithSafeFinally {
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = taskDescription.attemptNumber,
        metricsSystem = env.metricsSystem,
        cpus = taskDescription.cpus,
        resources = resources,
        plugins = plugins)
      threwException = false
      res
    } {
      // Release block locks held by the task
      val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
      // Release memory still allocated to the task
      val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
      if (freedMemory > 0 && !threwException) {
        val errMsg = log"Managed memory leak detected; size = " +
          log"${LogMDC(NUM_BYTES, freedMemory)} bytes, ${LogMDC(TASK_NAME, taskName)}"
        if (conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK)) {
          throw SparkException.internalError(errMsg.message, category = "EXECUTOR")
        } else {
          logWarning(errMsg)
        }
      }
      if (releasedLocks.nonEmpty && !threwException) {
        val errMsg =
          log"${LogMDC(NUM_RELEASED_LOCKS, releasedLocks.size)} block locks" +
            log" were not released by ${LogMDC(TASK_NAME, taskName)}\n" +
            log" ${LogMDC(RELEASED_LOCKS, releasedLocks.mkString("[", ", ", "]"))})"
        if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) {
          throw SparkException.internalError(errMsg.message, category = "EXECUTOR")
        } else {
          logInfo(errMsg)
        }
      }
    }
    // 5. Report status to the Driver
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
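Step 4 above relies on two ideas: cleanup always runs, and leak warnings are raised only when the task body itself succeeded (the `threwException` flag). The sketch below illustrates both in isolation. It is written from scratch as an approximation of what a helper like `Utils.tryWithSafeFinally` does; `SafeFinally`, `LeakCheckDemo`, and the `leakedBytes` parameter are hypothetical names, and the real helper may differ in details such as logging.

```scala
object SafeFinally {
  // Runs `block`, then always runs `cleanup`. If both throw, the exception
  // from `block` wins and the cleanup failure is attached as suppressed.
  def tryWithSafeFinally[T](block: => T)(cleanup: => Unit): T = {
    var original: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        original = t
        throw t
    } finally {
      try {
        cleanup
      } catch {
        case t: Throwable if original != null && (original ne t) =>
          original.addSuppressed(t)
      }
    }
  }
}

object LeakCheckDemo {
  // Returns the task result plus a warning when a successful task left
  // `leakedBytes` of memory behind (hypothetical stand-in for freedMemory).
  def runTask(body: => Int, leakedBytes: Long): (Int, Option[String]) = {
    var threwException = true
    var warning: Option[String] = None
    val value = SafeFinally.tryWithSafeFinally {
      val res = body
      threwException = false // only reached when the body did not throw
      res
    } {
      if (leakedBytes > 0 && !threwException) {
        warning = Some(s"Managed memory leak detected; size = $leakedBytes bytes")
      }
    }
    (value, warning)
  }
}
```

Suppressing the leak report when the task failed keeps a secondary symptom from masking the original error, which is the reason for the `!threwException` guard in the excerpt.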
Task.run:

    // Takes effect when hadoop.caller.context.enabled = true.
    // Tags HDFS audit log entries with the caller context to help troubleshooting,
    // e.g. tracing a surge of small files back to the Spark task that wrote them.
    new CallerContext(
      "TASK",
      SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
      appId,
      appAttemptId,
      jobId,
      Option(stageId),
      Option(stageAttemptId),
      Option(taskAttemptId),
      Option(attemptNumber)).setCurrentContext()
    // Start the task
    context.runTaskWithListeners(this)
3. Shuffle Processing
ShuffleMapTask
Prepares shuffle data for the downstream Stage (the map-side output) and produces a MapStatus that records where the output is located and how large it is.

    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTimeNs = System.nanoTime()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    // Deserialize the RDD and the ShuffleDependency from the broadcast task binary
    val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L
    val rdd = rddAndDep._1
    val dep = rddAndDep._2
    // While we use the old shuffle fetch protocol, we use partitionId as mapId in the
    // ShuffleBlockId construction.
    val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
      partitionId
    } else {
      context.taskAttemptId()
    }
    dep.shuffleWriterProcessor.write(
      rdd.iterator(partition, context),
      dep,
      mapId,
      partitionId,
      context)
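To make the map-side step more concrete, here is a small sketch of what "write the partition's records, grouped by reduce partition, and report a status" can look like. It is an illustration under simplifying assumptions, not Spark's shuffle writer: `MiniMapStatus`, `partitionFor`, and `write` are hypothetical names, everything stays in memory, and the status counts records where Spark tracks byte sizes.

```scala
object MapSideWriteSketch {
  // Hypothetical, simplified MapStatus: where the output lives and how many
  // records landed in each reduce partition.
  final case class MiniMapStatus(location: String, mapId: Long, recordsPerReducer: Vector[Int])

  // Non-negative modulo, so negative hash codes still map to a valid partition.
  def partitionFor(key: Any, numReducers: Int): Int = {
    val raw = key.hashCode % numReducers
    if (raw < 0) raw + numReducers else raw
  }

  // Buckets the records of one map partition by target reduce partition.
  def write[K, V](
      records: Iterator[(K, V)],
      numReducers: Int,
      mapId: Long,
      location: String): (Vector[Vector[(K, V)]], MiniMapStatus) = {
    val grouped = records.toVector.groupBy(kv => partitionFor(kv._1, numReducers))
    // Index i holds the records destined for reduce partition i
    val output = Vector.tabulate(numReducers)(i => grouped.getOrElse(i, Vector.empty[(K, V)]))
    (output, MiniMapStatus(location, mapId, output.map(_.size)))
  }
}
```

A reducer would later use the status to decide which map outputs to fetch and from where; the sketch stops at producing it.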
ResultTask

    override def runTask(context: TaskContext): U = {
      // Deserialize the RDD and the func using the broadcast variables.
      val threadMXBean = ManagementFactory.getThreadMXBean
      val deserializeStartTimeNs = System.nanoTime()
      val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L
      val ser = SparkEnv.get.closureSerializer.newInstance()
      val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
        ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
      _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
      _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
      } else 0L
      func(context, rdd.iterator(partition, context))
    }
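The last line of runTask is the essence of a ResultTask: apply a function of type `(TaskContext, Iterator[T]) => U` to one partition and return the result. The sketch below mimics that shape without Spark. `MiniContext`, `runTask`, and `runJob` are hypothetical names, and partitions are plain in-memory sequences.

```scala
object ResultTaskSketch {
  // Hypothetical stand-in for TaskContext, carrying only the partition index.
  final case class MiniContext(partitionId: Int)

  // Per-partition work: apply func to that partition's iterator.
  def runTask[T, U](partitionId: Int, data: Iterator[T])(func: (MiniContext, Iterator[T]) => U): U =
    func(MiniContext(partitionId), data)

  // Driver-side view: one result per partition, to be merged by the action.
  def runJob[T, U](partitions: Seq[Seq[T]])(func: (MiniContext, Iterator[T]) => U): Seq[U] =
    partitions.zipWithIndex.map { case (p, i) => runTask(i, p.iterator)(func) }
}
```

For example, a count-style action could be expressed as `ResultTaskSketch.runJob(parts)((_, it) => it.size).sum`, with the per-partition sizes computed by the tasks and the final sum done by the caller.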
IV. Core Communication Mechanism

Message type | Direction | Payload |
---|---|---|
RegisterExecutor | Executor→Driver | executorId, hostPort |
RegisteredExecutor | Driver→Executor | Registration acknowledgement |
LaunchTask | Driver→Executor | Serialized TaskDescription |
StatusUpdate | Executor→Driver | taskId, state, result data |
KillTask | Driver→Executor | Terminates the specified task |
StopExecutor | Driver→Executor | Instructs the Executor to shut down |
Heartbeat | Executor→Driver | Heartbeat plus metrics |
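One way to read the table is as a closed set of message types with a fixed direction each. The sketch below models that with a sealed trait and pattern matching. The case classes are illustrative only: their fields are simplified from the table's payload column and do not reproduce the fields of Spark's actual message classes.

```scala
object ExecutorMessages {
  sealed trait Message
  // Executor -> Driver
  final case class RegisterExecutor(executorId: String, hostPort: String) extends Message
  final case class StatusUpdate(taskId: Long, state: String, data: Array[Byte]) extends Message
  final case class Heartbeat(executorId: String) extends Message
  // Driver -> Executor
  case object RegisteredExecutor extends Message
  final case class LaunchTask(serializedTask: Array[Byte]) extends Message
  final case class KillTask(taskId: Long, reason: String) extends Message
  case object StopExecutor extends Message

  sealed trait Direction
  case object ExecutorToDriver extends Direction
  case object DriverToExecutor extends Direction

  // Because Message is sealed, the compiler can check this match for exhaustiveness.
  def direction(m: Message): Direction = m match {
    case _: RegisterExecutor | _: StatusUpdate | _: Heartbeat => ExecutorToDriver
    case RegisteredExecutor | _: LaunchTask | _: KillTask | StopExecutor => DriverToExecutor
  }
}
```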
V. Executor Thread Model

    Executor JVM Process
    ├── CoarseGrainedExecutorBackend (netty)
    ├── ThreadPool (CachedThreadPool)
    │   ├── TaskRunner 1
    │   ├── TaskRunner 2
    │   └── ...
    ├── BlockManager
    │   ├── MemoryStore (on-heap/off-heap)
    │   └── DiskStore
    └── ShuffleManager
        ├── SortShuffleWriter
        └── UnsafeShuffleWriter
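The thread pool in the diagram is a cached pool, which creates a new worker thread whenever a task is submitted and no idle worker is available. The sketch below demonstrates that behavior with plain `java.util.concurrent` classes; `ThreadPoolSketch` and `distinctWorkerThreads` are hypothetical names, and the tasks do nothing except wait for each other.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}

object ThreadPoolSketch {
  // Runs `numTasks` tasks that all block until every task has started, and
  // returns how many distinct worker threads the cached pool used for them.
  def distinctWorkerThreads(numTasks: Int): Int = {
    val pool = Executors.newCachedThreadPool()
    val threadNames = ConcurrentHashMap.newKeySet[String]()
    val allStarted = new CountDownLatch(numTasks)
    val done = new CountDownLatch(numTasks)
    try {
      (1 to numTasks).foreach { _ =>
        pool.execute(new Runnable {
          override def run(): Unit = {
            threadNames.add(Thread.currentThread.getName)
            allStarted.countDown()
            // Wait until every task is running, so no worker can be reused
            allStarted.await(5, TimeUnit.SECONDS)
            done.countDown()
          }
        })
      }
      done.await(10, TimeUnit.SECONDS)
      threadNames.size
    } finally {
      pool.shutdown()
    }
  }
}
```

Because the pool itself places no cap on the number of threads, the number of TaskRunners running at once on an Executor is determined by how many tasks the Driver assigns to it rather than by the pool.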