Spark Source Code Analysis, Part 4 -- Creating and Starting the TaskScheduler


In Spark Source Code Analysis, Part 2 -- SparkContext Initialization, steps 14 and 16 covered the initialization and the startup of the TaskScheduler, respectively.

Let's take them one at a time, starting with how the TaskScheduler is initialized.

Instantiating the TaskScheduler

 val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

This calls org.apache.spark.SparkContext#createTaskScheduler, whose source is as follows:

 /**
  * Create a task scheduler based on a given master URL.
  * Return a 2-tuple of the scheduler backend and the task scheduler.
  */
 private def createTaskScheduler(
     sc: SparkContext,
     master: String,
     deployMode: String): (SchedulerBackend, TaskScheduler) = {
   import SparkMasterRegex._

   // When running locally, don't try to re-execute tasks on failure.
   val MAX_LOCAL_TASK_FAILURES = 1

   master match {
     case "local" =>
       val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
       val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
       scheduler.initialize(backend)
       (backend, scheduler)

     case LOCAL_N_REGEX(threads) =>
       def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
       // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
       val threadCount = if (threads == "*") localCpuCount else threads.toInt
       if (threadCount <= 0) {
         throw new SparkException(s"Asked to run locally with $threadCount threads")
       }
       val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
       val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
       scheduler.initialize(backend)
       (backend, scheduler)

     case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
       def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
       // local[*, M] means the number of cores on the computer with M failures
       // local[N, M] means exactly N threads with M failures
       val threadCount = if (threads == "*") localCpuCount else threads.toInt
       val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
       val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
       scheduler.initialize(backend)
       (backend, scheduler)

     case SPARK_REGEX(sparkUrl) =>
       val scheduler = new TaskSchedulerImpl(sc)
       val masterUrls = sparkUrl.split(",").map("spark://" + _)
       val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
       scheduler.initialize(backend)
       (backend, scheduler)

     case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
       // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
       val memoryPerSlaveInt = memoryPerSlave.toInt
       if (sc.executorMemory > memoryPerSlaveInt) {
         throw new SparkException(
           "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
             memoryPerSlaveInt, sc.executorMemory))
       }

       val scheduler = new TaskSchedulerImpl(sc)
       val localCluster = new LocalSparkCluster(
         numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
       val masterUrls = localCluster.start()
       val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
       scheduler.initialize(backend)
       backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
         localCluster.stop()
       }
       (backend, scheduler)

     case masterUrl =>
       val cm = getClusterManager(masterUrl) match {
         case Some(clusterMgr) => clusterMgr
         case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
       }
       try {
         val scheduler = cm.createTaskScheduler(sc, masterUrl)
         val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
         cm.initialize(scheduler, backend)
         (backend, scheduler)
       } catch {
         case se: SparkException => throw se
         case NonFatal(e) =>
           throw new SparkException("External scheduler cannot be instantiated", e)
       }
   }
 }
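
To make the match cases above concrete, here is a small illustrative snippet listing the master URL strings each branch accepts (the application name is a placeholder; only one SparkContext can be active per JVM, so a single local context is built):

 // Illustrative master URLs and the createTaskScheduler branch each one hits.
 // "local"                     -> case "local": 1 thread, MAX_LOCAL_TASK_FAILURES = 1
 // "local[4]" / "local[*]"     -> LOCAL_N_REGEX: 4 threads / one thread per core
 // "local[4, 3]"               -> LOCAL_N_FAILURES_REGEX: 4 threads, maxFailures = 3
 // "local-cluster[2, 1, 1024]" -> LOCAL_CLUSTER_REGEX: 2 workers, 1 core and 1024 MB each
 // "spark://host:7077"         -> SPARK_REGEX: standalone, StandaloneSchedulerBackend
 // "yarn"                      -> case masterUrl: resolved through getClusterManager
 import org.apache.spark.{SparkConf, SparkContext}

 val sc = new SparkContext(new SparkConf().setAppName("master-url-demo").setMaster("local[2]"))
 sc.stop()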

The implementation pairs differ by master URL: the local modes pair TaskSchedulerImpl with a LocalSchedulerBackend, standalone mode pairs TaskSchedulerImpl with a StandaloneSchedulerBackend, and any other master URL (for example yarn) is handed to a pluggable cluster manager that supplies its own TaskScheduler and SchedulerBackend.
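
The last match case discovers that pluggable cluster manager through java.util.ServiceLoader. The following is a simplified sketch of the idea only, not the verbatim Spark source; the trait name DemoClusterManager and the function findClusterManager are illustrative stand-ins for Spark's private ExternalClusterManager machinery:

 import java.util.ServiceLoader
 import scala.collection.JavaConverters._

 // A manager plug-in declares which master URLs it can handle.
 trait DemoClusterManager {
   def canCreate(masterURL: String): Boolean
 }

 // Look up all registered implementations and keep the one that accepts the URL.
 def findClusterManager(masterURL: String): Option[DemoClusterManager] = {
   val registered = ServiceLoader.load(classOf[DemoClusterManager]).asScala
   registered.filter(_.canCreate(masterURL)).toList match {
     case Nil            => None
     case cm :: Nil      => Some(cm)
     case _              =>
       throw new IllegalStateException(s"Multiple cluster managers registered for $masterURL")
   }
 }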

That covers instantiation. The second half of this post focuses on how the TaskScheduler is started in yarn-client mode.

Starting the TaskScheduler in yarn-client mode

Initializing the scheduling pools

In yarn-client mode, the TaskScheduler implementation is org.apache.spark.scheduler.cluster.YarnScheduler and the SchedulerBackend implementation is org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.
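
How these particular classes get picked is decided by the YARN cluster manager plug-in based on the deploy mode. The following is a condensed sketch of that dispatch, simplified from the Spark YARN module rather than quoted verbatim:

 // Simplified sketch of the YARN ExternalClusterManager's dispatch on deploy mode.
 private[spark] class YarnClusterManager extends ExternalClusterManager {

   override def canCreate(masterURL: String): Boolean = masterURL == "yarn"

   override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
     sc.deployMode match {
       case "cluster" => new YarnClusterScheduler(sc)
       case "client"  => new YarnScheduler(sc)
       case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
     }

   override def createSchedulerBackend(
       sc: SparkContext,
       masterURL: String,
       scheduler: TaskScheduler): SchedulerBackend =
     sc.deployMode match {
       case "cluster" =>
         new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
       case "client" =>
         new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
       case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
     }

   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
     scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
 }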

In the org.apache.spark.SparkContext#createTaskScheduler method there is the following call:

 case masterUrl =>
   val cm = getClusterManager(masterUrl) match {
     case Some(clusterMgr) => clusterMgr
     case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
   }
   try {
     val scheduler = cm.createTaskScheduler(sc, masterUrl)
     val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
     cm.initialize(scheduler, backend)
     (backend, scheduler)
   } catch {
     case se: SparkException => throw se
     case NonFatal(e) =>
       throw new SparkException("External scheduler cannot be instantiated", e)
   }

Here, the cm in cm.initialize(scheduler, backend) is org.apache.spark.scheduler.cluster.YarnClusterManager; as noted above, the TaskScheduler is a YarnScheduler and the SchedulerBackend is a YarnClientSchedulerBackend. YarnClusterManager's initialize method is implemented as follows:

 override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
   scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
 }

YarnScheduler does not override initialize itself, so the implementation inherited from its parent class TaskSchedulerImpl is what runs:

 def initialize(backend: SchedulerBackend) {
   this.backend = backend
   schedulableBuilder = {
     schedulingMode match {
       case SchedulingMode.FIFO =>
         new FIFOSchedulableBuilder(rootPool)
       case SchedulingMode.FAIR =>
         new FairSchedulableBuilder(rootPool, conf)
       case _ =>
         throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
           s"$schedulingMode")
     }
   }
   schedulableBuilder.buildPools()
 }

As you can see, its main job is to give the TaskScheduler a reference to its SchedulerBackend and then to build the scheduling pools.

There are two scheduling modes, FIFO and FAIR. FIFO is the default, and the mode can be changed with the spark.scheduler.mode property. The Pool objects are created via the builder pattern.

org.apache.spark.scheduler.FIFOSchedulableBuilder#buildPools is an empty implementation that does nothing. org.apache.spark.scheduler.FairSchedulableBuilder#buildPools, by contrast, loads the scheduling allocation file: the file can be set with the spark.scheduler.allocation.file property; if it is not set, the default fairscheduler.xml on the classpath is tried, and if that is missing too, no file is loaded. For every pool defined in the configuration, a Pool is built and added to the root pool; finally, the default pool is created and added to the root pool as well.
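
A minimal sketch of using these settings from application code, assuming a pools file at /path/to/fairscheduler.xml that defines a pool named "production" (the file path, pool name, and application name are placeholders):

 import org.apache.spark.{SparkConf, SparkContext}

 val conf = new SparkConf()
   .setAppName("fair-pools-demo")
   .setMaster("local[2]")
   // Switch from the default FIFO mode to FAIR so FairSchedulableBuilder is used.
   .set("spark.scheduler.mode", "FAIR")
   // Optional: point FairSchedulableBuilder at an explicit pools file instead of the
   // default fairscheduler.xml on the classpath.
   .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

 val sc = new SparkContext(conf)

 // Jobs submitted from this thread go into the "production" pool; threads that do not set
 // the property fall back to the default pool created in buildPools().
 sc.setLocalProperty("spark.scheduler.pool", "production")
 sc.parallelize(1 to 100).count()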

Setting the taskScheduler reference in HeartbeatReceiver

 _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

_heartbeatReceiver is an RpcEndpointRef, so the request ultimately reaches the HeartbeatReceiver endpoint and is handled by the org.apache.spark.HeartbeatReceiver#receiveAndReply method:


 case TaskSchedulerIsSet =>
   scheduler = sc.taskScheduler
   context.reply(true)

The details of the RPC machinery will be covered in a dedicated article later on, so they are not elaborated here.

Starting the TaskScheduler

The org.apache.spark.SparkContext initialization code starts the TaskScheduler with:

 _taskScheduler.start()

In yarn-client mode this invokes the start method of org.apache.spark.scheduler.cluster.YarnScheduler, which reuses the implementation of its parent class TaskSchedulerImpl:

 override def start() {
   // 1. Start the task scheduler backend
   backend.start()
   // 2. Schedule the speculation check as a fixed-delay timer task
   if (!isLocal && conf.getBoolean("spark.speculation", false)) {
     logInfo("Starting speculative execution thread")
     speculationScheduler.scheduleWithFixedDelay(new Runnable {
       override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
         checkSpeculatableTasks()
       }
     }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
   }
 }

Step 1, starting the scheduler backend: org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend#start looks like this:

 /**
  * Create a Yarn client to submit an application to the ResourceManager.
  * This waits until the application is running.
  */
 override def start() {
   // 1. Get the driver's host and port
   val driverHost = conf.get("spark.driver.host")
   val driverPort = conf.get("spark.driver.port")
   val hostport = driverHost + ":" + driverPort
   // 2. Record the driver's web UI address
   sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }

   val argsArrayBuf = new ArrayBuffer[String]()
   argsArrayBuf += ("--arg", hostport)

   logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
   val args = new ClientArguments(argsArrayBuf.toArray)
   totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
   // 3. Create the client used to submit the application (discussed in more detail below)
   client = new Client(args, conf)
   // 4. Submit the application and bind the returned application id to YARN
   bindToYarn(client.submitApplication(), None)

   // SPARK-8687: Ensure all necessary properties have already been set before
   // we initialize our driver scheduler backend, which serves these properties
   // to the executors
   super.start()
   // 5. Wait until the YARN application is running (fail if it was killed, finished, etc.)
   waitForApplication()
   // 6. Start the monitoring thread
   monitorThread = asyncMonitorApplication()
   monitorThread.start()
 }

Step 3 deserves a closer look. (Note: the Client constructed in YarnClientSchedulerBackend is org.apache.spark.deploy.yarn.Client, which submits the application to the YARN ResourceManager; the source quoted below is the standalone deploy client, org.apache.spark.deploy.Client, and illustrates how a driver-side client bootstraps its RPC environment and endpoints.)

 object Client {
   def main(args: Array[String]) {
     // scalastyle:off println
     if (!sys.props.contains("SPARK_SUBMIT")) {
       println("WARNING: This client is deprecated and will be removed in a future version of Spark")
       println("Use ./bin/spark-submit with \"--master spark://host:port\"")
     }
     // scalastyle:on println
     new ClientApp().start(args, new SparkConf())
   }
 }

 private[spark] class ClientApp extends SparkApplication {

   override def start(args: Array[String], conf: SparkConf): Unit = {
     val driverArgs = new ClientArguments(args)

     if (!conf.contains("spark.rpc.askTimeout")) {
       conf.set("spark.rpc.askTimeout", "10s")
     }
     Logger.getRootLogger.setLevel(driverArgs.logLevel)

     val rpcEnv =
       RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

     val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
       map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
     rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

     rpcEnv.awaitTermination()
   }
 }

As you can see, the Client main method constructs a ClientApp and calls its start method. start first parses the driver arguments, then creates the driver-side RPC environment, builds an endpoint ref for each master parsed from the arguments, and finally registers the client endpoint and obtains its endpoint ref.

Periodically checking for speculative tasks

Now back to step 2 of the start method used by org.apache.spark.scheduler.cluster.YarnScheduler. The speculative execution feature is disabled by default: if many tasks are running slowly, speculation launches a duplicate copy of each of them, which can consume all of the available resources and have an unpredictable impact on the cluster and on the other jobs submitted to it. When it is enabled, a fixed-delay timer periodically executes the checkSpeculatableTasks method, shown below:

 // Check for speculatable tasks in all our active jobs.
 def checkSpeculatableTasks() {
   var shouldRevive = false
   synchronized {
     // 1. Decide whether any running task should be speculated on
     shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
   }
   if (shouldRevive) {
     // 2. Make resource offers so a speculative copy can be launched
     backend.reviveOffers()
   }
 }

In step 1, checkSpeculatableTasks has two implementations, one on Pool and one on TaskSetManager; a Pool recursively asks its child pools (and ultimately the TaskSetManagers) for speculatable tasks. If speculation is warranted, step 2 calls the scheduler backend's reviveOffers method, whose rough idea is to gather the idle resources on the executors and offer them so that a speculative copy of the slow task can be launched.
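
If speculation is wanted despite those caveats, it is switched on and tuned purely through configuration. A minimal sketch follows; the numeric values are illustrative, and note that the !isLocal check in start() above means the timer only runs against a real cluster, not with a local master:

 import org.apache.spark.SparkConf

 val conf = new SparkConf()
   .set("spark.speculation", "true")            // enables the checkSpeculatableTasks timer
   .set("spark.speculation.interval", "100ms")  // how often to check (SPECULATION_INTERVAL_MS)
   .set("spark.speculation.multiplier", "1.5")  // a task counts as slow if 1.5x the median duration
   .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish before checking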

To summarize, this post traced how the TaskScheduler is created and started during SparkContext initialization, using yarn-client mode as the example.

The RpcEnv details were deliberately skipped here; the next post will cover the overall architecture of Spark's built-in RPC mechanism and how it works.