spark 启动job的流程分析

时间:2023-03-09 02:20:18
spark 启动job的流程分析

从WordCount開始分析

编写一个样例程序

编写一个从HDFS中读取并计算wordcount的样例程序:

packageorg.apache.spark.examples

importorg.apache.spark.SparkContext

importorg.apache.spark.SparkContext._

objectWordCount{

defmain(args : Array[String]) {

valsc
= ),"wordcount by hdfs",

System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass()))

//从hadoophdfs的根路径下得到一个文件

valfile
=sc.textFile("/hadoop-test.txt")

valcounts
=file.flatMap(line=> line.split(" "))

)).reduceByKey(_+ _)

counts.saveAsTextFile("/newtest.txt")

}

}

生成SparkContext实例

在上面样例中。要运行map/reduce操作。首先须要一个SparkContext。因此看看SparkContext的实例生成

defthis(

master: String,

appName: String,

sparkHome: String =
null,

jars: Seq[String] = Nil,

environment: Map[String, String]= Map(),

preferredNodeLocationData:Map[String, Set[SplitInfo]] = Map()) =

{

this(SparkContext.updatedConf(newSparkConf(),
master, appName, sparkHome, jars, environment),

preferredNodeLocationData)

}

编写WordCount样例时使用了上面列出的构造函数,后面两个environment与preferredNodeLocationData传入为默认值。

调用updatedConf的单例函数,生成或更新当前的SparkConf实例。

调用SparkContext的默认构造函数。

1.生成并启动监控的Jettyui,SparkUI.

2.生成TaskScheduler实例,并启动。

此函数会依据不同的mastername生成不同的TaskScheduler实例。,yarn-cluster为YarnClusterScheduler。

主要用来启动/停止task,监控task的执行状态。

private[spark]vartaskScheduler=
SparkContext.createTaskScheduler(this,master,appName)

taskScheduler.start()

3.生成DAGScheduler实例,并启动。

@volatileprivate[spark]vardagScheduler=
newDAGScheduler(taskScheduler)

dagScheduler.start()

在scheduler进行start操作后,通过调用postStartHook来把SparkContext加入到appmaster中。

生成WorkerRunnable线程。通过nmclient启动worker相应的container。此container线程CoarseGrainedExecutorBackend的实例,此实例通过Executor实例来载入相关的task。

SparkContext.textFile生成RDD

此方法用来生成RDD的实例,通常读取文本文件的方式通过textFile来进行,并其调用hadoopFile来运行。

通过hadoopFile得到一个HadoopRDD<K,V>的实例后,通过.map得到V的值。并生成RDD返回。

deftextFile(path: String, minSplits: Int = defaultMinSplits):RDD[String]
= {

hadoopFile(path,classOf[TextInputFormat], classOf[LongWritable], classOf[Text],

minSplits).map(pair=> pair._2.toString)

}

终于通过hadoopFile函数生成一个HadoopRDD实例。

defhadoopFile[K, V](

path: String,

inputFormatClass: Class[_ <:InputFormat[K, V]],

keyClass: Class[K],

valueClass: Class[V],

minSplits: Int = defaultMinSplits

): RDD[(K, V)] = {

//A
Hadoopconfiguration can be about 10 KB, which is pretty big, so broadcastit.

valconfBroadcast=
broadcast(newSerializableWritable(hadoopConfiguration))

valsetInputPathsFunc=
(jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf,path)

newHadoopRDD(

this,

confBroadcast,

Some(setInputPathsFunc),

inputFormatClass,

keyClass,

valueClass,

minSplits)

}

RDD函数的抽象运行

reduceByKey须要执行shuffle的reduce。也就是须要多个map中的数据集合到同样的reduce中执行,生成相关的DAG任务

valfile
=sc.textFile("/hadoop-test.txt")

valcounts
=file.flatMap(line=> line.split(" "))

)).reduceByKey(_+ _)

counts.saveAsTextFile("/newtest.txt")

在以上代码中,textFile,flatMap,map,reduceByKey都是spark中RDD的transformation,

而saveAsTextFile才是RDD中进行运行操作的action.

下面引用http://my-oschina-net/hanzhankang/blog/200275的相关说明:

详细可參见:http://spark.apache.org/docs/0.9.0/scala-programming-guide.html。

,transformation是得到一个新的RDD,方式非常多。比方从数据源生成一个新的RDD,从RDD生成一个新的RDD

,action是得到一个值,或者一个结果(直接将RDDcache到内存中)

全部的transformation都是採用的懒策略。就是假设仅仅是将transformation提交是不会运行计算的。计算仅仅有在action被提交的时候才被触发。

transformation操作:

map(func):对调用map的RDD数据集中的每一个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集

filter(func):
对调用filter的RDD数据集中的每一个元素都使用func,然后返回一个包括使func为true的元素构成的RDD

flatMap(func):和map差点儿相同。可是flatMap生成的是多个结果

mapPartitions(func):和map非常像,可是map是每一个element。而mapPartitions是每一个partition

mapPartitionsWithSplit(func):和mapPartitions非常像,可是func作用的是当中一个split上,所以func中应该有index

sample(withReplacement,faction,seed):抽样

union(otherDataset):返回一个新的dataset,包括源dataset和给定dataset的元素的集合

distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinct的element

groupByKey(numTasks):返回(K,Seq[V])。也就是hadoop中reduce函数接受的key-valuelist

reduceByKey(func,[numTasks]):就是用一个给定的reducefunc再作用在groupByKey产生的(K,Seq[V]),比方求和。求平均数

sortByKey([ascending],[numTasks]):依照key来进行排序,是升序还是降序。ascending是boolean类型

join(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W)。返回的是(K,(V,W))的dataset,numTasks为并发的任务数

cogroup(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks为并发的任务数

cartesian(otherDataset):笛卡尔积就是m*n,大家懂的

action操作:

reduce(func):说白了就是聚集,可是传入的函数是两个參数输入返回一个值。这个函数必须是满足交换律和结合律的

collect():一般在filter或者足够小的结果的时候。再用collect封装返回一个数组

count():返回的是dataset中的element的个数

first():返回的是dataset中的第一个元素

take(n):返回前n个elements。这个士driverprogram返回的

takeSample(withReplacement。num,seed):抽样返回一个dataset中的num个元素,随机种子seed

saveAsTextFile(path):把dataset写到一个textfile中,或者hdfs。或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录。然后写到file中

saveAsSequenceFile(path):仅仅能用在key-value对上。然后生成SequenceFile写到本地或者hadoop文件系统

countByKey():返回的是key相应的个数的一个map,作用于一个RDD

foreach(func):对dataset中的每一个元素都使用func

RDD的action中提交Job

在运行RDD的saveAsTextFile时调用SparkContext.runJob方法

saveAsTextFile方法,-->saveAsHadoopFile,终于调用SparkContext.runJob方法

defsaveAsTextFile(path: String) {

this.map(x=> (NullWritable.get(),
newText(x.toString)))

.saveAsHadoopFile[TextOutputFormat[NullWritable,Text]](path)

}

......下面一行代码就是在saveASTextFile函数嵌套调用中终于调用的函数,调用SparkContext.runJob

self.context.runJob(self,writeToFile _)

SparkContext.runJob的定义:

defrunJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T])=> U):
Array[U] = {

runJob(rdd, func,
0until
rdd.partitions.size,false)

}

SparkContext的终于运行runJob函数定义

defrunJob[T, U: ClassTag](

rdd: RDD[T],//此处是详细的RDD实例值

func: (TaskContext, Iterator[T])=> U,//详细的运行的action的逻辑,如reduceByKey

到partitions.size-1

allowLocal: Boolean,//能否够在本地运行

//result的处理逻辑,每个Task的处理

resultHandler: (Int, U)
=>Unit) {

valcallSite
=getCallSite

valcleanedFunc=
clean(func)

logInfo("Startingjob: " +
callSite)

valstart
=System.nanoTime

通过DAGScheduler.runJob去执行job的执行操作。请看以下的DAGScheduler处理job提交。

dagScheduler.runJob(rdd,cleanedFunc,partitions,
callSite,allowLocal,

resultHandler,localProperties.get)

logInfo("Jobfinished: " +
callSite+
", took "+ (System.nanoTime -
start)/
1e9 + "s")

rdd.doCheckpoint()

}

DAGScheduler处理job提交

上面的函数终于通过DagScheduler.runJob进行运行。

defrunJob[T, U: ClassTag](

rdd: RDD[T],

func: (TaskContext, Iterator[T])=> U,

partitions: Seq[Int],

callSite: String,

allowLocal: Boolean,

resultHandler: (Int, U) =>Unit,

properties: Properties =
null)

{

valwaiter
=submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler,properties)

等待job执行完毕。

waiter.awaitResult()match{

caseJobSucceeded => {}

caseJobFailed(exception:Exception,
_) =>

logInfo("Failedto run " + callSite)

throwexception

}

}

调用DAGShceduler.submitJob来提交任务。

defsubmitJob[T, U](

rdd: RDD[T],

func: (TaskContext, Iterator[T])=> U,

partitions: Seq[Int],

callSite: String,

allowLocal: Boolean,

resultHandler: (Int, U) =>Unit,

properties: Properties =
null):JobWaiter[U] =

{

//Check to make sure we are not launching a task on a partition thatdoes not exist.

valmaxPartitions=
rdd.partitions.length

partitions.find(p => p >=maxPartitions).foreach{ p =>

thrownewIllegalArgumentException(

"Attemptingto access a non-existent partition: "+ p
+ ". "+

"Totalnumber of partitions: " +maxPartitions)

}

valjobId
=nextJobId.getAndIncrement()

if(partitions.size ==
){

returnnewJobWaiter[U](this,jobId,
,resultHandler)

}

assert(partitions.size >
)

valfunc2
=func.asInstanceOf[(TaskContext, Iterator[_]) => _]

valwaiter
=newJobWaiter(this,jobId,partitions.size,
resultHandler)

向akka的actor发送一个event,此event为JobSubmitted,!表示发送消息

eventProcessActor! JobSubmitted(

jobId,rdd,
func2,partitions.toArray, allowLocal, callSite,
waiter,properties)

waiter

}

在DAGShceduler中的start方法时,会生成例如以下代码,此代码receive接收eventProcessActor发送的消息并进行处理

defstart() {

eventProcessActor= env.actorSystem.actorOf(Props(newActor
{

/**

* A handle to the periodicaltask, used to cancel the task when the actor is stopped.

*/

varresubmissionTask:Cancellable
= _

overridedefpreStart()
{

importcontext.dispatcher

/**

* A message is sent to theactor itself periodically to remind the actor to resubmit failed

* stages. In this way, stageresubmission can be done within the same thread context of

* other event processing logicto avoid unnecessary synchronization overhead.

*/

resubmissionTask=
context.system.scheduler.schedule(

RESUBMIT_TIMEOUT,RESUBMIT_TIMEOUT,self,ResubmitFailedStages)

}

/**

* The main event loop of the DAGscheduler.

*/

接收发送的scheduler事件,并通过processEvent进行处理。

defreceive = {

caseevent:DAGSchedulerEvent
=>

logTrace("Gotevent of type " +event.getClass.getName)

/**

* All events are forwarded to`processEvent()`, so that the event processing logic can

* easily tested withoutstarting a dedicated actor. Please refer to `DAGSchedulerSuite`

* for details.

*/

if(!processEvent(event)){

submitWaitingStages()

}
else{

resubmissionTask.cancel()

context.stop(self)

}

}

}))

}

processEvent中处理JobSubmitted的处理流程:

下面代码中生成一个finalStage,每个JOB都有一个finalStage,依据job划分出不同的stage。而且提交stage:

private[scheduler]defprocessEvent(event:
DAGSchedulerEvent): Boolean = {

event
match{

caseJobSubmitted(jobId,rdd,
func,partitions,allowLocal,callSite,listener,properties)=>

varfinalStage:Stage
= null

try{

//New stage creation may throw an exception if, for example, jobs arerun on a HadoopRDD

//whose underlying HDFS files have been deleted.

finalStage= newStage(rdd,partitions.size,None,
jobId,Some(callSite))

}
catch{

casee:Exception
=>

logWarning("Creatingnew stage failed due to exception - job: "+
jobId,
e)

listener.jobFailed(e)

returnfalse

}

valjob
= newActiveJob(jobId,finalStage,func,partitions,callSite,listener,properties)

clearCacheLocs()

logInfo("Gotjob " +
job.jobId+
" ("+
callSite+ ") with "+
partitions.length+

"output partitions (allowLocal=" +allowLocal+
")")

logInfo("Finalstage: " +
finalStage+
" ("+
finalStage.name+
")")

logInfo("Parentsof final stage: " +finalStage.parents)

logInfo("Missingparents: " +getMissingParentStages(finalStage))

假设能够本地执行,同一时候此finalStage没有stage的依赖关系,同一时候partitions仅仅有一个。

也就是仅仅有一个处理的split

那么这时直接通过localThread的方式来执行此job实例。不通过TaskScheduler进行处理。

if(allowLocal&&
finalStage.parents.size==
&&partitions.length==
) {

//Compute very short actions like first() or take() with no parentstages locally.

listenerBus.post(SparkListenerJobStart(job,Array(),
properties))

runLocally(job)

}
else{

否则表示partitions有多个,或者stage本身的依赖关系,也就是像reduce这样的场景。

依据job相应的stage(finalStage),调用submitStage,通过stage之间的依赖关系得出stageDAG。并以依赖关系进行处理:

idToActiveJob(jobId)=
job

activeJobs+=
job

resultStageToJob(finalStage)=
job

listenerBus.post(SparkListenerJobStart(job,jobIdToStageIds(jobId).toArray,properties))

submitStage(finalStage)

}

submitStage方法处理流程:

privatedef submitStage(stage:
Stage) {

valjobId
=activeJobForStage(stage)

if(jobId.isDefined){

logDebug("submitStage("+ stage +
")")

if(!waiting(stage)&&
!running(stage)&& !failed(stage)){

valmissing
=getMissingParentStages(stage).sortBy(_.id)

logDebug("missing:" +
missing)

if(missing
==Nil) {

logInfo("Submitting" + stage +
"(" + stage.rdd+
"), which has no missingparents")

submitMissingTasks(stage,jobId.get)

running+= stage

}
else{

for(parent
<-missing) {

submitStage(parent)

}

waiting+= stage

}

}

}else{

abortStage(stage,
"Noactive job for stage " + stage.id)

}

}

对于一个刚生成的job,此时的stage为刚生成。

此时submitStage调用getMissingParentStages得到stage的parent,也就是RDD的依赖关系

生成parentStage是通过RDD的dependencies来生成相关的RDD的依赖关系,

假设依赖关系是ShuffleDependency,生成一个mapStage来作为finalStage的parent,

否则是NarrowDependency,不生成新的stage.如count,各task没有相关的数据依赖

也就是说,相应须要运行shuffle操作的job,会生成mapStage与finalStage进行。

而不须要shuffle的job仅仅须要一个finalStage

privatedef getMissingParentStages(stage:Stage):
List[Stage] = {

valmissing
=newHashSet[Stage]

valvisited
=newHashSet[RDD[_]]

defvisit(rdd: RDD[_]) {

if(!visited(rdd)){

visited+= rdd

if(getCacheLocs(rdd).contains(Nil)){

for(dep
<-rdd.dependencies) {

depmatch{

caseshufDep:ShuffleDependency[_,_]
=>

valmapStage
=getShuffleMapStage(shufDep,stage.jobId)

if(!mapStage.isAvailable){

missing+=
mapStage

}

casenarrowDep:NarrowDependency[_]
=>

visit(narrowDep.rdd)

}

}

}

}

}

visit(stage.rdd)

missing.toList

}

接下来回到submitStage方法中,假设stage没有missing的stage时(没有parentstage)。运行task的提交操作。

if (missing== Nil) {

logInfo("Submitting" + stage +
"(" + stage.rdd+
"), which has no missingparents")

submitMissingTasks(stage,jobId.get)

设置当前的stage为running,由于当前的stage没有parent的stage,直接running当前的stage

running+= stage

}else{

for(parent
<-missing) {

此stage中包括有parent的stage,因此stage须要进行顺序运行。

先运行parent的stage.递归调用

submitStage(parent)

}

设置当前的stage为waiting,表示此stage须要等待parent的运行完毕。

waiting+= stage

}

运行submitMissingTasks流程处理,把stage依据partition生成TaskSet,通过TaskScheduler提交Task.

privatedef submitMissingTasks(stage:Stage,
jobId: Int) {

logDebug("submitMissingTasks("+ stage +
")")

//Get our pending tasks and remember them in our pendingTasks entry

valmyPending
=pendingTasks.getOrElseUpdate(stage,newHashSet)

myPending.clear()

vartasks
=ArrayBuffer[Task[_]]()

检查stage是否是mapStage,假设是shuffleMapStage,生成ShuffleMapTask,并加入到tasks列表中。

mapStage表示还有其他stage依赖此stage

if(stage.isShuffleMap){

for(p
<- 0until stage.numPartitionsifstage.outputLocs(p)==
Nil) {

得到rddstage中当前传入的partition的TaskLocation(也就是Taskhost)

vallocs
=getPreferredLocs(stage.rdd,p)

tasks+=
newShuffleMapTask(stage.id,stage.rdd,stage.shuffleDep.get,p,
locs)

}

}else{

否则表示是一个finalStage,此类stage直接输出结果,生成ResultTask,并加入到tasks列表中。

//This is a final stage; figure out its job's missing partitions

valjob
=resultStageToJob(stage)

for(id
<- 0until
job.numPartitionsif!job.finished(id)){

valpartition
=job.partitions(id)

得到rddstage中当前传入的partition的TaskLocation(也就是Taskhost)

vallocs
=getPreferredLocs(stage.rdd,partition)

tasks+=
newResultTask(stage.id,stage.rdd,job.func,partition,locs,
id)

}

}

valproperties=
if(idToActiveJob.contains(jobId)){

idToActiveJob(stage.jobId).properties

}else{

//thisstage will be assigned to "default" pool

null

}

//must be run listener before possible NotSerializableException

//should be "StageSubmitted" first and then "JobEnded"

listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage),properties))

假设有生成的tasks,也就是此job中有须要运行的task,

if(tasks.size>
) {

//Preemptively serialize a task to make sure it can be serialized. Weare catching this

//exception here because it would be fairly hard to catch thenon-serializableexception

//down the road, where we have several different implementations forlocal scheduler and

//cluster schedulers.

try{

SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)

}
catch{

casee:NotSerializableException
=>

abortStage(stage,
"Tasknot serializable: " +
e.toString)

running-= stage

return

}

logInfo("Submitting" +
tasks.size+
" missing tasks from "+ stage +
" ("+ stage.rdd+
")")

myPending++=
tasks

logDebug("Newpending tasks: " +
myPending)

运行TaskScheduler.submitTasks处理函数,TaskScheduler的实如今onyarn中为YarnClusterScheduler.

请參见以下的TaskScheduler提交task流程分析

taskSched.submitTasks(

newTaskSet(tasks.toArray,stage.id,stage.newAttemptId(),
stage.jobId,properties))

stageToInfos(stage).submissionTime=
Some(System.currentTimeMillis())

}else{

logDebug("Stage" + stage +
"is actually done; %b %d %d".format(

stage.isAvailable,stage.numAvailableOutputs,stage.numPartitions))

running-= stage

}

}

到眼下为此,job在DAGScheduler的处理流程完毕。等待TaskScheduler处理完数据后,回调DAGScheduler.

TaskScheduler提交task流程分析

TaskScheduler在onyarn模式时,实现为YarnClusterScheduler。

提交task时。通过调用submitTasks函数。

YarnClusterScheduler继承与TaskSchedulerImpl.

通过TaskSchedulerImpl.submitTasks对task的提交进行处理。

overridedef submitTasks(taskSet:
TaskSet){

valtasks
=taskSet.tasks

logInfo("Addingtask set " + taskSet.id+
" with "+
tasks.length+
" tasks")

this.synchronized{

生成一个TaskSetManager实例,并把此实例设置到activeTaskSets的容器中。

在生成实例的过程中。会把taskSet传入,并得到要运行的task个数。

并依据task的location信息,

,表示没有副本运行

把task分别放到pendingTasksForExecutor(process_local)此时没有值。

/pendingTasksForHost(node_local),此时此节点的task全在此里面,host在worker注冊时已经存在

/pendingTasksForRack(rack)/。通常情况不会有值

pendingTasksWithNoPrefs(待分配),通常情况不会有值。

allPendingTasks(any)

全部的task都在最后一个中。

valmanager
=newTaskSetManager(this,taskSet,
maxTaskFailures)

activeTaskSets(taskSet.id)=
manager

把TaskSetManager加入到rootPool中。

schedulableBuilder.addTaskSetManager(manager,manager.taskSet.properties)

针对此TaskSet(job)生成一个跟踪每个task的容器

taskSetTaskIds(taskSet.id)=
newHashSet[Long]()

定时检查taskSet是否被启动,假设没有被启动,提示无资源,假设被启动成功,关闭此检查线程。

if(!isLocal && !hasReceivedTask){

starvationTimer.scheduleAtFixedRate(newTimerTask()
{

overridedefrun()
{

if(!hasLaunchedTask){

logWarning("Initialjob has not accepted any resources; "+

"checkyour cluster UI to ensure that workers are registered "+

"andhave sufficient memory")

}
else{

this.cancel()

}

}

},
STARVATION_TIMEOUT,STARVATION_TIMEOUT)

}

hasReceivedTask=
true

}

通过backend发起运行消息,backend是SchedulerBackend的详细实现,

在yarn-cluster模式为CoarseGrainedSchedulerBackend。

backend.reviveOffers()

}

CoarseGrainedSchedulerBackend.reviveOffers

通过driverActor的actor实例发起一个ReviveOffers的事件处理消息。

overridedef reviveOffers()
{

driverActor! ReviveOffers

}

driverActor的实现为CoarseGrainedSchedulerBackend.DriverActor实例。

DriverActor中处理revive的函数为receive.当中,处理ReviveOffers部分定义例如以下:

caseReviveOffers =>

makeOffers()

终于调用的makeOffers函数。

defmakeOffers() {

executorHost与freeCores的值由来请查看appmaster启动时的补充

launchTasks(scheduler.resourceOffers(

executorHost.toArray.map{case(id,
host)=>
newWorkerOffer(id,host,freeCores(id))}))

}

通过CoarseGrainedSchedulerBackend相应的scheduler(TaskSchdulerImpl).resourceOffers得到tasks

defresourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] =synchronized
{

SparkEnv.set(sc.env)

//Mark each slave as alive and remember its
hostname

for(o
<-offers) {

把executor(worker)相应的host存储到相应的容器中,通过executorid拿host

executorIdToHost(o.executorId)=
o.host

得到当前全部注冊的worker的host,写入到相应的容器中,此容器表示node_local

if(!executorsByHost.contains(o.host)){

executorsByHost(o.host)=
newHashSet[String]()

通过DAGScheduler.executorGained把executorId与host进行处理,

请參见以下DAGScheduler中处理ExecutorGained处理。

executorGained(o.executorId,o.host)

}

}

依据全部的worker,依据每个worker的的cpucore,生成[arraybuffer[]]

//Build a list of tasks to assign to each worker

valtasks
=offers.map(o => newArrayBuffer[TaskDescription](o.cores))

得到每个worker可用的cpu

valavailableCpus=
offers.map(o => o.cores).toArray

得到rootPool中排序后的队列中的全部的TaskSet存储的TaskSetMansger数组

valsortedTaskSets=
rootPool.getSortedTaskSetQueue()

for(taskSet
<-sortedTaskSets){

logDebug("parentName:%s, name: %s, runningTasks: %s".format(

taskSet.parent.name,taskSet.name,taskSet.runningTasks))

}

//Take each TaskSet in our scheduling order, and then offer it eachnode in increasing order

//of locality levels so that it gets a chance to launch local tasks onall of them.

varlaunchedTask=
false

迭代出每个TaskSetMansger,同一时候依据每个TaskSetMansger,

迭代去按网络的优先级运行PROCESS_LOCAL,NODE_LOCAL,RACK_LOCAL,ANY

scala中的for假设包括多个运行器。也就是<-的表达式,多个用;号分开。后面一个优先前面一个运行

也就是后一个运行完毕后。相当于一个嵌套的for

此处開始运行对taskSet的运行节点选择。针对每个taskset,首先使用PROCESS_LOCAL開始。

for(taskSet
<-sortedTaskSets;maxLocality<- TaskLocality.values) {

do{

迭代全部的worker,并在每迭代出一个worker时,在此机器上生成运行taskSet中相应的相关task

针对TaskSetmanager.resourceOffer的处理流程。见后面的细节分析,如今不分析此实现。

launchedTask=
false

for(i
<- 0until offers.size) {

valexecId
=offers(i).executorId

valhost
=offers(i).host

生成task运行的节点信息等,每次运行resourceOffer生成一个TaskDescription

把task相应的executorid与host加入到相应的activeExecutorIds与executorsByHost。

for(task
<-taskSet.resourceOffer(execId,host,availableCpus(i),maxLocality)){

tasks(i)+=
task

valtid
=task.taskId

taskIdToTaskSetId(tid)=
taskSet.taskSet.id

taskSetTaskIds(taskSet.taskSet.id)+=
tid

taskIdToExecutorId(tid)=
execId

此时把activeExecutorIds的值加入一个正在运行的executor,这个值的作用是当有多个stage的依赖时。

下一个stage在运行submitTasks时,

生成的TaskSetManager中会把新stage相应的task的executor直接使用此executor,也就是PROCESS_LOCAL.

activeExecutorIds+=
execId

executorsByHost(host)+=
execId

availableCpus(i) -=

launchedTask=
true

}

}

通TaskLocality。假设在一个较小的locality时找到一个task,从这个locality中接着找,

否则跳出去从下一个locality又一次找,放大locality的查找条件。

假设launchedTask的值为true,表示在传入的locality级别上查找到task要运行相应的级别,

那么在当前级别下接着去找到下一个可运行的TASK,否则launchedTask的值为false,放大一个locality的级别。

如launchedTask的值为false,当前迭代的locality的级别为PROCESS_LOCAL,那么把级别放大到NODE_LOCAL又一次查找.

}
while(launchedTask)

}

假设tasks生成成功,设置hasLaunchedTask的值为true,前面我们提到过的submitTasks中的检查线程開始结束。

if(tasks.size>
) {

hasLaunchedTask=
true

}

返回生成成功的task列表。交给CoarseGrainedSchedulerBackend.launchTasks处理

returntasks

}

CoarseGrainedSchedulerBackend.launchTasks处理流程:

通过worker注冊的actor,向CoarseGrainedExecutorBackend发送消息。处理LaunchTask事件

deflaunchTasks(tasks: Seq[Seq[TaskDescription]]) {

for(task
<-tasks.flatten) {

freeCores(task.executorId) -=

executorActor(task.executorId)!
LaunchTask(task)

}

}

CoarseGrainedExecutorBackend中处理LaunchTask事件事件。

overridedef receive = {

caseLaunchTask(taskDesc) =>

logInfo("Gotassigned task " +
taskDesc.taskId)

if(executor==
null){

logError("ReceivedLaunchTask command but executor was null")

)

}
else{

通过executor运行task,见后面的分析。

executor.launchTask(this,taskDesc.taskId,taskDesc.serializedTask)

}

TaskSetManager.resourceOffer函数,每次运行得到一个task的运行节点。

defresourceOffer(

execId: String,

host: String,

availableCpus: Int,

maxLocality:TaskLocality.TaskLocality)

:Option[TaskDescription] =

{

假设成功的task个数小于当前的job要运行的task的个数,

同一时候worker中可用的cpu资源须要大于或等于spark.task.cpus配置的值,默认须要大于或等于1.

if(tasksSuccessful<
numTasks&& availableCpus >=
CPUS_PER_TASK){

valcurTime
=clock.getTime()

得到一个默认的locality的值,默认情况下最有可能是NODE_LOCAL.

此处依据上一次查找可运行节点的时间。得到一个合适此运行时间的一个locality级别。

通过spark.locality.wait配置全局的等待时间。默觉得3000ms。作用于PROCESS_LOCAL,NODE_LOCAL,RACK_LOCAL

通过spark.locality.wait.process配置PROCESS_LOCAL的等待时间。

通过spark.locality.wait.node配置NODE_LOCAL的等待时间。

通过spark.locality.wait.rack配置RACK_LOCAL的等待时间。

这里的查找方式是通过当前的開始。找到相应可运行的级别。

检查当前时间减去上次的查找级别的运行时间是否大于上面配置的在此级别的运行时间。

假设大于配置的时间,把currentLocalityIndex的值+1又一次检查,返回一个合适的locality级别。

假设运行查找的时间超过了以上配置的几个locality的级别的查找时间,此时返回的值为ANY.

varallowedLocality=
getAllowedLocalityLevel(curTime)

首先把当前可运行的locality设置为PROCESS_LOCAL.maxLocality是最大的级别,

得到的可运行级别不能超过此级别,从PROCESS_LOCAL開始一级一级向上加大。

maxLocality的级别从PROCESS_LOCAL一级一级向上加。

假设getAllowedLocalityLevel查找到的级别大于如今传入的级别。

把级别设置为传入的级别。

maxLocality传入按PROCESS_LOCAL/NODE_LOCAL/RACK_LOCAL/ANY进行传入。

if(allowedLocality>
maxLocality) {

allowedLocality= maxLocality
// We're not allowed tosearch for farther-away tasks

}

通过findTask来得到task相应的运行网络选择。

见以下的TaskSetManager.findTask选择task的运行节点的流程部分

findTask(execId, host,allowedLocality)match{

caseSome((index,taskLocality))=>
{

//Found a task; do some bookkeeping and return a task description

valtask
=tasks(index)

valtaskId
=sched.newTaskId()

//Figure out whether this should count as a preferred launch

logInfo("Startingtask %s:%d as TID %s on executor %s: %s (%s)".format(

taskSet.id,index,taskId,execId,
host, taskLocality))

设置task的运行副本加一,

//Do various bookkeeping

copiesRunning(index) +=

valinfo
= newTaskInfo(taskId,index,curTime,execId,
host, taskLocality)

taskInfos(taskId)=
info

taskAttempts(index)=
info ::taskAttempts(index)

得到当前载入的节点运行级别的index,并更新当前查找此运行节点的查找时间为当前时间。

//Update our locality level for delay scheduling

currentLocalityIndex= getLocalityIndex(taskLocality)

lastLaunchTime=
curTime

//Serialize and return the task

valstartTime
=clock.getTime()

//We rely on the DAGScheduler to catch non-serializableclosures
and RDDs, so in here

//we assume the task can be serialized without exceptions.

valserializedTask=
Task.serializeWithDependencies(

task,sched.sc.addedFiles,sched.sc.addedJars,ser)

valtimeTaken
=clock.getTime() - startTime

把task加入到runningTasksSet的容器中。

addRunningTask(taskId)

logInfo("Serializedtask %s:%d as %d bytes in %d ms".format(

taskSet.id,index,serializedTask.limit,timeTaken))

valtaskName
="task %s:%d".format(taskSet.id,index)

,表示是第一次尝试运行,通过DAGScheduler触发BeginEvent事件。

if(taskAttempts(index).size==
)

taskStarted(task,info)

returnSome(newTaskDescription(taskId,execId,
taskName,index,serializedTask))

}

case_ =>

}

}

None

}

TaskSetManager.findTask选择task的运行节点的流程部分:

从不同的locality级别中取出须要运行的task.

privatedef findTask(execId:
String,host: String, locality: TaskLocality.Value)

:Option[(Int, TaskLocality.Value)] =

{

此处优先找PROCESS_LOCAL的值。可是我如今还没有搞明确这个pendingTasksForExecutor的值从何来。

从TaskSetManager生成时能够看出pendingTasksForExecutor的值在实例生成时,

从TaskSchedulerImpl.activeExecutorIds中检查并生成。但实例生成此,此容器还没有值。这点还没搞明确。

新的批注:

当有stage的依赖关系时,第一个stage运行完毕后。activeExecutorIds的容器会有运行过的executor列表。

对上一个stage运行完毕后,新的一个stage開始运行,

生成的TaskSetManager中pendingTasksForExecutor中包括能够直接使用上一个stage中部分task运行的executor的task.

因此,假设有stage的依赖关系时,下一个stage中的task在此时假设executorid同样。直接使用PROCESS_LOCAL来运行。

第一个stage运行时。PROCESS_LOCAL不会被选择,正常情况locality的选择会放大的NODE_LOCAL開始。

for(index
<-findTaskFromList(getPendingTasksForExecutor(execId))) {

returnSome((index,TaskLocality.PROCESS_LOCAL))

}

if(TaskLocality.isAllowed(locality,
TaskLocality.NODE_LOCAL)){

for(index
<-findTaskFromList(getPendingTasksForHost(host))) {

returnSome((index,TaskLocality.NODE_LOCAL))

}

}

if(TaskLocality.isAllowed(locality,
TaskLocality.RACK_LOCAL)){

for{

rack<- sched.getRackForHost(host)

index<- findTaskFromList(getPendingTasksForRack(rack))

} {

returnSome((index,TaskLocality.RACK_LOCAL))

}

}

//Look for no-preftasks
after rack-local tasks since they can run anywhere.

for(index
<-findTaskFromList(pendingTasksWithNoPrefs)){

returnSome((index,TaskLocality.PROCESS_LOCAL))

}

if(TaskLocality.isAllowed(locality,
TaskLocality.ANY)){

for(index
<-findTaskFromList(allPendingTasks)){

returnSome((index,TaskLocality.ANY))

}

}

//Finally, if all else has failed, find a speculative task

findSpeculativeTask(execId, host,locality)

}

appmaster启动时的补充

一些须要的说明:在makeOffers中调用了TaskScheduler.resourceOffers函数。

此函数中传入的executorHost,freeCores的值什么时候得到呢:

我们知道在appmaster启动的时候。会依据设置的num-worker个数。向rm申请worker执行的资源。

并通过WorkerRunnable启动worker相应的container。启动CoarseGrainedExecutorBackend实例在container中.

在实例中连接appmaster相应的sparkContext中的scheduler中的CoarseGrainedSchedulerBackend.DriverActor

此DriverActor的name为CoarseGrainedSchedulerBackend.ACTOR_NAME.

例如以下是CoarseGrainedExecutorBackend生成的一些代码片段:

YarnAllocationHandler.allocateResources中得到actor的名称。

valworkerId =workerIdCounter.incrementAndGet().toString

valdriverUrl
="akka.tcp://spark@%s:%s/user/%s".format(

sparkConf.get("spark.driver.host"),

sparkConf.get("spark.driver.port"),

CoarseGrainedSchedulerBackend.ACTOR_NAME)

YarnAllocationHandler.allocateResources通过WorkerRunnable的线程启动worker的container

valworkerRunnable=
newWorkerRunnable(

container,

conf,

sparkConf,

driverUrl,

workerId,

workerHostname,

workerMemory,

workerCores)

newThread(workerRunnable).start()

在CoarseGrainedExecutorBackend实例启动时,向actor注冊。

overridedefpreStart()
{

logInfo("Connectingto driver: " + driverUrl)

driver=
context.actorSelection(driverUrl)

发起worker启动时注冊Executor的消息。

driver!
RegisterExecutor(executorId,hostPort, cores)

context.system.eventStream.subscribe(self,classOf[RemotingLifecycleEvent])

}

CoarseGrainedSchedulerBackend中发起RegisterExecutor的事件处理。

defreceive = {

caseRegisterExecutor(executorId,hostPort,cores)
=>

Utils.checkHostPort(hostPort,"Host
port expected "+ hostPort)

假设此executorActor中已经包括有发送此消息过来的actor,表示此worker已经注冊。

通过发送消息过来的actor(sender表示发送此消息的actor)发送一个RegisterExecutorFailed事件。

if(executorActor.contains(executorId)){

sender! RegisterExecutorFailed("Duplicateexecutor
ID: " + executorId)

}
else{

否则表示actor(worker)还没有被注冊,把actor加入到executorActor中,

同一时候向发送消息过来的actor(sender表示发送此消息的actor)发送一个RegisteredExecutor消息.

logInfo("Registeredexecutor: " + sender +
"with ID " +
executorId)

sender !RegisteredExecutor(sparkProperties)

executorActor(executorId)=
sender

加入在TaskScheduler中提交task时使用的executorHost,与freeCores

executorHost(executorId)=
Utils.parseHostPort(hostPort)._1

freeCores(executorId)=
cores

executorAddress(executorId)=
sender.path.address

addressToExecutorId(sender.path.address)=
executorId

totalCoreCount.addAndGet(cores)

把如今注冊的全部的节点加入到TaskScheduler.executorsByHost中。在生成TaskSetManager是会使用

makeOffers()

}

CoarseGrainedExecutorBackend中接收appmaster中scheduler的receive.

针对RegisteredExecutor与RegisterExecutorFailed的处理流程:

receive函数中处理RegisterExecutorFailed:假设已经存在,直接exit掉此jvm.

caseRegisterExecutorFailed(message)=>

logError("Slaveregistration failed: " +
message)

)

receive函数中处理RegisteredExecutor:假设不存在。生成Executor实例。此时worker启动完毕。

并向master注冊成功。

caseRegisteredExecutor(sparkProperties) =>

logInfo("Successfullyregistered with driver")

//Make this host instead of hostPort ?

executor=
newExecutor(executorId, Utils.parseHostPort(hostPort)._1,sparkProperties)

RDD的依赖关系

..........

Spark中的Scheduler

........

Task的运行过程