1.基本转化操作
1.1最常用的两个转化操作时map()和filter()。
map()接收一个函数,把这个函数用于RDD中的每个元素,将函数作用之后的结果作为结果RDD中元素的值。
filter()接收一个函数,将RDD中满足该函数的元素返回放入新的RDD中。
举一个使用map()求RDD平方的例子。scala代码如下:
def main(args: Array[String]): Unit = { val conf = new SparkConf(); conf.setAppName("map"); conf.setMaster("local"); val sc = new SparkContext(conf); val num = sc.parallelize(List(, , , )); val result = num.map(x => x*x); result.take().foreach(println); }
对应的Java代码为:
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("trans"); conf.setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(, , , )); JavaRDD<Integer> outputRdd = rdd.map(new Function<Integer,Integer >() { private static final long serialVersionUID = 1L; public Integer call(Integer x) throws Exception { return x*x; } }); System.out.println(StringUtils.join(",", outputRdd.collect())); sc.close(); }
1.2 flatMap()
有时候我们希望操作一个元素使他返回多个元素,这时我们可以使用flatMap()。下面举一个例子,将一个包含一个字符串的RDD通过空格切分,返回多个元素。
scala代码如下:
def main(args: Array[String]): Unit = { val conf = new SparkConf(); conf.setAppName("flatmap"); conf.setMaster("local"); val sc = new SparkContext(conf); val lines = sc.parallelize(List("hello world","hi")); val words = lines.flatMap(line => line.split(" ")) println(words.first()) }
1.3集合操作
1.union(),返回一个包含两个RDD所以元素的RDD。例:
def main(args: Array[String]): Unit = { val conf = new SparkConf(); conf.setAppName("union"); conf.setMaster("local"); val sc = new SparkContext(conf); val a = sc.parallelize(List(, , )); val b = sc.parallelize(List(, , )); val c = a.union(b); println(c.collect().mkString(",")); }
Java代码如下:
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("trans"); conf.setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<Integer> a = sc.parallelize(Arrays.asList(1, 2, 3)); JavaRDD<Integer> b = sc.parallelize(Arrays.asList(3, 4, 5)); JavaRDD<Integer> c = a.union(b); System.out.println(StringUtils.join(",", c.collect())); sc.close(); }
如果想要去除重复的元素,可以使用distinct()方法,不过该方法开销很大,因为它需要将所有数据通过网络进行混洗shuffle。
使用intersection(other)方法可以返回两个RDD中都有的元素,它会去除RDD中重复的元素。同样它也需要进行混洗,开销大,效率低。
使用Cartesian(other)计算两个RDD的笛卡尔积。
使用subtract(other),从一个RDD中移除在另一个RDD含有的元素 。
1.4行动操作
1.reduce():reduce函数接收一个函数,这个函数操作两个RDD并返回一个同样类型的新元素。举个简单的+的例子。
object ReduceRdd { def main(args: Array[String]): Unit = { val conf = new SparkConf(); conf.setAppName("reduce"); conf.setMaster("local"); val sc = new SparkContext(conf); val rdd = sc.parallelize(List(1, 2, 3, 4)); val sum = rdd.reduce((x,y) =>x+y); println(sum); } }
2.aggregate():我们可以使用这个函数返回不同类型的新元素。使用这个函数时,首先要提供我们期待返回类型的初始值,然后使用一个函数将RDD的元素进行累加,最后使用一个函数将不同节点上的RDD进行合并。举一个求平均数的例子:
object BasicAvg { def main(args: Array[String]): Unit = { val conf = new SparkConf(); conf.setAppName("reduce"); conf.setMaster("local"); val sc = new SparkContext(conf); val num = sc.parallelize(List(1,2,3,4)); val result = num.aggregate((0,0))((x,y) => (x._1+y,x._2+1), (x,y)=>(x._1+y._1,x._2+y._2)); val avg = result._1/result._2.toDouble; println(avg); } }
解释一下aggregate()的过程,首先给定我们期待的结果的RDD的初始值,为 (0,0),第一个值表示的是RDD各个元素的值,第二个值表示的是元素的个数。第一个函数进行的是累加操作,比如第一个RDD累加后的值为(1,1),第二个RDD累加后的值为(3,2),以此类推。第二个函数主要是为了将不同节点上的RDD进行合并。
3.还有一些简单的将数据返回给驱动程序的操作,比如
collect(),它会将整个RDD的内容返回。
take(n)返回RDD的n个元素。
foreach()行动操作对RDD中的每一个元素进行操作。
count()计算RDD中元素的个数。
countByValue()计算各个元素在RDD中出现的次数。