【Spark2.0源码学习】-10.Task执行与回馈

时间:2022-09-08 23:14:03
     通过上一节内容,DriverEndpoint最终生成多个可执行的TaskDescription对象,并向各个ExecutorEndpoint发送LaunchTask指令,本节内容将关注ExecutorEndpoint如何处理LaunchTask指令,处理完成后如何回馈给DriverEndpoint,以及整个job最终如何多次调度直至结束。
 
一、Task的执行流程
     承接上一节内容,Executor接受LaunchTask指令后,开启一个新线程TaskRunner解析RDD,并调用RDD的compute方法,归并函数得到最终任务执行结果
     【Spark2.0源码学习】-10.Task执行与回馈【Spark2.0源码学习】-10.Task执行与回馈
  • ExecutorEndpoint接受到LaunchTask指令后,解码出TaskDescription,调用Executor的launchTask方法
  • Executor创建一个TaskRunner线程,并启动线程,同时将改线程添加到Executor的成员对象中,代码如下:
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
runningTasks.put(taskDescription.taskId, taskRunner)
  • TaskRunner
    • 首先向DriverEndpoint发送任务最新状态为RUNNING
    • 从TaskDescription解析出Task,并调用Task的run方法
  • Task
    • 创建TaskContext以及CallerContext(与HDFS交互的上下文对象)
    • 执行Task的runTask方法
      • 如果Task实例为ShuffleMapTask:解析出RDD以及ShuffleDependency信息,调用RDD的compute()方法将结果写Writer中(Writer这里不介绍,可以作为黑盒理解,比如写入一个文件中),返回MapStatus对象
      • 如果Task实例为ResultTask:解析出RDD以及合并函数信息,调用函数将调用后的结果返回
  • TaskRunner将Task执行的结果序列化,再次向DriverEndpoint发送任务最新状态为FINISHED
 
二、Task的回馈流程
     TaskRunner执行结束后,都将执行状态发送至DriverEndpoint,DriverEndpoint最终反馈指令CompletionEvent至DAGSchedulerEventProcessLoop中
     【Spark2.0源码学习】-10.Task执行与回馈【Spark2.0源码学习】-10.Task执行与回馈
  • DriverEndpoint接受到StatusUpdate消息后,调用TaskScheduler的statusUpdate(taskId, state, result)方法
  • TaskScheduler如果任务结果是完成,那么清除该任务处理中的状态,并调动TaskResultGetter相关方法,关键代码如下:
val taskSet = taskIdToTaskSetManager.get(tid)

taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid).foreach { executorId =>
executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
}
taskSet.removeRunningTask(tid) if (state == TaskState.FINISHED) {
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
  • TaskResultGetter启动线程启动线程【task-result-getter】进行相关处理
    • 通过解析或者远程获取得到Task的TaskResult对象
    • 调用TaskSet的handleSuccessfulTask方法,TaskSet的handleSuccessfulTask方法直接调用TaskSetManager的handleSuccessfulTask方法
  • TaskSetManager
    • 更新内部TaskInfo对象状态,并将该Task从运行中Task的集合删除,代码如下:
val info = taskInfos(tid)
info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
removeRunningTask(tid)
    • 调用DAGScheduler的taskEnded方法,关键代码如下:
sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
  • DAGScheduler向DAGSchedulerEventProcessLoop存入CompletionEvent指令,CompletionEvent对象定义如下
private[scheduler] case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: Seq[AccumulatorV2[_, _]],
taskInfo: TaskInfo)
extends DAGSchedulerEvent
 
三、Task的迭代流程
     DAGSchedulerEventProcessLoop中针对于CompletionEvent指令,调用DAGScheduler进行处理,DAGScheduler更新Stage与该Task的关系状态,如果Stage下Task都返回,则做下一层Stage的任务拆解与运算工作,直至Job被执行完毕
  【Spark2.0源码学习】-10.Task执行与回馈
【Spark2.0源码学习】-10.Task执行与回馈
  • DAGSchedulerEventProcessLoop接收到CompletionEvent指令后,调用DAGScheduler的handleTaskCompletion方法
  • DAGScheduler根据Task的类型分别处理
  • 如果Task为ShuffleMapTask
    • 待回馈的Partitions减取当前partitionId
    • 如果所有task都返回,则markStageAsFinished(shuffleStage),同时向MapOutputTrackerMaster注册MapOutputs信息,且markMapStageJobAsFinished
    • 调用submitWaitingChildStages(shuffleStage)进行下层Stages的处理,从而迭代处理最终处理到ResultTask,job结束,关键代码如下:
private def submitWaitingChildStages(parent: Stage) {
...
val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
waitingStages --= childStages
for (stage <- childStages.sortBy(_.firstJobId)) {
submitStage(stage)
}
}
  • 如果Task为ResultTask
    • 改job的partitions都已返回,则markStageAsFinished(resultStage),并cleanupStateForJobAndIndependentStages(job),关键代码如下
for (stage <- stageIdToStage.get(stageId)) {
if (runningStages.contains(stage)) {
logDebug("Removing running stage %d".format(stageId))
runningStages -= stage
}
for ((k, v) <- shuffleIdToMapStage.find(_._2 == stage)) {
shuffleIdToMapStage.remove(k)
}
if (waitingStages.contains(stage)) {
logDebug("Removing stage %d from waiting set.".format(stageId))
waitingStages -= stage
}
if (failedStages.contains(stage)) {
logDebug("Removing stage %d from failed set.".format(stageId))
failedStages -= stage
}
}
// data structures based on StageId
stageIdToStage -= stageId
jobIdToStageIds -= job.jobId
jobIdToActiveJob -= job.jobId
activeJobs -= job
     至此,用户编写的代码最终调用Spark分布式计算完毕。

【Spark2.0源码学习】-10.Task执行与回馈的更多相关文章

  1. 【Spark2&period;0源码学习】-1&period;概述

          Spark作为当前主流的分布式计算框架,其高效性.通用性.易用性使其得到广泛的关注,本系列博客不会介绍其原理.安装与使用相关知识,将会从源码角度进行深度分析,理解其背后的设计精髓,以便后续 ...

  2. spark2&period;0源码学习

    [Spark2.0源码学习]-1.概述 [Spark2.0源码学习]-2.一切从脚本说起 [Spark2.0源码学习]-3.Endpoint模型介绍 [Spark2.0源码学习]-4.Master启动 ...

  3. 【Spark2&period;0源码学习】-2&period;一切从脚本说起

    从脚本说起      在看源码之前,我们一般会看相关脚本了解其初始化信息以及Bootstrap类,Spark也不例外,而Spark我们启动三端使用的脚本如下: %SPARK_HOME%/sbin/st ...

  4. 【Spark2&period;0源码学习】-3&period;Endpoint模型介绍

         Spark作为分布式计算框架,多个节点的设计与相互通信模式是其重要的组成部分.   一.组件概览      对源码分析,对于设计思路理解如下:            RpcEndpoint: ...

  5. 【Spark2&period;0源码学习】-9&period;Job提交与Task的拆分

          在前面的章节Client的加载中,Spark的DriverRunner已开始执行用户任务类(比如:org.apache.spark.examples.SparkPi),下面我们开始针对于用 ...

  6. 【Spark2&period;0源码学习】-6&period;Client启动

    Client作为Endpoint的具体实例,下面我们介绍一下Client启动以及OnStart指令后的额外工作 一.脚本概览      下面是一个举例: /opt/jdk1..0_79/bin/jav ...

  7. 【Spark2&period;0源码学习】-4&period;Master启动

         Master作为Endpoint的具体实例,下面我们介绍一下Master启动以及OnStart指令后的相关工作   一.脚本概览      下面是一个举例: /opt/jdk1..0_79/ ...

  8. 【Spark2&period;0源码学习】-5&period;Worker启动

         Worker作为Endpoint的具体实例,下面我们介绍一下Worker启动以及OnStart指令后的额外工作   一.脚本概览      下面是一个举例: /opt/jdk1..0_79/ ...

  9. 【Spark2&period;0源码学习】-7&period;Driver与DriverRunner

         承接上一节内容,Client向Master发起RequestSubmitDriver请求,Master将DriverInfo添加待调度列表中(waitingDrivers),下面针对于Dri ...

随机推荐

  1. 1Z0-053 争议题目解析481

    1Z0-053 争议题目解析481 考试科目:1Z0-053 题库版本:V13.02 题库中原题为: 481.Which statement is true about a running sessi ...

  2. Composer 学习笔记

    Composer 是 PHP 的一个依赖管理工具(据说是最优秀的).它允许你申明项目所依赖的代码库,它会在你的项目中为你安装他们.目前很多项目以Composer 作为依赖管理工具,目前正在学习yii2 ...

  3. 使用php技术实现无刷新的上传文件

  4. 武林&lbrack;HDU1107&rsqb;

    武林 Time Limit: 10000/5000 MS (Java/Others)    Memory Limit: 65536/32768 K (Java/Others)Total Submiss ...

  5. java利用过滤器实现编码的转换&comma;内容输出的替换

    在页面建个表单 <form action="login.do" method="post"> <input type="text&q ...

  6. &lbrack;Xcode使用 - 4&rsqb; 真机调试配置

    A.步骤 1.申请开发者账号 2.注册机器成为开发机器 (1)生成证书请求文件 (2)上传请求文件 (3)生成cer证书文件: **.cer (4)下载cer证书   3.注册测试程序ID:APP I ...

  7. 测试开发Python培训:实现屌丝的黄色图片收藏愿望(小插曲)

    男学员在学习python的自动化过程中对于爬虫很感兴趣,有些学员就想能收藏一些情色图片,供自己欣赏.作为讲师只能是满足愿望,帮助大家实现对美的追求,http://wanimal.lofter.com/ ...

  8. 201521123107 《Java程序设计》第10周学习总结

    第10周作业-异常与多线程 1.本周学习总结 2.书面作业 1.finally 题目4-2 1.1 截图你的提交结果(出现学号) 1.2 4-2中finally中捕获异常需要注意什么? 只有try块中 ...

  9. hbase数据原理及基本架构

    第一:hbase介绍 hbase是一个构建在hdfs上的分布式列存储系统: hbase是apache hadoop生态系统中的重要一员,主要用于海量结构化数据存储 从逻辑上讲,hbase将数据按照表. ...

  10. javascript的防篡改对象之preventExtensions&lpar;&rpar;方法

    js在默认情况下,所有的对象都是可扩展的.这也是让很多开发人员头特疼的问题.因为在同一环境中,一不小心就会发生修改了不必要的对象,而自己却不知道. 在ECMAScript5可以解决这种问题了. pre ...