Spark Storage: SparkEnv

Date: 2023-03-08 17:02:41

This post looks at Spark's storage module; it is a write-up of what I have read and worked out so far, to be reorganized later.

OK, first let's look at the SparkEnv-related code in SparkContext:

    private[spark] def createSparkEnv(
        conf: SparkConf,
        isLocal: Boolean,
        listenerBus: LiveListenerBus): SparkEnv = {
      SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
    }

    private[spark] def env: SparkEnv = _env


SparkContext calls createDriverEnv on the SparkEnv object to create the SparkEnv. Starting from this entry point, let's see what SparkEnv does:

    /**
     * Create a SparkEnv for the driver.
     */
    private[spark] def createDriverEnv(
        conf: SparkConf,
        isLocal: Boolean,
        listenerBus: LiveListenerBus,
        mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
      assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
      assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
      val hostname = conf.get("spark.driver.host")
      val port = conf.get("spark.driver.port").toInt
      create(
        conf,
        SparkContext.DRIVER_IDENTIFIER,
        hostname,
        port,
        isDriver = true,
        isLocal = isLocal,
        listenerBus = listenerBus,
        mockOutputCommitCoordinator = mockOutputCommitCoordinator
      )
    }

It first resolves the driver's host and port, then calls create.

    private def create(
        conf: SparkConf,
        executorId: String,
        hostname: String,
        port: Int,
        isDriver: Boolean,
        isLocal: Boolean,
        listenerBus: LiveListenerBus = null,
        numUsableCores: Int = 0,
        mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

      // Listener bus is only used on the driver
      if (isDriver) {
        assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
      }

      val securityManager = new SecurityManager(conf)

      // Create the ActorSystem for Akka and get the port it binds to.
      val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
      val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager)
      val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem

      // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
      if (isDriver) {
        conf.set("spark.driver.port", rpcEnv.address.port.toString)
      } else {
        conf.set("spark.executor.port", rpcEnv.address.port.toString)
      }

      // Create an instance of the class with the given name, possibly initializing it with our conf
      def instantiateClass[T](className: String): T = {
        val cls = Utils.classForName(className)
        // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
        // SparkConf, then one taking no arguments
        try {
          cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
            .newInstance(conf, new java.lang.Boolean(isDriver))
            .asInstanceOf[T]
        } catch {
          case _: NoSuchMethodException =>
            try {
              cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
            } catch {
              case _: NoSuchMethodException =>
                cls.getConstructor().newInstance().asInstanceOf[T]
            }
        }
      }

      // Create an instance of the class named by the given SparkConf property, or defaultClassName
      // if the property is not set, possibly initializing it with our conf
      def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
        instantiateClass[T](conf.get(propertyName, defaultClassName))
      }

      val serializer = instantiateClassFromConf[Serializer](
        "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
      logDebug(s"Using serializer: ${serializer.getClass}")

      val closureSerializer = instantiateClassFromConf[Serializer](
        "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")

      def registerOrLookupEndpoint(
          name: String, endpointCreator: => RpcEndpoint):
        RpcEndpointRef = {
        if (isDriver) {
          logInfo("Registering " + name)
          rpcEnv.setupEndpoint(name, endpointCreator)
        } else {
          RpcUtils.makeDriverRef(name, conf, rpcEnv)
        }
      }

      val mapOutputTracker = if (isDriver) {
        new MapOutputTrackerMaster(conf)
      } else {
        new MapOutputTrackerWorker(conf)
      }

      // Have to assign trackerActor after initialization as MapOutputTrackerActor
      // requires the MapOutputTracker itself
      mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
        new MapOutputTrackerMasterEndpoint(
          rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

      // Let the user specify short names for shuffle managers
      val shortShuffleMgrNames = Map(
        "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
        "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
        "tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")
      val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
      val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
      val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

      val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores)

      val blockTransferService =
        conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
          case "netty" =>
            new NettyBlockTransferService(conf, securityManager, numUsableCores)
          case "nio" =>
            logWarning("NIO-based block transfer service is deprecated, " +
              "and will be removed in Spark 1.6.0.")
            new NioBlockTransferService(conf, securityManager)
        }

      val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
        BlockManagerMaster.DRIVER_ENDPOINT_NAME,
        new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
        conf, isDriver)

      // NB: blockManager is not valid until initialize() is called later.
      val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
        serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
        numUsableCores)

      val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

      val cacheManager = new CacheManager(blockManager)

      val httpFileServer =
        if (isDriver) {
          val fileServerPort = conf.getInt("spark.fileserver.port", 0)
          val server = new HttpFileServer(conf, securityManager, fileServerPort)
          server.initialize()
          conf.set("spark.fileserver.uri", server.serverUri)
          server
        } else {
          null
        }

      val metricsSystem = if (isDriver) {
        // Don't start metrics system right now for Driver.
        // We need to wait for the task scheduler to give us an app ID.
        // Then we can start the metrics system.
        MetricsSystem.createMetricsSystem("driver", conf, securityManager)
      } else {
        // We need to set the executor ID before the MetricsSystem is created because sources and
        // sinks specified in the metrics configuration file will want to incorporate this executor's
        // ID into the metrics they report.
        conf.set("spark.executor.id", executorId)
        val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
        ms.start()
        ms
      }

      // Set the sparkFiles directory, used when downloading dependencies. In local mode,
      // this is a temporary directory; in distributed mode, this is the executor's current working
      // directory.
      val sparkFilesDir: String = if (isDriver) {
        Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
      } else {
        "."
      }

      val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
        new OutputCommitCoordinator(conf, isDriver)
      }
      val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
        new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
      outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

      val executorMemoryManager: ExecutorMemoryManager = {
        val allocator = if (conf.getBoolean("spark.unsafe.offHeap", false)) {
          MemoryAllocator.UNSAFE
        } else {
          MemoryAllocator.HEAP
        }
        new ExecutorMemoryManager(allocator)
      }

      val envInstance = new SparkEnv(
        executorId,
        rpcEnv,
        serializer,
        closureSerializer,
        cacheManager,
        mapOutputTracker,
        shuffleManager,
        broadcastManager,
        blockTransferService,
        blockManager,
        securityManager,
        httpFileServer,
        sparkFilesDir,
        metricsSystem,
        shuffleMemoryManager,
        executorMemoryManager,
        outputCommitCoordinator,
        conf)

      // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
      // called, and we only need to do it for driver. Because driver may run as a service, and if we
      // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
      if (isDriver) {
        envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
      }

      envInstance
    }

1. Create the ActorSystem and bind its port

Every time a SparkEnv is created, an RpcEnv is created along with it for communication; here it is created with

    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager)

The actor system name is fixed by convention for both the driver and the executor: "sparkDriver" for the driver and "sparkExecutor" for the executor. When creating the RpcEnv, the following is used:

    private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
      // Add more RpcEnv implementations here
      val rpcEnvNames = Map("akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory")
      val rpcEnvName = conf.get("spark.rpc", "akka")
      val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
      Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
    }

    def create(
        name: String,
        host: String,
        port: Int,
        conf: SparkConf,
        securityManager: SecurityManager): RpcEnv = {
      // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
      val config = RpcEnvConfig(conf, name, host, port, securityManager)
      getRpcEnvFactory(conf).create(config)
    }

The default RpcEnvFactory is org.apache.spark.rpc.akka.AkkaRpcEnvFactory; the code is presumably written this way for extensibility (a sketch of swapping in a custom factory follows the listing below). Here is what AkkaRpcEnvFactory looks like:

    private[spark] class AkkaRpcEnvFactory extends RpcEnvFactory {

      def create(config: RpcEnvConfig): RpcEnv = {
        val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
          config.name, config.host, config.port, config.conf, config.securityManager)
        actorSystem.actorOf(Props(classOf[ErrorMonitor]), "ErrorMonitor")
        new AkkaRpcEnv(actorSystem, config.conf, boundPort)
      }
    }

Using the name, hostname, port, conf, and securityManager, it creates an ActorSystem and returns an AkkaRpcEnv. Without digging deeper, this RpcEnv can be treated as a communication module; it is what the driver's and the executors' BlockManagers use to talk to each other.
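
To make the extensibility point above concrete, here is a minimal sketch of how the "spark.rpc" property feeds getRpcEnvFactory: either the built-in short name "akka" or a fully qualified factory class name can be supplied. MyRpcEnvFactory below is hypothetical; it would only have to implement RpcEnvFactory.

    // Default behaviour: the short name "akka" resolves to AkkaRpcEnvFactory.
    val conf = new SparkConf().set("spark.rpc", "akka")

    // Hypothetical custom factory: getRpcEnvFactory falls back to treating the
    // value as a class name, so any RpcEnvFactory implementation can be plugged in.
    conf.set("spark.rpc", "com.example.rpc.MyRpcEnvFactory")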

2. Record the driver's or the executor's RPC port in the conf

3. Create a serializer and a closureSerializer (both of the abstract class Serializer)
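
As a quick illustration of the property that instantiateClassFromConf reads, here is a minimal sketch of switching the default JavaSerializer for Kryo (the app name is just a placeholder):

    val conf = new SparkConf()
      .setAppName("serializer-demo")   // placeholder app name
      // read by instantiateClassFromConf("spark.serializer", ...) in create()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")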

4. Create the MapOutputTracker (a MapOutputTrackerMaster on the driver, a MapOutputTrackerWorker on executors) and register or look up its endpoint

5. Bind short names for the shuffle managers (honestly, going through a Map for this does not feel particularly convenient)

6. Create the shuffle manager (the private[spark] trait ShuffleManager) and the shuffle memory manager, and wire them together
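
A small sketch of how the short-name lookup from create() resolves (the map entries are copied from shortShuffleMgrNames above); a fully qualified class name passes through unchanged:

    val conf = new SparkConf().set("spark.shuffle.manager", "tungsten-sort")
    val shortNames = Map(
      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
      "tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")
    val name = conf.get("spark.shuffle.manager", "sort")
    // Known short names map to the bundled implementations; anything else is
    // treated as a user-supplied fully qualified ShuffleManager class name.
    val shuffleMgrClass = shortNames.getOrElse(name.toLowerCase, name)
    // shuffleMgrClass == "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager"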

7. Initialize everything around the BlockManager; this is the module the driver and executors later use to communicate about storage and to manage those resources.

    def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint):
      RpcEndpointRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
      } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
      }
    }

Looking at this code again: when the SparkEnv is being created for the driver, setupEndpoint registers and starts the actor (here under the name BlockManagerMaster); when it is being created for an executor, it instead goes through

    def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
      val driverActorSystemName = SparkEnv.driverActorSystemName
      val driverHost: String = conf.get("spark.driver.host", "localhost")
      val driverPort: Int = conf.getInt("spark.driver.port", 7077)
      Utils.checkHost(driverHost, "Expected hostname")
      rpcEnv.setupEndpointRef(driverActorSystemName, RpcAddress(driverHost, driverPort), name)
    }

to obtain the driver's host and port and build an RpcEndpointRef pointing to the endpoint that lives on the driver.
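
Here is a conceptual sketch of this register-on-the-driver / look-up-from-the-executor pattern, using a toy EchoEndpoint invented for illustration (RpcEndpoint, RpcEnv, and RpcUtils are Spark-internal APIs, so this only compiles inside the org.apache.spark package):

    // Toy endpoint: replies to ask() calls with an echo of the message.
    class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case msg: String => context.reply(s"echo: $msg")
      }
    }

    // Driver side: the endpoint actually lives here.
    val echoRef = rpcEnv.setupEndpoint("Echo", new EchoEndpoint(rpcEnv))

    // Executor side: only a reference to the endpoint on the driver is created.
    // val echoRef = RpcUtils.makeDriverRef("Echo", conf, rpcEnv)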

8. Create the broadcastManager, the cacheManager, and the httpFileServer (driver only; this also sets spark.fileserver.uri in the conf)

9. Create the metricsSystem.

Spark's metrics module consists of three parts. An instance identifies who is using the metrics system: Spark has a number of roles, such as master, worker, and client driver, that use it for monitoring, and implementations currently exist for master, worker, executor, driver, and applications. A source identifies where the data comes from: Spark-internal sources (such as MasterSource and WorkerSource, which collect Spark's internal state) and common sources (lower-level ones such as JvmSource, which collect low-level state, are configured in the config file, and are loaded via reflection). A sink identifies the destination of the metric data; multiple sinks can be specified.
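
A minimal sketch of wiring this up, assuming a metrics.properties file at a hypothetical path; the property syntax shown in the comments follows Spark's metrics.properties.template (instance.sink/source.name.option = value):

    // Point the metrics system at an external configuration file (path is hypothetical).
    val conf = new SparkConf().set("spark.metrics.conf", "/path/to/metrics.properties")

    // Example contents of that file:
    //   *.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink        (sink for every instance)
    //   *.sink.console.period=10
    //   *.sink.console.unit=seconds
    //   executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource   (JVM source, executors only)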

10. Create sparkFilesDir: a temporary directory on the driver, the current working directory on executors.

11. Create the executorMemoryManager
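
A one-line sketch of the switch that decides the allocator in create(): with the flag on, MemoryAllocator.UNSAFE (off-heap) is used; otherwise the default MemoryAllocator.HEAP applies.

    // Defaults to false (on-heap); see conf.getBoolean("spark.unsafe.offHeap", false) above.
    val conf = new SparkConf().set("spark.unsafe.offHeap", "true")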

12. Finally, assemble the SparkEnv from everything created above.

As you can see, SparkEnv packs in a lot: the metrics module, block management, even serialization. Let's skip the rest for now and look at how this SparkEnv is used once it is created, and how blocks are managed.

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

The analysis below is based on deploy (standalone) mode. At this point the scheduler for deploy mode has been created: a TaskSchedulerImpl, bound to the single DAGScheduler and to a SparkDeploySchedulerBackend. SparkDeploySchedulerBackend in turn creates an AppClient that acts as the driver-side client and keeps in touch with the Master, while the inherited CoarseGrainedSchedulerBackend communicates with the executors. As the names suggest, the SchedulerBackend talks to executors at the task level, whereas the BlockManager talks to them at the storage level; what the concrete differences are should be the topic of the next post. Next, let's look at how SparkEnv is created on the executor side.

First, SparkDeploySchedulerBackend creates an ApplicationDescription, which describes the application. This appDesc contains a Command used to launch an executor on a worker; in deploy mode the default is Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) (Command is a case class). In other words, the main class that gets launched is CoarseGrainedExecutorBackend. The appDesc is sent to the Master inside a RegisterApplication message; after scheduling, the Master sends LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory) to tell a Worker to start an executor. On receiving it, the Worker runs an ExecutorRunner to launch the executor and replies to the Master with an ExecutorStateChanged message reporting that it has started.
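
For reference, here is a simplified sketch of that Command value as SparkDeploySchedulerBackend builds it; the argument values are stubs, while in the real code they are derived from the SparkContext and its conf as described above:

    import org.apache.spark.deploy.Command

    val command = Command(
      "org.apache.spark.executor.CoarseGrainedExecutorBackend", // main class launched on the worker
      Seq.empty,                 // stub; the real args carry the driver URL, executor id, cores, app id, ...
      Map.empty[String, String], // sc.executorEnvs in the real code
      Seq.empty,                 // classPathEntries ++ testingClassPath
      Seq.empty,                 // libraryPathEntries
      Seq.empty)                 // javaOpts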

Inside ExecutorRunner, a ProcessBuilder is used to launch our executor, i.e. private[spark] object CoarseGrainedExecutorBackend extends Logging {}. During startup the detailed SparkConf settings are passed over:

    val driverConf = new SparkConf()
    for ((key, value) <- props) {
      // this is required for SSL in standalone mode
      if (SparkConf.isExecutorStartupConf(key)) {
        driverConf.setIfMissing(key, value)
      } else {
        driverConf.set(key, value)
      }
    }

This is how the executor obtains the host and port of the driver's actor. What follows is a SparkEnv creation process that is almost identical to the driver's.
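
That executor-side creation goes through SparkEnv.createExecutorEnv rather than createDriverEnv; a sketch of the call, with placeholder argument values, based on the Spark 1.4-era signature:

    val env = SparkEnv.createExecutorEnv(
      driverConf,       // the conf rebuilt from the props sent over by the driver, as above
      executorId,       // the executor id handed out by the Master, e.g. "0"
      hostname,         // this executor's hostname
      boundPort,        // port for the executor-side RpcEnv / actor system
      cores,            // number of cores assigned to this executor
      isLocal = false)  // standalone/deploy mode, not local mode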