U: ](constructA: Int = A

时间:2022-06-27 08:02:33

RDD(Resilient Distributed Dataset弹性漫衍式数据集)是Spark中抽象的数据布局类型,,任何数据在Spark中都被暗示为RDD。从编程的角度来看,RDD可以简单当作是一个数组。和普通数组的区别是,RDD中的数据时分区存储的,这样差别分区的数据就可以漫衍在差此外机器上,同时可以被并行措置惩罚惩罚。因此,Spark应用措施所做的无非是把需要措置惩罚惩罚的数据转换为RDD,然后对RDD进行一系列的调动和操纵,从而得到功效。

2、RDD创建

RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。

1) 从普通数组创建RDD,里面包罗了1到9这9个数字,它们分袂在3个分区中

scala> val a = sc.parallelize(1 to 9, 3)

2)读取文件README.md来创建RDD,文件中的每一行就是RDD中的一个元素

scala> val b = sc.textFile("README.md")

3、两类操纵算子

主要分两类,转换(transformation)和行动(action)。两类函数的主要区别是,transformation接受RDD并返回RDD,而action接受RDD返回非RDD.

transformation操纵是延迟计算的,也就是说从一个RDD生成另一个RDD的转换操纵不是顿时执行,需要等到有action操纵的时候才真正触发运算。

action算子会触发spark提交功课job,并将数据输出spark系统。

4、转换算子

更多可以参看 

1)map

对RDD中的每个元素都执行一个指定的函数来孕育产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

举例:

scala> val a = sc.parallelize(1 to 9, 3)

scala> val b = a.map(x => x*2)

scala> a.collect

res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> b.collect

res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

比拟分析下,如果换成flatMap,功效如下:

U: ](constructA: Int = A

2)flatMap

与map类似,区别是原RDD中的元素经map措置惩罚惩罚后只能生成一个元素,而原RDD中的元素经flatmap措置惩罚惩罚后可生成多个元素来构建新RDD。

举例:对原RDD中的每个元素x孕育产生y个元素(从1到y,y为元素x的值)

scala> val a = sc.parallelize(1 to 4, 2)

scala> val b = a.flatMap(x => 1 to x)

scala> b.collect

res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4,1,2,3,4)

比拟分析下,如果换成map,功效如下:

U: ](constructA: Int = A

3)mapPartitions

mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来措置惩罚惩罚的。

它的函数界说为:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

f即为输入函数,它措置惩罚惩罚每个分区里面的内容。每个分区中的内容将以Iterator[T]通报给输入函数f,f的输出功效是Iterator[U]。最终的RDD由所有分区颠末输入函数措置惩罚惩罚后的功效合并起来的。

举例:

U: ](constructA: Int = A

上述例子中的函数myfunc是把分区中一个元素和它的下一个元素构成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在功效中。

mapPartitions还有些变种,好比mapPartitionsWithContext,它能把措置惩罚惩罚过程中的一些状态信息通报给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index通报给用户指定的输入函数。

4)mapWith

是map的此外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的界说如下:

def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]

第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;

第二个函数f是把二元组(T, A)作为输入(此中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。

举例:把partition index 乘以10,然后加上2作为新的RDD的元素。

val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)

x.mapWith(a => a * 10)((a, b) => (b + 2)).collect

res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)

5)flatMapWith

flatMapWith与mapWith很类似,都是接收两个函数,一个函数把partitionIndex作为输入,输出是一个新类型A;此外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素构成了新的RDD。它的界说如下:

def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]

举例:

scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)

scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect

res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,8, 2, 9)

6)flatMapValues