第22课:Spark性能调优之使用更高性能算子及其源码剖析

时间:2022-06-20 18:26:07

第22课:Spark性能调优之使用更高性能算子及其源码剖析

Spark性能调优之使用更高性能算子的重要性在于同样的情况下,如果使用更高性能的算子,从算子级别给我们带来更高的效率。Spark现在主推的是DataSet这个API接口,越来越多的算子可以基于DataSet去做,DataSet基于天然自带的优化引擎,理论上讲比RDD的性能更高,DataSet弱点是无法自定义很多功能。平时使用来讲,使用的最基本是Shuffle的算子。Shuffle分为2部分:Mapper端和Reducer端。性能调优的准则是尽量不使用Shuffle类的算子,尽量避免Shuffle。在进行Shuffle的时候,将多个节点的同一个Key的数据汇聚到同样一个节点进行Shuffle的操作,基本思路是将数据放入内存中,内存中放不下的话就放入磁盘中。

如果要避免Shuffle,所有的分布式计算框架是产生Map端的Join,2个RDD进行操作,先把RDD的数据收集过来,然后通过SparkContext进行BroadCast广播出去,假设原先是RDD1、RDD2,我们把RDD2广播出去,原来是进行Join,Join的过程肯定是进行某种计算,此时RDD2其实已经不是RDD了,就是一个数据结构体包裹着数据本身,对RDD1进行Join操作,就是一条条遍历数据跟另一个集合里的数据产生某种算法操作。

如果不能避免Shuffle,我们退而求其次,需要更多的机器承担Shuffle的工作,充分利用Mapper端和Reducer端机器的计算资源,尽量让Mapper端承担聚合的任务。如果在Mapper端进行Aggregate的操作,在Mapper端的进程中就会把相同的Key进行合并。

(1)reduceByKey和aggregateByKey取代groupByKey

PairRDDFunctions.scala的aggregateByKey,aggregateByKey算子使用给定的组合函数和一个中立的“零值”聚合每个key的值。这个函数可以返回不同的结果类型U,不同于RDD的值的类型 V。在 scala.TraversableOnce中,我们需要一个操作将V合并成U和一个操作合并两个U,前者的操作用于合并分区内的值,后者用于在分区之间合并值。为了避免内存

分配,这两个函数都允许修改和返回他们的第一个参数而不是创造一个新的U。aggregateByKey代码如下:

1.          def aggregateByKey[U: ClassTag](zeroValue: U,partitioner: Partitioner)(seqOp: (U, V) => U,

2.               combOp: (U, U) => U): RDD[(K, U)] =self.withScope {

3.             // Serialize the zero value to a byte arrayso that we can get a new clone of it on each key

4.             val zeroBuffer =SparkEnv.get.serializer.newInstance().serialize(zeroValue)

5.             val zeroArray = newArray[Byte](zeroBuffer.limit)

6.             zeroBuffer.get(zeroArray)

7.          

8.             lazy val cachedSerializer =SparkEnv.get.serializer.newInstance()

9.             val createZero = () =>cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

10.       

11.          // We will clean the combiner closure laterin `combineByKey`

12.          val cleanedSeqOp = self.context.clean(seqOp)

13.          combineByKeyWithClassTag[U]((v: V) =>cleanedSeqOp(createZero(), v),

14.            cleanedSeqOp, combOp, partitioner)

15.        }

例如:groupByKey会不会进行Mapper的聚合操作呢?不会。groupByKey重载函数都没有指定函数操作的功能。相对于groupByKey而言,我们倾向于采用reduceByKey和aggregateByKey来取代groupByKey,因为groupByKey不会进行Mapper端的aggregate的操作,所有的数据会通过网络传输传到Reducer端,性能会比较差。而我们进行aggregateByKey的时候,可以自定义Mapper端的操作和Reducer端的操作,当然reduceByKey和aggregateByKey算子是一样的。

PairRDDFunctions.scala的groupByKey代码如下:

1.          def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])] = self.withScope {

2.             // groupByKey shouldn't use map sidecombine because map side combine does not

3.             // reduce the amount of data shuffled andrequires all map side data be inserted

4.             // into a hash table, leading to moreobjects in the old gen.

5.             val createCombiner = (v: V) => CompactBuffer(v)

6.             val mergeValue = (buf: CompactBuffer[V], v:V) => buf += v

7.             val mergeCombiners = (c1: CompactBuffer[V],c2: CompactBuffer[V]) => c1 ++= c2

8.             val bufs =combineByKeyWithClassTag[CompactBuffer[V]](

9.               createCombiner, mergeValue, mergeCombiners,partitioner, mapSideCombine = false)

10.          bufs.asInstanceOf[RDD[(K, Iterable[V])]]

11.        }

reduceByKey和aggregateByKey在正常情况下取代groupByKey的2个问题:

1,groupByKey可进行分组,reduceByKey和aggregateByKey怎么进行分组?可采用算法控制。

2,reduceByKey和aggregateByKey都可以取代groupByKey,reduceByKey和aggregateByKey有什么区别?区别很简单,aggregateByKey给予我们更多的控制,可以定义Mapper端的aggregate函数和Reducer端aggregate函数;

 

(2)批量处理数据mapPartitions算子取代map算子

我们看一下RDD.scala源代码,RDD在处理一块又一块的写数据的时候,不要使用map算子,可以使用mapPartitions算子,但mapPartitions有一个弊端,会出现OOM的问题,因为每次处理掉一个Partitions的数据,对JVM也是一个负担。

RDD.scala的mapPartitions代码如下:

1.         def  mapPartitions[U: ClassTag](

2.               f: Iterator[T] => Iterator[U],

3.               preservesPartitioning: Boolean = false):RDD[U] = withScope {

4.             val cleanedF = sc.clean(f)

5.             new MapPartitionsRDD(

6.               this,

7.               (context: TaskContext, index: Int, iter:Iterator[T]) => cleanedF(iter),

8.               preservesPartitioning)

9.           }      

 

(3)批量数据处理foreachPartition取代foreach

foreach处理一条条的数据,foreachPartition将一批数据写入数据库或Hbase,至少提升50%的性能。RDD.scala的foreachPartition foreach源代码如下:

1.           def foreach(f: T => Unit): Unit = withScope{

2.             val cleanF = sc.clean(f)

3.             sc.runJob(this, (iter: Iterator[T]) =>iter.foreach(cleanF))

4.           }

5.          

6.           /**

7.            * Applies a function f to each partition ofthis RDD.

8.            */

9.           def foreachPartition(f: Iterator[T] =>Unit): Unit = withScope {

10.          val cleanF = sc.clean(f)

11.          sc.runJob(this, (iter: Iterator[T]) =>cleanF(iter))

12.        }

 

(4) 使用coalesce算子整理碎片文件。coalesce 默认情况不产生Shuffle,基本工作机制把更多并行度的数据变成更少的并行度。例如1万个并行度的数据变成100个并行度。coalesce算子返回一个新的RDD,汇聚为`numPartitions`个分区。这将导致一个窄依赖,例如,如果从1000个分区变成100分区,将不会产生shuffle,而不是从当前分区的10个分区变成100新分区。然而,如果做一个激烈的合并,例如numpartitions= 1,这可能会导致计算发生在更少的节点。(例如设置numpartitions = 1,将计算在一个节点上)。为了避免这个情况,可以设置shuffle = true。这将增加一个shuffle 的步骤,但意味着当前的上游分区将并行执行。shuffle = true,可以汇聚到一个更大的分区,对于少量的分区这是有用的,

例如说100个分区,可能有几个分区数据非常大。那使用coalesce算子合并(1000,shuffle = true),将导致使用哈希分区器将数据分布在1000个分区。注意可选的分区coalescer必须是可序列化的。

RDD.scala的coalesce算子代码如下:

1.           def coalesce(numPartitions: Int, shuffle:Boolean = false,

2.                        partitionCoalescer:Option[PartitionCoalescer] = Option.empty)

3.                       (implicit ord: Ordering[T] =null)

4.               : RDD[T] = withScope {

5.             require(numPartitions > 0, s"Numberof partitions ($numPartitions) must be positive.")

6.             if (shuffle) {

7.               /** Distributes elements evenly acrossoutput partitions, starting from a random partition. */

8.               val distributePartition = (index: Int,items: Iterator[T]) => {

9.                 var position = (newRandom(index)).nextInt(numPartitions)

10.              items.map { t =>

11.                // Note that the hash code of the keywill just be the key itself. The HashPartitioner

12.                // will mod it with the number oftotal partitions.

13.                position = position + 1

14.                (position, t)

15.              }

16.            } : Iterator[(Int, T)]

17.       

18.            // include a shuffle step so that ourupstream tasks are still distributed

19.            new CoalescedRDD(

20.              new ShuffledRDD[Int, T,T](mapPartitionsWithIndex(distributePartition),

21.              new HashPartitioner(numPartitions)),

22.              numPartitions,

23.              partitionCoalescer).values

24.          } else {

25.            new CoalescedRDD(this, numPartitions,partitionCoalescer)

26.          }

27.        }

从最优化的角度讲,使用coalesce一般在使用filter算子之后。因为filter算子会产生数据碎片,Spark的并行度会从上游传到下游,我们在filter算子之后一般会使用coalesce算子。

 

(5) 使用repartition算子,其背后使用的仍是coalesce。但是shuffle 值默认设置为true,repartition算子会产生Shuffle。Repartition代码如下:

1.              def repartition(numPartitions: Int)(implicitord: Ordering[T] = null): RDD[T] = withScope {

2.             coalesce(numPartitions, shuffle = true)

3.           }

 

(6)repartition算子碎片整理以后会进行排序,Spark官方提供了一个repartitionAndSortWithinPartitions算子。JavaPairRDD的repartitionAndSortWithinPartitions方法代码如下:

1.           def  repartitionAndSortWithinPartitions(partitioner:Partitioner): JavaPairRDD[K, V] = {

2.             val comp =com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]

3.             repartitionAndSortWithinPartitions(partitioner,comp)

4.           }

 

(7)persist:数据复用的时候使用持久化算子。

1.          def persist(newLevel: StorageLevel): this.type= {

2.             if (isLocallyCheckpointed) {

3.               // This means the user previously calledlocalCheckpoint(), which should have already

4.               // marked this RDD for persisting. Herewe should override the old storage level with

5.               // one that is explicitly requested bythe user (after adapting it to use disk).

6.               persist(LocalRDDCheckpointData.transformStorageLevel(newLevel),allowOverride = true)

7.             } else {

8.               persist(newLevel, allowOverride = false)

9.             }

10.        }

 

 

(8)mapPartitionsWithIndex算子:推荐使用,每个分区有个index,实际运行的时候看RDD上面有数字,如果对数字感兴趣可以使用mapPartitionsWithIndex算子。

mapPartitionsWithIndex算子代码如下:

1.           def mapPartitionsWithIndex[U: ClassTag](

2.               f: (Int, Iterator[T]) => Iterator[U],

3.               preservesPartitioning: Boolean = false):RDD[U] = withScope {

4.             val cleanedF = sc.clean(f)

5.             new MapPartitionsRDD(

6.               this,

7.               (context: TaskContext, index: Int, iter:Iterator[T]) => cleanedF(index, iter),

8.               preservesPartitioning)

9.           }