【原】1.1RDD源码解读(一)

时间:2022-05-22 09:17:57

1.RDD(Resilient Distributed DataSet)是Spark生态系统中最基本的抽象,代表不可变的、可并行操作的分区元素集合。RDD这个类有RDD系列所有基本的操作,比如map、filter、persist.另外,org.apache.spark.rdd.PairRDDFunctions含有key-value类型的RDD的基本操作,比如groupby、join;org.apache.spark.rdd.DoubleRDDFunctions含有Double类型的RDD的基本操作;org.apache.spark.rdd.SequenceFileRDDFunctions含有可以将RDD保存SequenceFiles的基本操作。所有的操作会通过有隐式转换适用于任何RDD。

每个RDD的5个主要属性:

- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

2.重要方法解读

(1)//注册一个新的RDD,并根据当前值加1返回它的RDD的ID
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

(2)缓存相关

a)persist、cache

/**
 * 指定RDD缓存的Level,详见StorageLevel object
 *
 * @param newLevel 缓存Level

* @param allowOverride 是否重写缓存
 */
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}

//可见cache其实是调用的persist方法,RDD默认的缓存策略是MEMORY_ONLY

def cache(): this.type = persist()

def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

b)unpersist

//将RDD设置为不缓存,并且把内存或磁盘上的blocks都删除

def unpersist(blocking: Boolean = true): this.type = {
  logInfo("Removing RDD " + id + " from persistence list")
  sc.unpersistRDD(id, blocking)

//将缓存Level设置为NONE
  storageLevel = StorageLevel.NONE
  this
}

unpersistRDD(id,blocking)的源码如下所示:

/**
 * 将内存或磁盘中缓存的RDD删除

*/
private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
  env.blockManager.master.removeRdd(rddId, blocking)

//persistentRdds是一个弱引用得HashMap,key为rddId,value为对应的RDD
  persistentRdds.remove(rddId)
  listenerBus.post(SparkListenerUnpersistRDD(rddId))
}

(3)分区partitions

//得到RDD的所有分区,并以数组形式返回

final def partitions: Array[Partition] = {
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      partitions_ = getPartitions
    }
    partitions_
  }
}

(4)

//得到分区预先存放的位置

final def preferredLocations(split: Partition): Seq[String] = {
  checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
    getPreferredLocations(split)
  }
}

(5)依赖

//得到窄依赖的祖先节点

private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
  val ancestors = new mutable.HashSet[RDD[_]]

def visit(rdd: RDD[_]) {
    val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
    val narrowParents = narrowDependencies.map(_.rdd)
    val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
    narrowParentsNotVisited.foreach { parent =>
      ancestors.add(parent)
      visit(parent)
    }
  }