Spark Operator Execution Flow Explained, Part 5

Posted: 2022-12-19 20:42:19

22. combineByKey

def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    // Same partitioner as the parent: no shuffle needed, a single mapPartitions does the job
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    // Otherwise a shuffle is required
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}

Let's focus on its parameters:

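combineByKey takes three functions as its main parameters: createCombiner, mergeValue and mergeCombiners. Together they define what the operator does, so once you understand these three functions, combineByKey is easy to follow.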

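To understand combineByKey(), you first need to know how it handles each element. combineByKey() iterates over every element in a partition, so each element's key is either one it has not seen yet or one it has already seen. It processes elements as follows: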

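If the element's key has not been seen yet, createCombiner() creates the initial value of that key's accumulator. (Note: this happens the first time a key appears in each partition, not the first time the key appears in the whole RDD.)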

If the key has already been seen earlier in the current partition, combineByKey() uses mergeValue() to merge the accumulator's current value for that key with the new value.

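Because each partition is processed independently, the same key can have several accumulators. If two or more partitions hold an accumulator for the same key, the user-supplied mergeCombiners() merges the per-partition results. The mapSideCombine flag (not mergeCombiners) controls where this combining happens:

- If mapSideCombine is true, values are combined on the map side before the map output is written, and the reduce side only merges the resulting combiners with mergeCombiners().
- If mapSideCombine is false, raw records are shuffled and all combining happens on the reduce side.

Combining early reduces the amount of data transferred during the shuffle, which speeds up shuffle read.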
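The per-partition behavior can be made concrete with a small pure-Scala simulation that has no Spark dependency. It is a sketch of the semantics described above, not Spark's implementation. The names combinePartition and simulateCombineByKey are hypothetical, and each inner Seq stands in for one partition. The sketch counts createCombiner calls to show that an accumulator is created once per key per partition, and that mergeCombiners then merges those accumulators.

```scala
import scala.collection.mutable

// Hypothetical helper: combine one partition with createCombiner/mergeValue
def combinePartition[K, V, C](
    part: Seq[(K, V)],
    createCombiner: V => C,
    mergeValue: (C, V) => C): Map[K, C] = {
  val acc = mutable.LinkedHashMap.empty[K, C]
  for ((k, v) <- part) {
    acc.get(k) match {
      case Some(c) => acc(k) = mergeValue(c, v)  // key already seen in THIS partition
      case None    => acc(k) = createCombiner(v) // first occurrence of the key in this partition
    }
  }
  acc.toMap
}

// Hypothetical driver: combine each partition, then merge accumulators across partitions
def simulateCombineByKey[K, V, C](
    partitions: Seq[Seq[(K, V)]],
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): Map[K, C] = {
  val perPartition = partitions.map(p => combinePartition(p, createCombiner, mergeValue))
  perPartition.flatten.groupBy(_._1).map { case (k, kcs) =>
    k -> kcs.map(_._2).reduce(mergeCombiners) // same key from different partitions
  }
}

var createCalls = 0
val parts = Seq(
  Seq(("Fred", 88.0), ("Fred", 95.0), ("Wilma", 93.0)),
  Seq(("Fred", 91.0), ("Wilma", 95.0), ("Wilma", 98.0)))

val result = simulateCombineByKey[String, Double, (Int, Double)](
  parts,
  v => { createCalls += 1; (1, v) },    // createCombiner: (count, sum)
  (c, v) => (c._1 + 1, c._2 + v),       // mergeValue
  (a, b) => (a._1 + b._1, a._2 + b._2)) // mergeCombiners
```

Here createCombiner runs four times (Fred and Wilma in each of the two partitions), not twice, because accumulators are per partition.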

First, look at the dependencies of ShuffledRDD:

class ShuffledRDD[K, V, C](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil) {
  …
  // Shuffle write and shuffle read are both driven by the ShuffleDependency created here
  override def getDependencies: Seq[Dependency[_]] = {
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
  }
  …
}

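When the ShuffleDependency is created, it registers with the shuffleManager and obtains the shuffle handle that is later used for writing and reading. The default shuffle manager is SortShuffleManager.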

class ShuffleDependency[K, V, C](
    @transient _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  val shuffleId: Int = _rdd.context.newShuffleId()
  // Register the shuffle and obtain its handle
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

getWriter and getReader then return the writer and the reader for that shuffle:

private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {

  private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
  private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()

  /**
   * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
   */
  override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }

  /**
   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
   * Called on executors by reduce tasks.
   */
  override def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C] = {
    // We currently use the same block store shuffle fetcher as the hash-based shuffle.
    new HashShuffleReader(
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
  }

  /** Get a writer for a given partition. Called on executors by map tasks. */
  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
      : ShuffleWriter[K, V] = {
    val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]]
    shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps)
    new SortShuffleWriter(
      shuffleBlockResolver, baseShuffleHandle, mapId, context)
  }

  /** Remove a shuffle's metadata from the ShuffleManager. */
  override def unregisterShuffle(shuffleId: Int): Boolean = {
    if (shuffleMapNumber.containsKey(shuffleId)) {
      val numMaps = shuffleMapNumber.remove(shuffleId)
      (0 until numMaps).map { mapId =>
        shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
      }
    }
    true
  }

  override val shuffleBlockResolver: IndexShuffleBlockResolver = {
    indexShuffleBlockResolver
  }

  /** Shut down this ShuffleManager. */
  override def stop(): Unit = {
    shuffleBlockResolver.stop()
  }
}

Start with the writer, SortShuffleWriter:

private[spark] class SortShuffleWriter[K, V, C](
    shuffleBlockResolver: IndexShuffleBlockResolver,
    handle: BaseShuffleHandle[K, V, C],
    mapId: Int,
    context: TaskContext)
  extends ShuffleWriter[K, V] with Logging {

  private val dep = handle.dependency

  private val blockManager = SparkEnv.get.blockManager

  private var sorter: ExternalSorter[K, V, _] = null

  // Are we in the process of stopping? Because map tasks can call stop() with success = true
  // and then call stop() with success = false if they get an exception, we want to make sure
  // we don't try deleting files, etc twice.
  private var stopping = false

  private var mapStatus: MapStatus = null

  private val writeMetrics = new ShuffleWriteMetrics()
  context.taskMetrics.shuffleWriteMetrics = Some(writeMetrics)

  /** Write a bunch of records to this task's output */
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    if (dep.mapSideCombine) {
      // Map-side combine: the sorter receives the aggregator, partitioner and key ordering
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      sorter = new ExternalSorter[K, V, C](
        dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      sorter.insertAll(records)
    } else {
      // No map-side combine: only dep.partitioner is passed down, so the map side just
      // splits records into partitions and does nothing else
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
      sorter.insertAll(records)
    }

    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
    shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  }
}

Now look at ExternalSorter.insertAll:

def insertAll(records: Iterator[_ <: Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // Map-side combine: use the aggregator's mergeValue and createCombiner
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else if (bypassMergeSort) {
    // Few reduce partitions: write one file per partition directly
    // SPARK-4479: Also bypass buffering if merge sort is bypassed to avoid defensive copies
    if (records.hasNext) {
      spillToPartitionFiles(
        WritablePartitionedIterator.fromIterator(records.map { kv =>
          ((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
        })
      )
    }
  } else {
    // Many reduce partitions: buffer the records and write a single file,
    // to avoid creating too many temporary files
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
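The update closure passed to map.changeValue is the core of the shouldCombine branch. Below is a minimal sketch of that pattern. SimpleCombineMap is a hypothetical HashMap-backed stand-in for Spark's AppendOnlyMap, with no spilling and no size tracking. The two-way partitioner is also an assumption made for illustration. As in insertAll, the map key is the pair (partitionId, key).

```scala
import scala.collection.mutable

// Hypothetical stand-in for AppendOnlyMap.changeValue: look up the key, pass
// (hadValue, oldValue) to updateFunc, store and return whatever it produces
final class SimpleCombineMap[K, C] {
  private val data = mutable.HashMap.empty[K, C]
  def changeValue(key: K, updateFunc: (Boolean, C) => C): C = {
    val newValue = data.get(key) match {
      case Some(old) => updateFunc(true, old)
      case None      => updateFunc(false, null.asInstanceOf[C]) // oldValue unused in this case
    }
    data(key) = newValue
    newValue
  }
  def toMap: Map[K, C] = data.toMap
}

// Mirrors the shape of the shouldCombine branch; kv is captured by the update closure
def insertAllCombining[K, V, C](
    records: Iterator[(K, V)],
    getPartition: K => Int,
    createCombiner: V => C,
    mergeValue: (C, V) => C): Map[(Int, K), C] = {
  val map = new SimpleCombineMap[(Int, K), C]
  var kv: (K, V) = null
  val update = (hadValue: Boolean, oldValue: C) => {
    if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
  }
  while (records.hasNext) {
    kv = records.next()
    map.changeValue((getPartition(kv._1), kv._1), update)
  }
  map.toMap
}

val combined = insertAllCombining[String, Int, Int](
  Iterator(("a", 1), ("b", 2), ("a", 3)),
  k => if (k == "a") 0 else 1, // hypothetical 2-way partitioner
  v => v,                      // createCombiner
  _ + _)                       // mergeValue
```

Reusing a single closure that reads the mutable kv avoids allocating a new function per record, which appears to be why insertAll is written this way.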

Next, the reader, HashShuffleReader:

private[spark] class HashShuffleReader[K, C](
    handle: BaseShuffleHandle[K, _, C],
    startPartition: Int,
    endPartition: Int,
    context: TaskContext)
  extends ShuffleReader[K, C] {
  require(endPartition == startPartition + 1,
    "Hash shuffle currently only supports fetching one partition")

  private val dep = handle.dependency

  /** Read the combined key-values for this reduce task */
  override def read(): Iterator[Product2[K, C]] = {
    val ser = Serializer.getSerializer(dep.serializer)
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        // Already combined on the map side: mergeCombiners finishes the aggregation
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
      } else {
        // Not combined on the map side: aggregate raw values with createCombiner and mergeValue
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")

      // Convert the Product2s to pairs since this is what downstream RDDs currently expect
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    }

    // Sort the output if there is a sort ordering defined.
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
        // the ExternalSorter won't spill to disk.
        val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
        sorter.insertAll(aggregatedIter)
        context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
        context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
        sorter.iterator
      case None =>
        aggregatedIter
    }
  }
}
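The two reduce-side branches can be modelled locally. SimpleAggregator below is a hypothetical simplification of Spark's Aggregator that uses plain Maps, with no spilling and no TaskContext. It shows that both paths produce the same combiner for a key. The difference is only whether the reducer receives raw values or per-map combiners.

```scala
// Hypothetical, simplified stand-in for Spark's Aggregator (not Spark's class)
final case class SimpleAggregator[K, V, C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

  // Reduce side when mapSideCombine = false: raw (key, value) records arrive
  def combineValuesByKey(iter: Iterator[(K, V)]): Map[K, C] =
    iter.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v)))
    }

  // Reduce side when mapSideCombine = true: (key, combiner) records arrive
  def combineCombinersByKey(iter: Iterator[(K, C)]): Map[K, C] =
    iter.foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
    }
}

val avgAgg = SimpleAggregator[String, Double, (Int, Double)](
  v => (1, v),
  (c, v) => (c._1 + 1, c._2 + v),
  (a, b) => (a._1 + b._1, a._2 + b._2))

// mapSideCombine = false: the reducer sees every raw record
val fromRaw = avgAgg.combineValuesByKey(Iterator(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0)))
// mapSideCombine = true: the reducer sees one combiner per map task
val fromCombiners = avgAgg.combineCombinersByKey(Iterator(("Fred", (2, 183.0)), ("Fred", (1, 91.0))))
```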

Now consider the following scenario, where combineByKey is run once with mapSideCombine = false and once with mapSideCombine = true:

import org.apache.spark.RangePartitioner

val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
val d1 = sc.parallelize(initialScores)
type MVType = (Int, Double) // tuple type: (number of subjects, total score)

// mapSideCombine = false with an explicit RangePartitioner (2 partitions chosen for illustration)
d1.combineByKey(
  (score: Double) => (1, score),
  (c1: MVType, newScore: Double) => (c1._1 + 1, c1._2 + newScore),
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2),
  new RangePartitioner(2, d1),
  false
).map { case (name, (num, score)) => (name, score / num) }.collect

// Three-argument overload: default HashPartitioner and mapSideCombine = true
d1.combineByKey(
  (score: Double) => (1, score),
  (c1: MVType, newScore: Double) => (c1._1 + 1, c1._2 + newScore),
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
).map { case (name, (num, score)) => (name, score / num) }.collect

The execution flow with mapSideCombine = false:

[Figure: combineByKey execution flow with mapSideCombine = false]

The execution flow with mapSideCombine = true:

[Figure: combineByKey execution flow with mapSideCombine = true]

Combining early on the map side therefore reduces the amount of data produced during the shuffle.
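To put numbers on this, the sketch below splits the six initialScores records into two hypothetical map tasks and counts how many records each map side would emit. The split is an assumption; Spark's actual split depends on how parallelize slices the array. combineOnMap is a hypothetical helper that emits one (count, sum) combiner per distinct key per task.

```scala
type MVType = (Int, Double)

// The six initialScores records split into two hypothetical map tasks
val mapTasks: List[List[(String, Double)]] = List(
  List(("Fred", 88.0), ("Fred", 95.0), ("Wilma", 93.0)),
  List(("Fred", 91.0), ("Wilma", 95.0), ("Wilma", 98.0)))

// mapSideCombine = true: one (count, sum) combiner per distinct key in the task
def combineOnMap(task: List[(String, Double)]): List[(String, MVType)] =
  task.groupBy(_._1).toList.map { case (name, kvs) =>
    name -> kvs.foldLeft((0, 0.0)) { case ((n, s), (_, score)) => (n + 1, s + score) }
  }

// mapSideCombine = false: every raw record is shuffled as-is
val shuffledWithout = mapTasks.map(_.size).sum
val shuffledWith = mapTasks.map(task => combineOnMap(task).size).sum
```

With only two keys the count drops from 6 to 4. The saving grows with the number of values per key inside each map task.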


23. distinct()

Deduplication: removes duplicate elements from the RDD.

def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
  reduceByKey(new HashPartitioner(numPartitions), func)
}

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}

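In the end it still calls combineByKey: the key-based aggregation operators on pair RDDs (reduceByKey, groupByKey, aggregateByKey and so on) are all implemented on top of combineByKey. The detailed execution flow is as follows: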

[Figure: execution flow of distinct]
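The chain map(x => (x, null)).reduceByKey((x, y) => x).map(_._1) can be mimicked on local collections. In this illustrative sketch, distinctViaReduceByKey is a hypothetical name and each inner Seq stands in for a partition. Duplicates collapse first within a partition (the map-side combine) and then across partitions (the reduce side).

```scala
// Local mimic of: map(x => (x, null)).reduceByKey((x, y) => x).map(_._1)
def distinctViaReduceByKey[T](partitions: Seq[Seq[T]]): Seq[T] = {
  // map(x => (x, null)) plus map-side combine: duplicate keys collapse within a partition
  val mapSide: Seq[Map[T, Null]] = partitions.map(p => p.map(x => x -> null).toMap)
  // Reduce side: merge per-partition maps so duplicates across partitions collapse too,
  // then keep only the keys, like map(_._1)
  mapSide.foldLeft(Map.empty[T, Null])(_ ++ _).keys.toSeq
}

val deduped = distinctViaReduceByKey(Seq(Seq(1, 2, 2, 3), Seq(3, 4, 1)))
```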

24. groupByKey

Groups together the records that share the same key.

/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 * The ordering of elements within each group is not guaranteed, and may even differ
 * each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 *
 * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKey[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

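In essence, groupByKey uses combineByKey to gather records with the same key. Note, however, that groupByKey does not combine on the map side. Combining there would not reduce the amount of data sent over the network. It would instead increase the JVM memory used by map tasks, and the resulting GC pressure slows down the whole computation.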
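A quick local check of this argument follows. Buffering values per key on the map side lowers the number of records, but not the number of values that still have to be shipped. Meanwhile the buffers must be held in map-task memory. bufferOnMap is a hypothetical helper, and ArrayBuffer stands in for Spark's internal CompactBuffer.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical map-side "combine" for groupByKey: collect each key's values in one buffer
def bufferOnMap[K, V](task: Seq[(K, V)]): Map[K, ArrayBuffer[V]] =
  task.foldLeft(Map.empty[K, ArrayBuffer[V]]) { case (acc, (k, v)) =>
    val buf = acc.getOrElse(k, ArrayBuffer.empty[V])
    buf += v
    acc.updated(k, buf)
  }

val task = Seq(("a", 1), ("b", 2), ("a", 3), ("a", 4))
val buffers = bufferOnMap(task)

val recordsWithout = task.size                   // 4 records carrying 4 values
val recordsWith = buffers.size                   // 2 records...
val valuesWith = buffers.values.map(_.size).sum  // ...but still 4 values to ship
```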

The groupByKey execution flow, first when the target partitioner is the same as the parent RDD's and then when it differs:

Same partitioner:

[Figure: groupByKey execution flow with the same partitioner]

Different partitioner:

[Figure: groupByKey execution flow with a different partitioner]

25. aggregateByKey

Aggregation: combines the values that share the same key, much like an aggregate function in SQL. It can be used to compute max, min, avg and so on.

/**
 * Aggregate the values of each key, using given combine functions and a neutral "zero value".
 * This function can return a different result type, U, than the type of the values in this RDD,
 * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
 * as in scala.TraversableOnce. The former operation is used for merging values within a
 * partition, and the latter is used for merging values between partitions. To avoid memory
 * allocation, both of these functions are allowed to modify and return their first argument
 * instead of creating a new U.
 */
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
}

/**
 * Aggregate the values of each key, using given combine functions and a neutral "zero value".
 * This function can return a different result type, U, than the type of the values in this RDD,
 * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
 * as in scala.TraversableOnce. The former operation is used for merging values within a
 * partition, and the latter is used for merging values between partitions. To avoid memory
 * allocation, both of these functions are allowed to modify and return their first argument
 * instead of creating a new U.
 */
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

  // We will clean the combiner closure later in `combineByKey`
  val cleanedSeqOp = self.context.clean(seqOp)
  combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)
}
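In the source above, createCombiner is v => cleanedSeqOp(createZero(), v). The local model below makes the consequence visible by contrasting it with aggregate. It uses a deliberately non-neutral zero value of 10. localAggregate and localAggregateByKey are hypothetical names that model the semantics only, not Spark's code.

```scala
val zero = 10 // deliberately non-neutral so the difference is visible
val seqOp = (acc: Int, v: Int) => acc + v
val combOp = (a: Int, b: Int) => a + b

// aggregate: each partition folds from zero with seqOp, then the partition
// results are folded together starting from zero again with combOp
def localAggregate(partitions: Seq[Seq[Int]]): Int =
  partitions.map(p => p.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

// aggregateByKey: zero enters once per key per partition via seqOp
// (createCombiner = v => seqOp(zero, v)); combOp merges results without zero
def localAggregateByKey(partitions: Seq[Seq[(String, Int)]]): Map[String, Int] =
  partitions
    .flatMap(p => p.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(zero)(seqOp) })
    .groupBy(_._1)
    .map { case (k, kcs) => k -> kcs.map(_._2).reduce(combOp) }

val plain = localAggregate(Seq(Seq(1, 2), Seq(3)))                           // 10+13+13
val byKey = localAggregateByKey(Seq(Seq(("a", 1), ("a", 2)), Seq(("a", 3)))) // 13+13
```

With a neutral zero value such as 0, both would give 6. Only a non-neutral zero exposes where each operator applies it.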

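First, distinguish it from aggregate. In aggregate, both seqOp and combOp use zeroValue, whereas in aggregateByKey zeroValue is used only in seqOp. In addition, aggregateByKey runs with mapSideCombine = true, so it combines on the map side. Suppose we use aggregateByKey to compute the average temperature of each month: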

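// Input assumption: each line is "<date> <temperature>", with the date formatted as yyyyMMdd
val rdd = sc.textFile("weather-data") // hypothetical path to the weather data
// Key by month: substring(4, 6) extracts MM from a yyyyMMdd date (adjust to the real format)
val rdd2 = rdd.map(x => x.split(" ")).map(x => (x(0).substring(4, 6), x(1).toInt))
val zeroValue = (0, 0) // (number of readings, sum of temperatures)
val seqOp = (u: (Int, Int), v: Int) => {
  (u._1 + 1, u._2 + v)
}
val compOp = (c1: (Int, Int), c2: (Int, Int)) => {
  (c1._1 + c2._1, c1._2 + c2._2)
}
val rdd3 = rdd2.aggregateByKey(zeroValue)(seqOp, compOp)
rdd3.foreach(x => println(x._1 + ": average temperature is " + x._2._2.toDouble / x._2._1))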

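rdd2 is derived from rdd, and rdd is read from a text file, so rdd2 has no partitioner. aggregateByKey, on the other hand, partitions by hash, so a shuffle is needed. The computation proceeds as follows: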

[Figure: aggregateByKey computation of the monthly average temperature]
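Since rdd2 has no partitioner, each month key is routed to a reduce partition by hashing. Below is a sketch of that routing rule, following my understanding of HashPartitioner.getPartition: the key's hashCode modulo numPartitions, made non-negative, with null keys going to partition 0. The month keys and the choice of 3 partitions are hypothetical.

```scala
// Non-negative modulo: Java/Scala % can return a negative result for negative hashCodes
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

// Hash-partitioning rule as understood from HashPartitioner
def hashPartition(key: Any, numPartitions: Int): Int = key match {
  case null => 0
  case _    => nonNegativeMod(key.hashCode, numPartitions)
}

// Hypothetical month keys like those produced by rdd2 ("01" to "12")
val months = (1 to 12).map(m => f"$m%02d")
val assignment = months.map(m => m -> hashPartition(m, 3)).toMap
```

Because the rule depends only on the key, all records for a given month land in the same reduce partition, where combOp merges their per-map combiners.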