Spark 1.3 from Creation to Submission: (3) Source Code Analysis of Task Scheduling Initialization

TaskSchedulerImpl & SparkDeploySchedulerBackend

As mentioned in the previous section, a task scheduler is created inside SparkContext. Let's analyze that call in detail:

 private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master)

The code of createTaskScheduler is as follows:

  private def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = {
    // regex used to recognize a Standalone master URL
    val SPARK_REGEX = """spark://(.*)""".r
    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
    }
  }
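
As a quick illustration of the pattern matching above, here is a minimal standalone sketch (the host names are made up) of how SPARK_REGEX extracts the master list and how the comma split rebuilds the individual spark:// URLs:

  object MasterUrlParsingSketch {
    def main(args: Array[String]): Unit = {
      // Same pattern as in createTaskScheduler.
      val SPARK_REGEX = """spark://(.*)""".r
      // Hypothetical HA master string with two masters.
      val master = "spark://host1:7077,host2:7077"

      master match {
        case SPARK_REGEX(sparkUrl) =>
          // Re-prefix every comma-separated address, exactly as the Standalone branch does.
          val masterUrls = sparkUrl.split(",").map("spark://" + _)
          masterUrls.foreach(println) // prints spark://host1:7077 and spark://host2:7077
        case other =>
          println(s"not a Standalone master URL: $other")
      }
    }
  }
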
The match expression determines which deployment mode the master string refers to, for example local, Standalone, YARN, and so on. Here we take Standalone as the example, that is, the case SPARK_REGEX(sparkUrl) branch. In this branch a TaskSchedulerImpl is created, which is an implementation of the TaskScheduler trait. The core of TaskSchedulerImpl's initialization is as follows:

  private[spark] class TaskSchedulerImpl(
      val sc: SparkContext,
      val maxTaskFailures: Int,
      isLocal: Boolean = false)
    extends TaskScheduler with Logging {

    // the backend for the chosen deployment mode
    var backend: SchedulerBackend = null

    def initialize(backend: SchedulerBackend) {
      this.backend = backend
      // temporarily set rootPool name to empty
      rootPool = new Pool("", schedulingMode, 0, 0)
      schedulableBuilder = {
        schedulingMode match {
          case SchedulingMode.FIFO =>
            new FIFOSchedulableBuilder(rootPool)
          case SchedulingMode.FAIR =>
            new FairSchedulableBuilder(rootPool, conf)
        }
      }
      schedulableBuilder.buildPools()
    }
    // ...
  }
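
The schedulingMode matched above comes from the spark.scheduler.mode configuration. As a minimal sketch (assuming a hypothetical Standalone master URL), this is how an application would opt into FAIR scheduling before the SparkContext, and hence the TaskSchedulerImpl, is created:

  import org.apache.spark.{SparkConf, SparkContext}

  object SchedulingModeExample {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
        .setAppName("scheduling-mode-demo")
        // Hypothetical Standalone master URL.
        .setMaster("spark://master-host:7077")
        // TaskSchedulerImpl reads this key; FAIR makes initialize() build a FairSchedulableBuilder.
        .set("spark.scheduler.mode", "FAIR")
      val sc = new SparkContext(conf)
      // Jobs submitted through this context are now scheduled via the fair pool.
      sc.stop()
    }
  }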

Task scheduling therefore has two modes, with FIFO as the default. Returning to the case SPARK_REGEX(sparkUrl) branch, the next step creates a SparkDeploySchedulerBackend, which is the backend corresponding to Standalone mode. The code of SparkDeploySchedulerBackend is as follows:

  private[spark] class SparkDeploySchedulerBackend(
      scheduler: TaskSchedulerImpl,
      sc: SparkContext,
      masters: Array[String])
    extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
    with AppClientListener
    with Logging {

    var client: AppClient = null
    var stopping = false
    // ...
    override def start() {
      super.start()
      // ..
      client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
      client.start()
    }
  }

Looking at AppClient's start method, it creates a ClientActor, which is responsible for communicating with the Master node (described in detail below):

  def start() {
    // Just launch an actor; it will call back into the listener.
    actor = actorSystem.actorOf(Props(new ClientActor))
  }

As mentioned in the previous section on SparkContext, taskScheduler.start() is executed at the end. That taskScheduler is the TaskSchedulerImpl, and TaskSchedulerImpl's start method in turn calls backend.start():

  override def start() {
    backend.start()
    // ...
  }

This backend is the SparkDeploySchedulerBackend instance. SparkDeploySchedulerBackend.start first calls the start method of its parent class CoarseGrainedSchedulerBackend (the common parent class for the cluster deployment modes):

  override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }
    // TODO (prashant) send conf instead of properties
    driverActor = actorSystem.actorOf(
      Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
  }

It mainly creates a DriverActor, which is responsible for communicating with the Executor processes (it is not used yet at this point; it simply listens for message requests from the executors).
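
To make the actor-based messaging concrete, here is a minimal, self-contained Akka sketch; the message types and names are hypothetical and only stand in for Spark's real driver/executor protocol messages:

  import akka.actor.{Actor, ActorSystem, Props}

  // Hypothetical messages used only for this sketch.
  case class RegisterExecutorSketch(executorId: String, cores: Int)
  case object RegisteredExecutorSketch

  // A bare-bones driver-side actor: it acknowledges executor registrations.
  class DriverActorSketch extends Actor {
    def receive: Receive = {
      case RegisterExecutorSketch(id, cores) =>
        println(s"executor $id registered with $cores cores")
        sender() ! RegisteredExecutorSketch
    }
  }

  object DriverActorSketchDemo extends App {
    val system = ActorSystem("driver-sketch")
    // Create the actor under a fixed name, mirroring how
    // CoarseGrainedSchedulerBackend names its DriverActor.
    val driver = system.actorOf(Props(new DriverActorSketch), name = "DriverActorSketch")
    driver ! RegisterExecutorSketch("executor-1", 4)
  }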


ClientActor

The ClientActor class exists as an inner class of AppClient. As mentioned above, this actor is created inside SparkDeploySchedulerBackend's start method, more precisely through client.start():

    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec)

    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()
Before the ClientActor is created, the submission parameters are collected: the JVM options, the class of the Executor backend to be instantiated (CoarseGrainedExecutorBackend) and its class path, the appName, the requested number of cores, and the Executor memory size. These parameters are wrapped into appDesc and passed to the AppClient.


Now let's look at the ClientActor's life cycle. In its preStart method, the ClientActor registers with the Master node, which ultimately calls the tryRegisterAllMasters method:

  def tryRegisterAllMasters() {
    for (masterAkkaUrl <- masterAkkaUrls) {
      logInfo("Connecting to master " + masterAkkaUrl + "...")
      val actor = context.actorSelection(masterAkkaUrl)
      actor ! RegisterApplication(appDescription)
    }
  }
As we can see, the ClientActor sends the appDescription to the Master. In the Master class's receive method, let's look at how RegisterApplication is handled:

    case RegisterApplication(description) => {
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, sender)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        sender ! RegisteredApplication(app.id, masterUrl)
        schedule()
      }
    }
The Master actor receives the registration message sent by the ClientActor, keeps the registration information in memory, persists it through the persistence engine, and finally replies to the client with a registration-success message (RegisteredApplication). Next, based on the application's resource request, the Master selects Workers that satisfy the requirements and launches the corresponding Executor processes on those nodes to run the tasks.
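
As a closing illustration of this handshake, here is a minimal, self-contained Akka sketch (local only, with hypothetical message and actor names, not Spark's real classes) in which a client looks up a master actor by path, just as tryRegisterAllMasters does with actorSelection, sends a registration request, and receives the acknowledgement:

  import akka.actor.{Actor, ActorSystem, Props}

  // Hypothetical stand-ins for RegisterApplication / RegisteredApplication.
  case class RegisterAppSketch(appName: String)
  case class RegisteredAppSketch(appId: String)

  // Master side: accept the registration and reply with an id.
  class MasterSketch extends Actor {
    def receive: Receive = {
      case RegisterAppSketch(name) =>
        println(s"Registering app $name")
        sender() ! RegisteredAppSketch("app-0001")
    }
  }

  // Client side: look the master up by path and register, as tryRegisterAllMasters does.
  class ClientSketch(masterPath: String) extends Actor {
    override def preStart(): Unit = {
      context.actorSelection(masterPath) ! RegisterAppSketch("demo-app")
    }
    def receive: Receive = {
      case RegisteredAppSketch(id) =>
        println(s"Registered with id $id")
    }
  }

  object RegistrationHandshakeDemo extends App {
    val system = ActorSystem("handshake-sketch")
    system.actorOf(Props(new MasterSketch), name = "MasterSketch")
    // A local actor path; in Spark this would be a remote Akka URL pointing at the Master.
    system.actorOf(Props(new ClientSketch("/user/MasterSketch")))
  }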