Spark Operator Execution Flow Explained, Part 7

Date: 2022-12-19 20:47:06

31.union

Merges two RDDs into one.
def union(other: RDD[T]): RDD[T] = withScope {
  if (partitioner.isDefined && other.partitioner == partitioner) { // both RDDs share the same partitioner
    new PartitionerAwareUnionRDD(sc, Array(this, other))
  } else { // the partitioners differ, or this RDD has none
    new UnionRDD(sc, Array(this, other))
  }
}
First, the case where both RDDs share the same partitioner:
private[spark]
class PartitionerAwareUnionRDD[T: ClassTag](
    sc: SparkContext,
    var rdds: Seq[RDD[T]]
  ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
  require(rdds.length > 0)
  require(rdds.forall(_.partitioner.isDefined))
  require(rdds.flatMap(_.partitioner).toSet.size == 1,
    "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))

  override val partitioner = rdds.head.partitioner

  // Build the PartitionerAwareUnionRDDPartitions; the one at position `index` records which parent partitions in rdds feed it
  override def getPartitions: Array[Partition] = {
    val numPartitions = partitioner.get.numPartitions
    (0 until numPartitions).map(index => {
      new PartitionerAwareUnionRDDPartition(rdds, index)
    }).toArray
  }

  // Get the location where most of the partitions of parent RDDs are located
  override def getPreferredLocations(s: Partition): Seq[String] = {
    logDebug("Finding preferred location for " + this + ", partition " + s.index)
    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
    val locations = rdds.zip(parentPartitions).flatMap {
      case (rdd, part) => {
        val parentLocations = currPrefLocs(rdd, part)
        logDebug("Location of " + rdd + " partition " + part.index + " = " + parentLocations)
        parentLocations
      }
    }
    val location = if (locations.isEmpty) {
      None
    } else {
      // Find the location that maximum number of parent partitions prefer
      Some(locations.groupBy(x => x).maxBy(_._2.length)._1)
    }
    logDebug("Selected location for " + this + ", partition " + s.index + " = " + location)
    location.toSeq
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    // parents lists the partitions of rdds that this partition is built from
    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
    // Iterate over those parent partitions in turn to form the single output partition
    rdds.zip(parentPartitions).iterator.flatMap {
      case (rdd, p) => rdd.iterator(p, context)
    }
  }

  override def clearDependencies() {
    super.clearDependencies()
    rdds = null
  }

  // Get the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
  private def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] = {
    rdd.context.getPreferredLocs(rdd, part.index).map(tl => tl.host)
  }
}
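The location choice in getPreferredLocations above boils down to a majority vote over the hosts reported by the parent partitions. The following is a minimal sketch of that step using plain Scala collections; `PreferredLocationDemo` and `pickLocation` are hypothetical names introduced only for illustration and are not part of Spark.

```scala
object PreferredLocationDemo {
  // parentLocations(i) is the list of hosts preferred by the i-th parent partition.
  // Returns the host named most often, or an empty Seq when no parent has a preference.
  // (When several hosts tie, which one wins is unspecified.)
  def pickLocation(parentLocations: Seq[Seq[String]]): Seq[String] = {
    val locations = parentLocations.flatten
    if (locations.isEmpty) Seq.empty
    else Seq(locations.groupBy(identity).maxBy(_._2.length)._1)
  }

  def main(args: Array[String]): Unit = {
    val votes = Seq(Seq("host1", "host2"), Seq("host2"), Seq("host2", "host3"))
    println(pickLocation(votes))
  }
}
```

In this illustration "host2" is named three times and every other host once, so it is the selected location.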
Its partition class is PartitionerAwareUnionRDDPartition:
class PartitionerAwareUnionRDDPartition(
    @transient val rdds: Seq[RDD[_]],
    val idx: Int
  ) extends Partition {
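  // parents holds, for partition index idx, the matching Partition of every RDD in rdds.
  // The mapping is one-to-one: the output partition at a given index draws from the partition at the same index in each parent
  var parents = rdds.map(_.partitions(idx)).toArray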

  override val index = idx
  override def hashCode(): Int = idx

  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
    // Update the reference to parent partition at the time of task serialization
    parents = rdds.map(_.partitions(index)).toArray
    oos.defaultWriteObject()
  }
}
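In short, when the partitioners are the same, each partition of the resulting RDD is built from the partitions with the same index in the two original RDDs, and the partition count stays unchanged.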
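The same-index merge can be sketched without Spark by modeling each RDD as a sequence of partitions. This is only an illustration under that assumption; `PartitionerAwareUnionDemo` and its `union` method are hypothetical names, and equal partition counts stand in for "same partitioner".

```scala
object PartitionerAwareUnionDemo {
  // Each "RDD" is modeled as a sequence of partitions (assumption for illustration).
  type Partitions[T] = Seq[Seq[T]]

  def union[T](rdds: Seq[Partitions[T]]): Partitions[T] = {
    require(rdds.nonEmpty, "need at least one parent")
    require(rdds.map(_.length).distinct.size == 1, "parents must have equal partition counts")
    val numPartitions = rdds.head.length
    // Output partition i concatenates partition i of every parent, in parent order.
    (0 until numPartitions).map(i => rdds.flatMap(rdd => rdd(i)))
  }

  def main(args: Array[String]): Unit = {
    val a = Seq(Seq(1, 2), Seq(3))
    val b = Seq(Seq(10), Seq(20, 30))
    println(union(Seq(a, b)))
  }
}
```

The result has as many partitions as each parent, which mirrors how getPartitions uses partitioner.get.numPartitions in the listing.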
Next, the case where the partitioners differ:
class UnionRDD[T: ClassTag](
    sc: SparkContext,
    var rdds: Seq[RDD[T]])
  extends RDD[T](sc, Nil) {  // Nil since we implement getDependencies

 
  override def getPartitions: Array[Partition] = {
    // Total number of partitions across all RDDs in rdds
    val array = new Array[Partition](rdds.map(_.partitions.length).sum)
    var pos = 0
    // Create one output partition per parent partition
    for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
      // pos: output partition index; rdd: the parent RDD; rddIndex: index of that parent within rdds;
      // split.index: index of the partition within the parent RDD
      array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
      pos += 1
    }
    array
  }

  override def getDependencies: Seq[Dependency[_]] = {
    val deps = new ArrayBuffer[Dependency[_]]
    var pos = 0
    for (rdd <- rdds) {
      deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
      pos += rdd.partitions.length
    }
    deps
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    val part = s.asInstanceOf[UnionPartition[T]]
    // Read the data of the corresponding partition of the parent RDD
    parent[T](part.parentRddIndex).iterator(part.parentPartition, context)
  }

  override def getPreferredLocations(s: Partition): Seq[String] =
    s.asInstanceOf[UnionPartition[T]].preferredLocations()

  override def clearDependencies() {
    super.clearDependencies()
    rdds = null
  }
}
Now consider what parentPartition means in UnionPartition:
/**
 * Partition for UnionRDD.
 *
 * @param idx index of the partition
 * @param rdd the parent RDD this partition refers to
 * @param parentRddIndex index of the parent RDD this partition refers to
 * @param parentRddPartitionIndex index of the partition within the parent RDD
 *                                this partition refers to
 */
private[spark] class UnionPartition[T: ClassTag](
    idx: Int,
    @transient rdd: RDD[T],
    val parentRddIndex: Int,
    @transient parentRddPartitionIndex: Int)
  extends Partition {
  // parentPartition is the Partition at index parentRddPartitionIndex of this partition's parent RDD
  var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)

  def preferredLocations(): Seq[String] = rdd.preferredLocations(parentPartition)

  override val index: Int = idx

  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
    // Update the reference to parent split at the time of task serialization
    parentPartition = rdd.partitions(parentRddPartitionIndex)
    oos.defaultWriteObject()
  }
}
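So when the partitioners differ, the resulting RDD simply concatenates the parents' partitions: the partitions of the first RDD come first, followed by those of the second, and each output partition reads exactly one parent partition.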
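The partition layout built by UnionRDD.getPartitions and the range offsets used in getDependencies can be sketched with plain Scala. `UnionLayoutDemo`, `UnionSlot`, `layout`, and `rangeStarts` are hypothetical names for illustration; only the partition counts of the parents are modeled.

```scala
object UnionLayoutDemo {
  // One output partition: its own index, the parent RDD it reads, and the partition inside that parent.
  final case class UnionSlot(index: Int, parentRddIndex: Int, parentPartitionIndex: Int)

  // partitionCounts(i) is the number of partitions of the i-th parent.
  def layout(partitionCounts: Seq[Int]): Seq[UnionSlot] = {
    val pairs = for {
      (count, rddIndex) <- partitionCounts.zipWithIndex
      split <- 0 until count
    } yield (rddIndex, split)
    // The running position pos is just the position in the flattened list.
    pairs.zipWithIndex.map { case ((rddIndex, split), pos) => UnionSlot(pos, rddIndex, split) }
  }

  // Output index at which each parent's block of partitions starts (the outStart of its range dependency).
  def rangeStarts(partitionCounts: Seq[Int]): Seq[Int] =
    partitionCounts.scanLeft(0)(_ + _).init

  def main(args: Array[String]): Unit = {
    layout(Seq(2, 3)).foreach(println)
    println(rangeStarts(Seq(2, 3)))
  }
}
```

For parents with 2 and 3 partitions, the output has 5 partitions; the second parent's partitions occupy output indices 2 to 4.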

32. ++ (double plus)

It simply delegates to union.
/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple
 * times (use `.distinct()` to eliminate them).
 */
def ++(other: RDD[T]): RDD[T] = withScope {
  this.union(other)
}
 

33.intersection

Computes the intersection of two RDDs; a value common to both is output only once.
/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did.
 *
 * Note that this method performs a shuffle internally.
 */
def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keys
}
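The flow is roughly as follows. Each of the two RDDs is first mapped to a pair RDD of key-value pairs whose value is null. A CoGroupedRDD is then built from the two pair RDDs, and cogroup converts each of its values into a pair of Iterables (one per side). The filter keeps only the keys that have records on both the left and the right side. Finally the keys are returned, which are the elements present in both original RDDs.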
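The map, cogroup, filter, keys pipeline can be imitated on ordinary Scala collections. This is a sketch under the assumption that a Seq stands in for an RDD; `IntersectionDemo` and its local `cogroup` are hypothetical helpers, not Spark's implementation.

```scala
object IntersectionDemo {
  // For every key present on either side, collect the values from the left and from the right.
  def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      val leftGroup = left.filter(_._1 == k).map(_._2)
      val rightGroup = right.filter(_._1 == k).map(_._2)
      (k, (leftGroup, rightGroup))
    }.toMap
  }

  // Same shape as the RDD version: pair every element with null, cogroup,
  // keep keys seen on both sides, return the keys.
  def intersection[T](left: Seq[T], right: Seq[T]): Set[T] =
    cogroup(left.map(v => (v, null)), right.map(v => (v, null)))
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keySet

  def main(args: Array[String]): Unit =
    println(intersection(Seq(1, 2, 2, 3), Seq(2, 3, 3, 4)))
}
```

Because the elements become keys, duplicates within either input collapse automatically, which is why the result contains each common value once.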
cogroup is implemented internally as follows:
/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 */
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  // defaultPartitioner picks the default partitioner
  cogroup(other, defaultPartitioner(self, other))
}
/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 */
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("Default partitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  // Map each value of cg from an Array of two Iterables to a typed pair of Iterables
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}
CoGroupedRDD is implemented as follows:
/**
 * :: DeveloperApi ::
 * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
 * tuple with the list of values for that key.
 *
 * Note: This is an internal API. We recommend users use RDD.cogroup(...) instead of
 * instantiating this directly.
 *
 * @param rdds parent RDDs.
 * @param part partitioner used to partition the shuffle output
 */
@DeveloperApi
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {

  // For example, `(k, a) cogroup (k, b)` produces k -> Array(ArrayBuffer as, ArrayBuffer bs).
  // Each ArrayBuffer is represented as a CoGroup, and the resulting Array as a CoGroupCombiner.
  // CoGroupValue is the intermediate state of each value before being merged in compute.
  private type CoGroup = CompactBuffer[Any]
  private type CoGroupValue = (Any, Int)  // Int is dependency number
  private type CoGroupCombiner = Array[CoGroup]

  private var serializer: Option[Serializer] = None

  /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
  def setSerializer(serializer: Serializer): CoGroupedRDD[K] = {
    this.serializer = Option(serializer)
    this
  }
 
  override def getDependencies: Seq[Dependency[_]] = {
    rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
      // If the parent's partitioner equals the CoGroupedRDD's partitioner, the dependency is narrow
      if (rdd.partitioner == Some(part)) {
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
        // Otherwise it is a wide (shuffle) dependency
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer)
      }
    }
  }

  /*
   * Build the partition metadata. Each CoGroupPartition has the form
   *   CoGroupPartition(idx: Int, val narrowDeps: Array[Option[NarrowCoGroupSplitDep]])
   * where idx is the partition index and narrowDeps holds one entry per parent dependency.
   */
  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](part.numPartitions)
    for (i <- 0 until array.length) {
      // Each CoGroupPartition will have a dependency per contributing RDD
      array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
        // Assume each RDD contributed a single dependency, and get it
        dependencies(j) match {
          // A shuffle (wide) dependency yields None
          case s: ShuffleDependency[_, _, _] =>
            None
          case _ =>
            // Anything else is a narrow dependency
            Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
        }
      }.toArray)
    }
    array
  }

  override val partitioner: Some[Partitioner] = Some(part)

  // Compute one partition, driven by the CoGroupPartition passed in
  override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
    val sparkConf = SparkEnv.get.conf
    // Decides whether intermediate results are merged purely in memory or in memory with spilling to disk
    val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
    val split = s.asInstanceOf[CoGroupPartition]
    // Number of parent RDDs; each contributes one dependency, chosen according to its partitioner
    val numRdds = dependencies.length

    // A list of (rdd iterator, dependency number) pairs:
    // the first element iterates over Product2 records, the second is the dependency index
    val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
    for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
      // Narrow dependency: read the corresponding partition of the parent RDD directly
      case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
        val dependencyPartition = split.narrowDeps(depNum).get.split
        // Read them from the parent
        val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
        rddIterators += ((it, depNum))
      // Wide dependency: fetch this partition's data from the shuffle's intermediate output
      case shuffleDependency: ShuffleDependency[_, _, _] =>
        // Read map outputs of shuffle
        val it = SparkEnv.get.shuffleManager
          .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
          .read()
        rddIterators += ((it, depNum))
    }
    /*
    * rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
    * [{Iterator[Product2[K, Any]],0},{Iterator[Product2[K, Any]],1}]
    * */

   
    if (!externalSorting) { // merge intermediate results in memory
      // CoGroupCombiner is an array of buffers; each distinct key K is kept only once
      val map = new AppendOnlyMap[K, CoGroupCombiner]
      val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
        if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup)
      }
      val getCombiner: K => CoGroupCombiner = key => {
        map.changeValue(key, update)
      }
      // Walk every iterator and append each value to the CoGroupCombiner of its key
      rddIterators.foreach { case (it, depNum) =>
        while (it.hasNext) {
          val kv = it.next()
          getCombiner(kv._1)(depNum) += kv._2
        }
      }
      new InterruptibleIterator(context,
        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
    } else { // merge intermediate results in memory, spilling to disk
      val map = createExternalMap(numRdds)
      // Insert everything into the ExternalAppendOnlyMap
      for ((it, depNum) <- rddIterators) {
        map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
      }
      context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled)
      context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled)
      new InterruptibleIterator(context,
        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
    }
  }
  ……
}
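Suppose two RDDs are cogrouped: one is hash-partitioned into 3 partitions and the other has no partitioner. Provided the cogroup uses that same hash partitioner, getDependencies gives the first RDD a one-to-one (narrow) dependency and the second a shuffle dependency, so each of the 3 output partitions reads the first RDD's matching partition directly and fetches the second RDD's records from the shuffle output.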
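The in-memory merge branch of compute can be sketched with standard collections. This is a simplified illustration: a LinkedHashMap stands in for AppendOnlyMap and ArrayBuffer for CompactBuffer, and `CoGroupCombinerDemo` and `combine` are hypothetical names that do not exist in Spark.

```scala
import scala.collection.mutable

object CoGroupCombinerDemo {
  // One buffer per parent RDD, playing the role of CoGroupCombiner in the listing.
  type Combiner = Array[mutable.ArrayBuffer[Any]]

  // inputs(depNum) holds the key-value records read from parent number depNum.
  def combine[K](inputs: Seq[Seq[(K, Any)]]): Map[K, Seq[Seq[Any]]] = {
    val numRdds = inputs.length
    val map = mutable.LinkedHashMap.empty[K, Combiner]
    for ((records, depNum) <- inputs.zipWithIndex; (k, v) <- records) {
      // Create the per-key combiner on first sight, then append to the buffer of this parent.
      val combiner = map.getOrElseUpdate(k, Array.fill(numRdds)(mutable.ArrayBuffer.empty[Any]))
      combiner(depNum) += v
    }
    map.map { case (k, c) => k -> c.map(_.toSeq).toSeq }.toMap
  }

  def main(args: Array[String]): Unit = {
    val inputs: Seq[Seq[(String, Any)]] =
      Seq(Seq(("a", 1), ("b", 2), ("a", 3)), Seq(("a", "x")))
    println(combine(inputs))
  }
}
```

A key that appears in only one parent still gets a buffer for every parent; the buffers for the other parents simply stay empty.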

34.glom

glom turns the elements of each partition into a single array, producing a new RDD of arrays (a MapPartitionsRDD in the code below).
/**
 * Return an RDD created by coalescing all elements within each partition into an array.
 */
def glom(): RDD[Array[T]] = withScope {
  // Iterator(iter.toArray) turns the partition's elements into a single array
  new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
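In execution, the elements of each partition are collected into one array, so every output partition contains exactly one element and the partition count is unchanged.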
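The per-partition function passed to MapPartitionsRDD can be tried on plain iterators. A minimal sketch, assuming each Iterator stands for one partition; `GlomDemo` is a hypothetical name for illustration.

```scala
import scala.reflect.ClassTag

object GlomDemo {
  // Apply the same function as the listing, Iterator(iter.toArray), to every partition.
  def glom[T: ClassTag](partitions: Seq[Iterator[T]]): Seq[Iterator[Array[T]]] =
    partitions.map(iter => Iterator(iter.toArray))

  def main(args: Array[String]): Unit = {
    val out = glom(Seq(Iterator(1, 2), Iterator(3), Iterator.empty[Int]))
    out.foreach(part => println(part.map(_.mkString("[", ",", "]")).mkString(" ")))
  }
}
```

Note that an empty partition still yields one element, namely an empty array.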

35.cartesian

Returns the Cartesian product of two RDDs. This operation does not perform a shuffle.
/**
 * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
 * elements (a, b) where a is in `this` and b is in `other`.
 */
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  new CartesianRDD(sc, this, other)
}
The work is done by CartesianRDD:
private[spark]
class CartesianRDD[T: ClassTag, U: ClassTag](
    sc: SparkContext,
    var rdd1 : RDD[T],
    var rdd2 : RDD[U])
  extends RDD[Pair[T, U]](sc, Nil)
  with Serializable {

  val numPartitionsInRdd2 = rdd2.partitions.length

  // The number of partitions is the product of the two RDDs' partition counts
  override def getPartitions: Array[Partition] = {
    // create the cross product split
    val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      // Partition idx draws its data from partition s1.index of rdd1 and partition s2.index of rdd2
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
  }
  // Build this CartesianPartition's data by iterating over partition s1 of rdd1 and partition s2 of rdd2
  override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    for (x <- rdd1.iterator(currSplit.s1, context);
         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
  }

  // Both dependencies are narrow
  override def getDependencies: Seq[Dependency[_]] = List(
    new NarrowDependency(rdd1) {
      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
    },
    new NarrowDependency(rdd2) {
      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
    }
  )

  override def clearDependencies() {
    super.clearDependencies()
    rdd1 = null
    rdd2 = null
  }
}
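In execution, output partition idx = s1 * numPartitionsInRdd2 + s2 pairs every element of partition s1 of rdd1 with every element of partition s2 of rdd2.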
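The index arithmetic used by getPartitions and by the two narrow dependencies can be checked in isolation. The following sketch models partitions as nested Seqs; `CartesianLayoutDemo` and its methods are hypothetical names for illustration only.

```scala
object CartesianLayoutDemo {
  // Output partition index for the pair (s1, s2), given the partition count of rdd2.
  def outputIndex(s1: Int, s2: Int, numPartitionsInRdd2: Int): Int =
    s1 * numPartitionsInRdd2 + s2

  // Inverse mapping, as in getParents: which partition of rdd1 and of rdd2 feed output partition idx.
  def parents(idx: Int, numPartitionsInRdd2: Int): (Int, Int) =
    (idx / numPartitionsInRdd2, idx % numPartitionsInRdd2)

  // One output partition per (p1, p2) pair, with rdd2's partitions varying fastest,
  // so the position in the result equals outputIndex(s1, s2, rdd2.length).
  def cartesian[T, U](rdd1: Seq[Seq[T]], rdd2: Seq[Seq[U]]): Seq[Seq[(T, U)]] =
    for (p1 <- rdd1; p2 <- rdd2) yield for (x <- p1; y <- p2) yield (x, y)

  def main(args: Array[String]): Unit = {
    val out = cartesian(Seq(Seq(1), Seq(2)), Seq(Seq("a"), Seq("b"), Seq("c")))
    out.zipWithIndex.foreach { case (part, idx) => println(s"$idx -> $part") }
  }
}
```

With 2 partitions in rdd1 and 3 in rdd2, there are 6 output partitions, and each one depends on exactly one partition of each parent, which is why no shuffle is needed.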