Learning Spark, Part 1: The Master Startup Flow

Date: 2024-03-09 07:32:17

1. Startup script

sbin/start-master.sh

"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
Parameters:

(1)SPARK_MASTER_IP

(2)SPARK_MASTER_PORT

(3)SPARK_MASTER_WEBUI_PORT

The Master class is ultimately launched through the bin/spark-class script.

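The argument "1" is the master instance number. It is used only when naming the generated log and pid files and is not passed to the Master class.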

  1. spark-xxx-org.apache.spark.deploy.master.Master-1-CentOS-01.out
  2. spark-xxx-org.apache.spark.deploy.master.Master-1.pid
The "1" in "Master-1" is that master instance number.
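
The naming pattern visible in the two file names above can be sketched as follows. This is only an illustration: the object name, the function names, and the parameter names are hypothetical, and the pattern is inferred from the example file names rather than copied from spark-daemon.sh.

```scala
object DaemonFileNames {
  // Hypothetical helper: builds a log file name following the pattern
  // spark-<ident>-<command>-<instance>-<hostname>.out seen above.
  def logFile(ident: String, command: String, instance: Int, hostname: String): String =
    s"spark-$ident-$command-$instance-$hostname.out"

  // Hypothetical helper: builds a pid file name following the pattern
  // spark-<ident>-<command>-<instance>.pid seen above.
  def pidFile(ident: String, command: String, instance: Int): String =
    s"spark-$ident-$command-$instance.pid"
}
```

Calling `DaemonFileNames.pidFile("xxx", "org.apache.spark.deploy.master.Master", 1)` yields `spark-xxx-org.apache.spark.deploy.master.Master-1.pid`, which shows that the instance number only affects file names.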

2. Master.main

    def main(argStrings: Array[String]) {
      SignalLogger.register(log)
      val conf = new SparkConf
      val args = new MasterArguments(argStrings, conf)
      val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
      actorSystem.awaitTermination()
    }
Responsibilities of main:

(1) Create the MasterArguments object and initialize its members;

(2) Call startSystemAndActor, which creates the ActorSystem and starts the Master actor.

2.1. MasterArguments

      parse(args.toList)
      // This mutates the SparkConf, so all accesses to it must be made after this line
      propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

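(1) parse handles the command-line arguments passed in by the startup script;

(2) loadDefaultSparkProperties loads Spark runtime properties from a properties file; the default file is spark-defaults.conf.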
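
Parsing of the three options passed by the startup script can be sketched with recursive pattern matching on the argument list. This is a minimal sketch, not the actual MasterArguments code: `MasterArgs`, its default values, and the error handling are assumptions made for illustration.

```scala
import scala.annotation.tailrec

object ArgParseSketch {
  // Hypothetical, simplified stand-in for MasterArguments; the defaults are assumptions.
  final case class MasterArgs(host: String = "localhost", port: Int = 7077, webUiPort: Int = 8080)

  // Consume the list two tokens at a time (option name, then its value)
  // until nothing is left; reject anything that is not a known option.
  @tailrec
  def parse(args: List[String], acc: MasterArgs = MasterArgs()): MasterArgs = args match {
    case "--ip" :: value :: tail         => parse(tail, acc.copy(host = value))
    case "--port" :: value :: tail       => parse(tail, acc.copy(port = value.toInt))
    case "--webui-port" :: value :: tail => parse(tail, acc.copy(webUiPort = value.toInt))
    case Nil                             => acc
    case other :: _ =>
      throw new IllegalArgumentException(s"Unrecognized argument: $other")
  }
}
```

Options that are not supplied keep their defaults, which mirrors the order described above: command-line parsing first, then property loading for anything still unset.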

2.2. startSystemAndActor

        val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
          securityManager = securityMgr)
        val actor = actorSystem.actorOf(
          Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)

(1) Create the ActorSystem through AkkaUtils.createActorSystem;

(2) Create and start the Master actor.

3. Master Actor

3.1. Key data members

      val workers = new HashSet[WorkerInfo]
      val idToWorker = new HashMap[String, WorkerInfo]
      val addressToWorker = new HashMap[Address, WorkerInfo]
      val apps = new HashSet[ApplicationInfo]
      val idToApp = new HashMap[String, ApplicationInfo]
      val actorToApp = new HashMap[ActorRef, ApplicationInfo]
      val addressToApp = new HashMap[Address, ApplicationInfo]
      val waitingApps = new ArrayBuffer[ApplicationInfo]
      val completedApps = new ArrayBuffer[ApplicationInfo]
      var nextAppNumber = 0
      val appIdToUI = new HashMap[String, SparkUI]
      val drivers = new HashSet[DriverInfo]
      val completedDrivers = new ArrayBuffer[DriverInfo]
      val waitingDrivers = new ArrayBuffer[DriverInfo] // Drivers currently spooled for scheduling

3.2. Master.preStart

        // Listen for remote client disconnection events, since they don't go through Akka's watch()
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

The Master subscribes to RemotingLifecycleEvent, which is a trait:

    sealed trait RemotingLifecycleEvent extends Serializable {
      def logLevel: Logging.LogLevel
    }

Of these events, the Master handles only DisassociatedEvent.

        context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)

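This starts a timer that checks for Worker timeouts: using the Worker timeout as its period, it sends a CheckForWorkerTimeOut message to the Master. The default timeout is 60 seconds and can be changed through the spark.worker.timeout property.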

        val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
          case "ZOOKEEPER" =>
            logInfo("Persisting recovery state to ZooKeeper")
            val zkFactory =
              new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
            (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
          case "FILESYSTEM" =>
            val fsFactory =
              new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
            (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
          case "CUSTOM" =>
            val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
            val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
              .newInstance(conf, SerializationExtension(context.system))
              .asInstanceOf[StandaloneRecoveryModeFactory]
            (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
          case _ =>
            (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
        }

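The persistence engine and the leader election agent are created according to RECOVERY_MODE. RECOVERY_MODE defaults to NONE and is configured through spark.deploy.recoveryMode.

Assume RECOVERY_MODE is NONE:

(1) A BlackHolePersistenceEngine is created, which performs no persistence at all;

(2) A MonarchyLeaderAgent is created, whose primary constructor sends an ElectedLeader message to the Master.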
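
Why merely constructing the agent is enough to trigger the election can be seen in a small sketch. In Scala, statements in a class body form the primary constructor, so they run at instantiation. All names below (`LeaderElectable`, `MonarchyAgent`, `RecordingMaster`) are hypothetical stand-ins, and a direct method call replaces the actor message.

```scala
object MonarchySketch {
  // Hypothetical callback interface standing in for the Master.
  trait LeaderElectable {
    def electedLeader(): Unit
  }

  // The class body is the primary constructor, so the notification
  // happens as soon as the agent is instantiated: a single-node "monarchy".
  class MonarchyAgent(master: LeaderElectable) {
    master.electedLeader()
  }

  // Hypothetical master that only records whether it has been elected.
  class RecordingMaster extends LeaderElectable {
    var elected: Boolean = false
    def electedLeader(): Unit = { elected = true }
  }
}
```

After `val m = new MonarchySketch.RecordingMaster; new MonarchySketch.MonarchyAgent(m)`, the flag `m.elected` is true without any further call.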

3.3. Master message handling

3.3.1. The ElectedLeader message

        case ElectedLeader => {
          val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
          state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
            RecoveryState.ALIVE
          } else {
            RecoveryState.RECOVERING
          }
          logInfo("I have been elected leader! New state: " + state)
          if (state == RecoveryState.RECOVERING) {
            beginRecovery(storedApps, storedDrivers, storedWorkers)
            recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
              CompleteRecovery)
          }
        }

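Since RECOVERY_MODE was assumed to be NONE, the persistence engine returns no stored apps, drivers, or workers, so no recovery is performed and state is set directly to RecoveryState.ALIVE.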
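
The state decision in the handler above reduces to a single predicate over the persisted data. The following is a minimal sketch under simplifying assumptions: the persisted entries are represented as plain strings, and the `RecoveryState` type here is a local, hypothetical stand-in for the real one.

```scala
object ElectedLeaderSketch {
  // Hypothetical local stand-in for the two relevant recovery states.
  sealed trait RecoveryState
  case object Alive extends RecoveryState
  case object Recovering extends RecoveryState

  // The master goes straight to Alive only when nothing at all was persisted;
  // any stored app, driver, or worker forces a recovery phase first.
  def initialState(
      storedApps: Seq[String],
      storedDrivers: Seq[String],
      storedWorkers: Seq[String]): RecoveryState =
    if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) Alive
    else Recovering
}
```

With a persistence engine that stores nothing, all three sequences are empty and the result is always `Alive`.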

3.3.2. The CheckForWorkerTimeOut message

        case CheckForWorkerTimeOut => {
          timeOutDeadWorkers()
        }

This checks for Worker nodes that have timed out. The Worker timeout defaults to 60 seconds and is configured through the spark.worker.timeout property.
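
The body of timeOutDeadWorkers is not shown in this article. A heartbeat-based check of this kind can be sketched as below, assuming each worker record carries the timestamp of its last heartbeat; the `Worker` case class and the function name are hypothetical.

```scala
object WorkerTimeoutSketch {
  // Hypothetical, simplified worker record: only the fields needed for the check.
  final case class Worker(id: String, lastHeartbeat: Long)

  // A worker counts as timed out when its last heartbeat is older than
  // (now - timeout). All values are in milliseconds.
  def timedOut(workers: Iterable[Worker], nowMs: Long, timeoutMs: Long): List[Worker] =
    workers.filter(_.lastHeartbeat < nowMs - timeoutMs).toList
}
```

Passing the current time in as a parameter, rather than reading the clock inside the function, keeps the check deterministic and easy to test.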

3.3.3. The DisassociatedEvent message

        case DisassociatedEvent(_, address, _) => {
          // The disconnected client could've been either a worker or an app; remove whichever it was
          logInfo(s"$address got disassociated, removing it.")
          addressToWorker.get(address).foreach(removeWorker)
          addressToApp.get(address).foreach(finishApplication)
          if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
        }

3.3.4. The RegisterWorker message

This is the registration message exchanged between a Worker and the Master.

            val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
              sender, workerUiPort, publicAddress)
            if (registerWorker(worker)) {
              persistenceEngine.addWorker(worker)
              sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
              schedule()
            }

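(1) Create a WorkerInfo object;

(2) Call registerWorker to record the Worker's information;

(3) Send a RegisteredWorker message back to the Worker;

(4) Call schedule, which is responsible for allocating resources to Drivers and Apps.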

3.3.4.1. WorkerInfo

    private[spark] class WorkerInfo(
        val id: String,
        val host: String,
        val port: Int,
        val cores: Int,
        val memory: Int,
        val actor: ActorRef,
        val webUiPort: Int,
        val publicAddress: String)
      extends Serializable {
      ...
      init()
      ...
      private def init() {
        executors = new mutable.HashMap
        drivers = new mutable.HashMap
        state = WorkerState.ALIVE
        coresUsed = 0
        memoryUsed = 0
        lastHeartbeat = System.currentTimeMillis()
      }

Constructing a WorkerInfo object calls init to initialize its state.

3.3.4.2. Master.registerWorker

        workers.filter { w =>
          (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
        }.foreach { w =>
          workers -= w
        }

Remove any WorkerInfo with the same host and port whose state is DEAD.

        val workerAddress = worker.actor.path.address
        if (addressToWorker.contains(workerAddress)) {
          val oldWorker = addressToWorker(workerAddress)
          if (oldWorker.state == WorkerState.UNKNOWN) {
            // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
            // The old worker must thus be dead, so we will remove it and accept the new worker.
            removeWorker(oldWorker)
          } else {
            logInfo("Attempted to re-register worker at same address: " + workerAddress)
            return false
          }
        }
        workers += worker
        idToWorker(worker.id) = worker
        addressToWorker(workerAddress) = worker

Record the WorkerInfo in workers, idToWorker, and addressToWorker.
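
The two registerWorker excerpts above can be combined into one actor-free sketch. It rests on several assumptions: the `Worker` and `Registry` classes are hypothetical, addresses are plain "host:port" strings instead of Akka addresses, and `remove` is only a guess at the effect of removeWorker (mark the entry DEAD and drop it from the lookup maps).

```scala
import scala.collection.mutable

object RegisterWorkerSketch {
  sealed trait State
  case object Alive extends State
  case object Dead extends State
  case object Unknown extends State

  // Hypothetical, simplified worker record.
  final class Worker(val id: String, val host: String, val port: Int, var state: State = Alive) {
    def address: String = s"$host:$port"
  }

  class Registry {
    val workers = mutable.HashSet.empty[Worker]
    val idToWorker = mutable.HashMap.empty[String, Worker]
    val addressToWorker = mutable.HashMap.empty[String, Worker]

    // Assumed behaviour of removeWorker: mark DEAD and drop from the lookup maps.
    private def remove(w: Worker): Unit = {
      w.state = Dead
      idToWorker -= w.id
      addressToWorker -= w.address
    }

    def register(worker: Worker): Boolean = {
      // Step 1: purge DEAD entries that share the new worker's host and port.
      workers
        .filter(w => w.host == worker.host && w.port == worker.port && w.state == Dead)
        .foreach(workers -= _)

      // Step 2: a live entry at the same address blocks registration;
      // an UNKNOWN one is treated as a restarted worker and replaced.
      val accepted = addressToWorker.get(worker.address) match {
        case Some(old) if old.state == Unknown =>
          remove(old)
          true
        case Some(_) => false
        case None    => true
      }

      // Step 3: record the worker in all three collections.
      if (accepted) {
        workers += worker
        idToWorker(worker.id) = worker
        addressToWorker(worker.address) = worker
      }
      accepted
    }
  }
}
```

Keeping three collections in sync trades some bookkeeping for constant-time lookup by id and by address, which the message handlers shown earlier rely on.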

4. Startup complete

At this point the startup process is finished.

The Master now waits for message requests from Workers and Drivers.