spark 1.6.0 core源码分析3 Master HA

时间:2022-10-23 16:23:48

在Master启动过程中,首先调用了 netty on Start方法。

  override def onStart(): Unit = {
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
webUi = new MasterWebUI(this, webUiPort)
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort //这里会启一个定时调度,检查timeout的worker进程。如果有worker超时,则将状态置为DEAD,并清理一些内存中关于该worker的信息。如果该worker中有Executor进程,则向driver发送ExecutorUpdated消息,表明该Executor也已经不可用了。如果该worker中有Driver进程,且配置driver是可以relaunch的,则重新调度在可用的worker节点上启动,不然的话就删除该Driver的内存信息。只有在该worker超时很多次之后,才真正删除,之前其实只是让该worker不被选中执行任务而已。
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 6066)
restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
}
restServerBoundPort = restServer.map(_.start())

masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
// Attach the master and app metrics servlet handler to the web ui after the metrics systems are
// started.
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

val serializer = new JavaSerializer(conf) //master ha 过程
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
}


上面的persistenceEngine_封装了在zk中读写元数据信息,以及序列化反序列化的接口

leaderElectionAgent_封装了master的选举过程,见下面代码注释中的解释

[java] view plain copy 
  
  
  1. private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,  
  2.     conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging  {  
  3.   
  4.   //依赖zk中的一个节点来判断选主  
  5.   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir""/spark") + "/leader_election"  
  6.   
  7.   private var zk: CuratorFramework = _  
  8.   private var leaderLatch: LeaderLatch = _  
  9.   private var status = LeadershipStatus.NOT_LEADER  
  10.   
  11.   //构造这个对象之后就调用了start方法  
  12.   start()  
  13.   //leaderLatch.start()一旦调用,LeaderLatch会和其它使用相同latch path的其它LeaderLatch交涉,然后随机的选择其中一个作为leader  
  14.   private def start() {  
  15.     logInfo("Starting ZooKeeper LeaderElection agent")  
  16.     zk = SparkCuratorUtil.newClient(conf)  
  17.     leaderLatch = new LeaderLatch(zk, WORKING_DIR)  
  18.     leaderLatch.addListener(this)  
  19.     leaderLatch.start()  
  20.   }  
  21.   
  22.   override def stop() {  
  23.     leaderLatch.close()  
  24.     zk.close()  
  25.   }  
  26.   
  27.   //当一个master被选为主时,isLeader方法被回调,说明在这一轮选举中胜出  
  28.   override def isLeader() {  
  29.     synchronized {  
  30.       // could have lost leadership by now.  
  31.       if (!leaderLatch.hasLeadership) {  
  32.         return  
  33.       }  
  34.   
  35.       logInfo("We have gained leadership")  
  36.       updateLeadershipStatus(true)  
  37.     }  
  38.   }  
  39.   
  40.   //当一个master被选为备时,notLeader方法被回调,说明在这一轮选举中落败  
  41.   override def notLeader() {  
  42.     synchronized {  
  43.       // could have gained leadership by now.  
  44.       if (leaderLatch.hasLeadership) {  
  45.         return  
  46.       }  
  47.   
  48.       logInfo("We have lost leadership")  
  49.       updateLeadershipStatus(false)  
  50.     }  
  51.   }  
  52.   private def updateLeadershipStatus(isLeader: Boolean) {  
  53.     //当一个master之前状态为备,目前被选为主  
  54.     if (isLeader && status == LeadershipStatus.NOT_LEADER) {  
  55.       status = LeadershipStatus.LEADER  
  56.       masterActor.electedLeader()//调用master类的electedLeader方法      
  57.       //当一个master之前状态为主,目前被选为备  
  58.     } else if (!isLeader && status == LeadershipStatus.LEADER) {  
  59.       status = LeadershipStatus.NOT_LEADER  
  60.       masterActor.revokedLeadership()//调用master类的revokedLeadership方法      
  61.     }  
  62.   }  
  63.   
  64.   private object LeadershipStatus extends Enumeration {  
  65.     type LeadershipStatus = Value  
  66.     val LEADER, NOT_LEADER = Value  
  67.   }  
  68. }  
继续查看master中的逻辑
[java] view plain copy 
   
   
  1. override def receiveWithLogging: PartialFunction[Any, Unit] = {  
  2.   case ElectedLeader => {  
  3.     //既然之前是备,现在想变成主,就需要读取zk中的必要的信息来构造元数据  
  4.     val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()  
  5.     state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {  
  6.       RecoveryState.ALIVE//如果没有任何元数据需要构造,则直接置为alive状态  
  7.     } else {  
  8.       RecoveryState.RECOVERING//不然需要置为恢复中  
  9.     }  
  10.     logInfo("I have been elected leader! New state: " + state)  
  11.     if (state == RecoveryState.RECOVERING) {  
  12.       beginRecovery(storedApps, storedDrivers, storedWorkers)//见下面介绍  
  13.       recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,  
  14.         CompleteRecovery)  
  15.     }  
  16.   }  
  17.   
  18.   case CompleteRecovery => completeRecovery()  
  19.   
  20.   //之前是主,现在被置为备了,不需要额外操作,退出即可  
  21.   case RevokedLeadership => {  
  22.     logError("Leadership has been revoked -- master shutting down.")  
  23.     System.exit(0)  
  24.   }  
开始恢复
[java] view plain copy 
   
   
  1. private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],  
  2.     storedWorkers: Seq[WorkerInfo]) {  
  3.   for (app <- storedApps) {  
  4.     logInfo("Trying to recover app: " + app.id)  
  5.     try {  
  6.       registerApplication(app)//将读到的app加载到内存  
  7.       app.state = ApplicationState.UNKNOWN//状态置为unknown  
  8.       app.driver.send(MasterChanged(self, masterWebUiUrl))//向driver发送MasterChanged消息  
  9.     } catch {  
  10.       case e: Exception => logInfo("App " + app.id + " had exception on reconnect")  
  11.     }  
  12.   }  
  13.   
  14.   for (driver <- storedDrivers) {  
  15.     // Here we just read in the list of drivers. Any drivers associated with now-lost workers  
  16.     // will be re-launched when we detect that the worker is missing.  
  17.     drivers += driver//将读到的driver加载到内存  
  18.   }  
  19.   
  20.   for (worker <- storedWorkers) {  
  21.     logInfo("Trying to recover worker: " + worker.id)  
  22.     try {  
  23.       registerWorker(worker)//将读到的worker信息加载到内存  
  24.       worker.state = WorkerState.UNKNOWN//同样状态需要置为unknown,需要等到worker发送消息过来之后才能认为该worker是可用的  
  25.       wworker.endpoint.send(MasterChanged(self, masterWebUiUrl))//向worker发送MasterChanged消息  
  26.     } catch {  
  27.       case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")  
  28.     }  
  29.   }  
  30. }  
看driver端收到MasterChanged消息会发生什么?在AppClient.scala中
只有主master会发送MasterChanged消息,所以这里的masterUrl肯定是新的主master的
[java] view plain copy 
    
    
  1. case MasterChanged(masterUrl, masterWebUiUrl) =>  
  2.   logInfo("Master has changed, new master is at " + masterUrl)  
  3.   //收到这个消息之后,driver需要修改之前保存的master信息,用于之后向新的master通信  
  4.   changeMaster(masterUrl)  
  5.   alreadyDisconnected = false  
  6.   sender ! MasterChangeAcknowledged(appId)//向master反馈MasterChangeAcknowledged消息  
master这时会收到所有app中driver发来的消息,我们看master收到MasterChangeAcknowledged消息的处理方式,参数为appId
[java] view plain copy 
     
     
  1. case MasterChangeAcknowledged(appId) => {  
  2.   idToApp.get(appId) match {  
  3.     case Some(app) =>  
  4.       logInfo("Application has been re-registered: " + appId)  
  5.       app.state = ApplicationState.WAITING  //收到消息后将app状态置为WAITING  
  6.     case None =>  
  7.       logWarning("Master change ack from unknown app: " + appId)  
  8.   }  
  9.   
  10.   if (canCompleteRecovery) { completeRecovery() }  //这个只是优先判断消息处理是否都结束了,这样就不用等待worker_timeout的时间间隔再调用completeRecovery了  
  11. }  
看worker端收到MasterChanged消息会发生什么?在Worker.scala中
[java] view plain copy 
       
       
  1. case MasterChanged(masterUrl, masterWebUiUrl) =>  
  2.   logInfo("Master has changed, new master is at " + masterUrl)  
  3.   changeMaster(masterUrl, masterWebUiUrl)//同上  
  4.   
  5. //master不与Executor交互,所以需要worker来告诉master关于Executor的信息  
  6. val execs = executors.values.  
  7.     map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))  
  8.   sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)  
继续看master中的处理逻辑
[java] view plain copy 
       
       
  1. case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {  
  2.   idToWorker.get(workerId) match {  
  3.     case Some(worker) =>  
  4.       logInfo("Worker has been re-registered: " + workerId)  
  5.       worker.state = WorkerState.ALIVE //这时可以将之前worker状态unknown修改为ALIVE,代表该worker可用  
  6.   
  7.       //将接受到的Executor信息更新到相关的app,worker中  
  8.       val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)  
  9.       for (exec <- validExecutors) {  
  10.         val app = idToApp.get(exec.appId).get  
  11.         val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))  
  12.         worker.addExecutor(execInfo)  
  13.         execInfo.copyState(exec)  
  14.       }  
  15.   
  16.       //将master中driver信息更新,状态置为RUNNING  
  17.       for (driverId <- driverIds) {  
  18.         drivers.find(_.id == driverId).foreach { driver =>  
  19.           driver.worker = Some(worker)  
  20.           driver.state = DriverState.RUNNING  
  21.           worker.drivers(driverId) = driver  
  22.         }  
  23.       }  
  24.     case None =>  
  25.       logWarning("Scheduler state from unknown worker: " + workerId)  
  26.   }  
  27.   
  28.   if (canCompleteRecovery) { completeRecovery() }  //同上  
  29. }  
这一切都处理完毕之后,看master的completeRecovery,这个是在beginRecovery调用之后,在延迟worker_timeout时间之后调用,一般情况下,上面的消息来回发送处理应该都已经结束了
[java] view plain copy 
       
       
  1. private def completeRecovery() {  
  2.   // Ensure "only-once" recovery semantics using a short synchronization period.  
  3.   synchronized {  
  4.     if (state != RecoveryState.RECOVERING) { return }  
  5.     state = RecoveryState.COMPLETING_RECOVERY//状态置为恢复完成  
  6.   }  
  7.   
  8.   // Kill off any workers and apps that didn't respond to us.  
  9.   //清理在这个worker_timeout间隔过后还未处理成功的worker和app  
  10.   workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)  
  11.   apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)  
  12.   
  13.   // Reschedule drivers which were not claimed by any workers  
  14.   //在一番消息通信之后,本应该在driver中更新的worker信息不见了,则重启driver或者删除  
  15.   drivers.filter(_.worker.isEmpty).foreach { d =>  
  16.     logWarning(s"Driver ${d.id} was not found after master recovery")  
  17.     if (d.desc.supervise) {  
  18.       logWarning(s"Re-launching ${d.id}")  
  19.       relaunchDriver(d)  
  20.     } else {  
  21.       removeDriver(d.id, DriverState.ERROR, None)  
  22.       logWarning(s"Did not re-launch ${d.id} because it was not supervised")  
  23.     }  
  24.   }  
  25.   
  26.   state = RecoveryState.ALIVE  //这时恢复状态真正结束了  
  27.   schedule() //整个选主流程结束时候,重新调度一次  
  28.   logInfo("Recovery complete - resuming operations!")  
  29. }