Spark源码阅读之存储体系--存储体系概述与shuffle服务

时间:2021-07-22 06:09:39

一、概述

根据《深入理解Spark:核心思想与源码分析》一书,结合最新的spark源代码master分支进行源码阅读,对新版本的代码加上自己的一些理解,如有错误,希望指出。

1.块管理器BlockManager的实现

块管理器是Spark存储体系的核心组件,Driver Application和Executor都会创建BlockManager,源代码位置在core/org.apache.spark.storage,部分代码如下。

private[spark] val externalShuffleServiceEnabled =
conf.getBoolean("spark.shuffle.service.enabled", false) val diskBlockManager = {
// Only perform cleanup if an external service is not serving our shuffle files.
val deleteFilesOnStop =
!externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
new DiskBlockManager(conf, deleteFilesOnStop)
} // Visible for testing
private[storage] val blockInfoManager = new BlockInfoManager private val futureExecutionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128)) // Actual storage of where blocks are kept
private[spark] val memoryStore =
new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
private[spark] val diskStore = new DiskStore(conf, diskBlockManager, securityManager)
memoryManager.setMemoryStore(memoryStore) // Note: depending on the memory manager, `maxMemory` may actually vary over time.
// However, since we use this only for reporting and logging, what we actually want here is
// the absolute maximum value that `maxMemory` can ever possibly reach. We may need
// to revisit whether reporting this value as the "max" is intuitive to the user.
private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory // Port used by the external shuffle service. In Yarn mode, this may be already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
private val externalShuffleServicePort = {
val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
if (tmpPort == 0) {
// for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
// an open port. But we still need to tell our spark apps the right port to use. So
// only if the yarn config has the port set to 0, we prefer the value in the spark config
conf.get("spark.shuffle.service.port").toInt
} else {
tmpPort
}
} var blockManagerId: BlockManagerId = _ // Address of the server that serves this executor's shuffle files. This is either an external
// service, or just our own Executor's BlockManager.
private[spark] var shuffleServerId: BlockManagerId = _ // Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
new ExternalShuffleClient(transConf, securityManager,
securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
} else {
blockTransferService
} // Max number of failures before this block manager refreshes the block locations from the driver
private val maxFailuresBeforeLocationRefresh =
conf.getInt("spark.block.failures.beforeLocationRefresh", 5) private val slaveEndpoint = rpcEnv.setupEndpoint(
"BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker)) // Pending re-registration action being executed asynchronously or null if none is pending.
// Accesses should synchronize on asyncReregisterLock.
private var asyncReregisterTask: Future[Unit] = null
private val asyncReregisterLock = new Object // Field related to peer block managers that are necessary for block replication
@volatile private var cachedPeers: Seq[BlockManagerId] = _
private val peerFetchLock = new Object
private var lastPeerFetchTime = 0L private var blockReplicationPolicy: BlockReplicationPolicy = _

上面代码中声明的BlockInfoManager用于管理BlockManager缓存BlockId及对应的BlockInfo,BlockInfoManager提供一些列的同步读写策略。BlockManager由以下部分组成。

1)shuffle客户端shuffleClient;

2)BlockManagerMaster,对存在于所有Executor上的BlockManager进行统一管理;

3)磁盘块管理器DiskBlockManager;

4)内存存储MemoryStore;

5)磁盘存储DiskStore;

BlockManager要生效必须要初始化,初始化代码如下,

def initialize(appId: String): Unit = {
blockTransferService.init(this)
shuffleClient.init(appId) blockReplicationPolicy = {
val priorityClass = conf.get(
"spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
val clazz = Utils.classForName(priorityClass)
val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
logInfo(s"Using $priorityClass for block replication policy")
ret
} val id =
BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None) val idFromMaster = master.registerBlockManager(
id,
maxOnHeapMemory,
maxOffHeapMemory,
slaveEndpoint) blockManagerId = if (idFromMaster != null) idFromMaster else id shuffleServerId = if (externalShuffleServiceEnabled) {
logInfo(s"external shuffle service port = $externalShuffleServicePort")
BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
} else {
blockManagerId
} // Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
registerWithExternalShuffleServer()
} logInfo(s"Initialized BlockManager: $blockManagerId")
}

1)BlockTransferService和shuffle客户端shuffleClient的初始化,ShuffleClien默认是BlockTransferService,当有外部的ShuffleService时,则调用外部的ExternalShuffleClient。

2)创建id为本地BlockManagerId,向BlockManagerMaster注册此id,获取从BlockManagerMaster的idFromMaster,如果idFromMaster为空则BlockManagerId为刚才创建的id,否则使用BlockManagerMaster注册到的idFromMaster。

3)ShuffleServerId的创建,当有外部的ShuffleService时,创建新的BlockManagerId作为ShuffleServerId。

4)当有外部的ShuffleService并且当前BlockMaId不是Driver端,则需要向ShuffleClient注册ShuffleServerId

2.Spark存储体系架构

Spark源码阅读之存储体系--存储体系概述与shuffle服务

1)1表示Executor的BlockManager与Driver的BlockManager进行消息通信,例如注册BlockManager、更新Block信息、获取Block所在的BlockManager、删除Executor等

2)2表示对BlockManager的读操作如get、doGetLocal等和写操作doPut、puSingle等

3)3表示当MemoryStore的内存不足时,写入DiskStore,而DiskStore实际依赖于DiskBlockManager

4)4表示通过访问远端节点的Executor的BlockManager中的TransportServer提供RPC服务下载或者上传Block

5)5表示远端节点的Executor的BlockManager访问本地Executor的BlockManager中的TransportServer提供的RPC服务下载或者上传Block。

二、shuffle服务与客户端

1.Block的RPC服务

当map任务与reduce任务处于不同的节点时,reduce任务需要从远端节点下载map任务的中间件输出,因此NettyBlockRpcServer提供打开,即下载Block文件的功能;一些情况下,为了容错,需要将Block的数据备份到其他节点上,所以NettyBlockRpcServer还提供了上传Block文件的RPC服务,实现见代码,代码位置:core/org.apache.spark.network.netty。

class NettyBlockRpcServer(
appId: String,
serializer: Serializer,
blockManager: BlockDataManager)
extends RpcHandler with Logging { private val streamManager = new OneForOneStreamManager() override def receive(
client: TransportClient,
rpcMessage: ByteBuffer,
responseContext: RpcResponseCallback): Unit = {
val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
logTrace(s"Received request: $message") message match {
case openBlocks: OpenBlocks =>
val blocksNum = openBlocks.blockIds.length
val blocks = for (i <- (0 until blocksNum).view)
yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))
val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
logTrace(s"Registered streamId $streamId with $blocksNum buffers")
responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer) case uploadBlock: UploadBlock =>
// StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
val (level: StorageLevel, classTag: ClassTag[_]) = {
serializer
.newInstance()
.deserialize(ByteBuffer.wrap(uploadBlock.metadata))
.asInstanceOf[(StorageLevel, ClassTag[_])]
}
val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
val blockId = BlockId(uploadBlock.blockId)
blockManager.putBlockData(blockId, data, level, classTag)
responseContext.onSuccess(ByteBuffer.allocate(0))
}
} override def getStreamManager(): StreamManager = streamManager
}

2.构造传输上下文TransportContext

代码位置,common/network-common/org.apache.spark.network

  public TransportContext(
TransportConf conf,
RpcHandler rpcHandler,
boolean closeIdleConnections) {
this.conf = conf;
this.rpcHandler = rpcHandler;
this.closeIdleConnections = closeIdleConnections;
}

TransportContext既可以创建Netty服务,也可以创建Netty访问客户端,组成部分如下。

1)TransportConf:主要控制Netty框架提供的shuffle的I/O交互的客户端和服务端线程数量等

2)RpcHandler:负责shuffle的I/O服务端在接收到客户端的RPC请求后,提供打开Block或者上传Block的RPC处理,此处实现为NettyBlockRpcServer

3)是否关闭闲置连接

3.RPC客户端工厂TransportClientFactory

 public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
return new TransportClientFactory(this, bootstraps);
}

以下分析TransportClientFactory代码。

public TransportClientFactory(
TransportContext context,
List<TransportClientBootstrap> clientBootstraps) {
this.context = Preconditions.checkNotNull(context);
this.conf = context.getConf();
this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
this.connectionPool = new ConcurrentHashMap<>();
this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
this.rand = new Random(); IOMode ioMode = IOMode.valueOf(conf.ioMode());
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
this.workerGroup = NettyUtils.createEventLoop(
ioMode,
conf.clientThreads(),
conf.getModuleName() + "-client");
this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
}

1)clientBootstraps:用于缓存客户端列表

2)connectionPool:用户缓存客户端连接

3)numConnectionsPerPeer:节点之间取数据的连接数,可以使用属性spark.shuffle.io.numConnectionsPerPeer来配置,默认为1

4)SocketChannelClass:客户端channel被创建时使用的类,可以使用属性spark.shuffle.io.mode来配置

5)workerGroup:根据Netty的规范,客户端只有worker组,所以此处创建workerGroup,实际是NioEventLoopGroup

6)pooledAllocator:汇集ByteBuf但对本地线程缓存禁用的分配器。

4.Netty服务器TransportServer

TransportServer提供了Netty实现的服务器端,用于提供RPC服务,如上传、下载等,代码如下。

 public TransportServer createServer(
String host, int port, List<TransportServerBootstrap> bootstraps) {
return new TransportServer(this, host, port, rpcHandler, bootstraps);
}

TransportServer构造器如下

public TransportServer(
TransportContext context,
String hostToBind,
int portToBind,
RpcHandler appRpcHandler,
List<TransportServerBootstrap> bootstraps) {
this.context = context;
this.conf = context.getConf();
this.appRpcHandler = appRpcHandler;
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps)); try {
init(hostToBind, portToBind);
} catch (RuntimeException e) {
JavaUtils.closeQuietly(this);
throw e;
}
}

init方法对TransportServer进行初始化,通过使用Netty框架的EventLoopGroup、ServerBootstrap等API创建shuffle的I/O交互的服务端,主要代码见清单。

  private void init(String hostToBind, int portToBind) {

    IOMode ioMode = IOMode.valueOf(conf.ioMode());
EventLoopGroup bossGroup =
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
EventLoopGroup workerGroup = bossGroup; PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads()); bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, allocator)
.childOption(ChannelOption.ALLOCATOR, allocator); if (conf.backLog() > 0) {
bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
} if (conf.receiveBuf() > 0) {
bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
} if (conf.sendBuf() > 0) {
bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
} bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
RpcHandler rpcHandler = appRpcHandler;
for (TransportServerBootstrap bootstrap : bootstraps) {
rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
}
context.initializePipeline(ch, rpcHandler);
}
}); InetSocketAddress address = hostToBind == null ?
new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
channelFuture = bootstrap.bind(address);
channelFuture.syncUninterruptibly(); port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port: {}", port);
}

1)ioMode:NIO或者EPOLL

2) ChannelOption.ALLOCATOR:在Netty 4中实现了一个新的ByteBuf内存池,它是一个纯Java版本的 jemalloc (Facebook也在用)。现在,Netty不会再因为用零填充缓冲区而浪费内存带宽了。不过,由于它不依赖于GC,开发人员需要小心内存泄漏。如果忘记在处理程序中释放缓冲区,那么内存使用率会无限地增长。Netty默认不使用内存池,需要在创建客户端或者服务端的时候进行指定,使用内存池之后,内存的申请和释放必须成对出现,即retain()和release()要成对出现,否则会导致内存泄露。

3)RpcHandler处理接收到的数据逻辑

5.获取远程shuffle文件

NettyBlockTransferService的fetchBlocks方法用于获取远程的shuffle文件,实际是使用NettyBlockTransferService中创建的Netty服务。

override def fetchBlocks(
host: String,
port: Int,
execId: String,
blockIds: Array[String],
listener: BlockFetchingListener,
tempShuffleFileManager: TempShuffleFileManager): Unit = {
logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
try {
val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
val client = clientFactory.createClient(host, port)
new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
transportConf, tempShuffleFileManager).start()
}
} val maxRetries = transportConf.maxIORetries()
if (maxRetries > 0) {
// Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
// a bug in this code. We should remove the if statement once we're sure of the stability.
new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
} else {
blockFetchStarter.createAndStart(blockIds, listener)
}
} catch {
case e: Exception =>
logError("Exception while beginning fetchBlocks", e)
blockIds.foreach(listener.onBlockFetchFailure(_, e))
}
}

6.上传shuffle文件

NettyBlockTransferService的uploadBlock方法用于上传shuffle文件到远程的Executor,实际也是用NettyBlockTransferService中创建的Netty服务,步骤如下。

override def uploadBlock(
hostname: String,
port: Int,
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Future[Unit] = {
val result = Promise[Unit]()
val client = clientFactory.createClient(hostname, port) // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
// Everything else is encoded using our binary protocol.
val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag))) // Convert or copy nio buffer into array in order to serialize it.
val array = JavaUtils.bufferToArray(blockData.nioByteBuffer()) client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer,
new RpcResponseCallback {
override def onSuccess(response: ByteBuffer): Unit = {
logTrace(s"Successfully uploaded block $blockId")
result.success((): Unit)
}
override def onFailure(e: Throwable): Unit = {
logError(s"Error while uploading block $blockId", e)
result.failure(e)
}
}) result.future
}

1)创建Netty服务的客户端,客户端连接的hostname和port正是BlockManager的hostname和port

2)将Block的存储级别StorageLevel和类标签序列化

3)将Block的ByteBuffer转化为数据,便于序列化

4)将appId、execId、blockId、metadata、转化为数组的Block封装为UploadBlock,并将其序列化为字节数组

5)最终调用Netty客户端的sendRpc方法将字节数组上传,回掉函数RpcResponseCallback根据RPC的结果更改上传状态。