Spark RDD概念学习系列之RDD的转换(十)

时间:2023-02-25 10:38:03

  RDD的转换

  Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。接下来以“Word Count”为例,详细描述这个DAG生成的实现过程。

Spark Scala版本的Word Count程序如下:

1: val file = spark.textFile("hdfs://...")
2: val counts = file.flatMap(line => line.split(" "))
3:  .map(word => (word, 1))
4:  .reduceByKey(_ + _)
5: counts.saveAsTextFile("hdfs://...")

file和counts都是RDD,其中file是从HDFS上读取文件并创建了RDD,而counts是在file的基础上通过flatMap、map和reduceByKey这三个RDD转换生成的。最后,counts调用了动作saveAsTextFile,用户的计算逻辑就从这里开始提交的集群进行计算。那么上面这5行代码的具体实现是什么呢?

1)行1:spark是org.apache.spark.SparkContext的实例,它是用户程序和Spark的交互接口。spark会负责连接到集群管理者,并根据用户设置或者系统默认设置来申请计算资源,完成RDD的创建等。

  spark.textFile("hdfs://...")就完成了一个org.apache.spark.rdd.HadoopRDD的创建,并且完成了一次RDD的转换:通过map转换到一个org.apache.spark.rdd.MapPartitions-RDD。

  也就是说,file实际上是一个MapPartitionsRDD,它保存了文件的所有行的数据内容。

2)行2:将file中的所有行的内容,以空格分隔为单词的列表,然后将这个按照行构成的单词列表合并为一个列表。最后,以每个单词为元素的列表被保存到MapPartitionsRDD

3)行3:将第2步生成的MapPartitionsRDD再次经过map将每个单词word转为(word, 1)的元组。这些元组最终被放到一个MapPartitionsRDD中。

4)行4:首先会生成一个MapPartitionsRDD,起到map端combiner的作用;然后会生成一个ShuffledRDD,它从上一个RDD的输出读取数据,作为reducer的开始;最后,还会生成一个MapPartitionsRDD,起到reducer端reduce的作用。

5)行5:首先会生成一个MapPartitionsRDD,这个RDD会通过调用org.apache.spark.rdd.PairRDDFunctions#saveAsHadoopDataset向HDFS输出RDD的数据内容。最后,调用org.apache.spark.SparkContext#runJob向集群提交这个计算任务。

RDD之间的关系可以从两个维度来理解:一个是RDD是从哪些RDD转换而来,也就是RDD的parent RDD(s)是什么;还有就是依赖于parent RDD(s)的哪些Partition(s)。这个关系,就是RDD之间的依赖,org.apache.spark.Dependency。根据依赖于parent RDD(s)的Partitions的不同情况,Spark将这种依赖分为两种,一种是宽依赖,一种是窄依赖。

RDD的依赖关系(宽依赖和窄依赖)

如,假设,现在如下

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

所以,

Spark RDD概念学习系列之RDD的转换(十)

比如,我这里是刚好是4台worker1、worker2、worker3、worker4。还有1台Master。

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

soga,

val file = spark.textFile("hdfs://...")

1)行1:spark是org.apache.spark.SparkContext的实例,它是用户程序和Spark的交互接口。spark会负责连接到集群管理者,并根据用户设置或者系统默认设置来申请计算资源,完成RDD的创建等。

  spark.textFile("hdfs://...")就完成了一个org.apache.spark.rdd.HadoopRDD的创建,并且完成了一次RDD的转换:通过map转换到一个org.apache.spark.rdd.MapPartitions-RDD。

  也就是说,file实际上是一个MapPartitionsRDD,它保存了文件的所有行的数据内容。

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

想要成为高手,一定要多看源码,看上几十遍都太少了,包括看上10个版本的源码。无论是hadoop、还是spark

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

val counts = file.flatMap(line => line.split(" ")) 2)行2:将file中的所有行的内容,以空格分隔为单词的列表,然后将这个按照行构成的单词列表合并为一个列表。最后,以每个单词为元素的列表被保存到MapPartitionsRDD

Spark RDD概念学习系列之RDD的转换(十)

.map(word => (word, 1))    3)行3:将第2步生成的MapPartitionsRDD再次经过map将每个单词word转为(word, 1)的元组。这些元组最终被放到一个MapPartitionsRDD中。

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

至此,windows本地,已经完成了。

下面是在网络里了。

注意啦! 分区是计算概念,分片是数据概念。

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

Spark RDD概念学习系列之RDD的转换(十)

有4台worker,每台都在自己内存计算。

Spark RDD概念学习系列之RDD的转换(十)

.reduceByKey(_ + _)

4)行4:首先会生成一个MapPartitionsRDD,起到map端combiner的作用;然后会生成一个ShuffledRDD,它从上一个RDD的输出读取数据,作为reducer的开始;最后,还会生成一个MapPartitionsRDD,起到reducer端reduce的作用。

总结:

Spark RDD概念学习系列之RDD的转换(十)

第一个stage :

HadoopRDD  ->   MapPartitionRDD  ->   MapPartitionsRDD  ->  MapPartitionsRDD  ->  MapPartitionsRDD

Spark RDD概念学习系列之RDD的转换(十)

第二个stage :  

  Stage shuffledRDD   ->  MapPartitionsRDD

Spark RDD概念学习系列之RDD的转换(十)的更多相关文章

  1. Spark RDD概念学习系列之RDD的checkpoint(九)

     RDD的检查点 首先,要清楚.为什么spark要引入检查点机制?引入RDD的检查点?  答:如果缓存丢失了,则需要重新计算.如果计算特别复杂或者计算耗时特别多,那么缓存丢失对于整个Job的影响是不容 ...

  2. Spark RDD概念学习系列之RDD的缓存(八)

      RDD的缓存 RDD的缓存和RDD的checkpoint的区别 缓存是在计算结束后,直接将计算结果通过用户定义的存储级别(存储级别定义了缓存存储的介质,现在支持内存.本地文件系统和Tachyon) ...

  3. Spark RDD概念学习系列之RDD的操作(七)

    RDD的操作 RDD支持两种操作:转换和动作. 1)转换,即从现有的数据集创建一个新的数据集. 2)动作,即在数据集上进行计算后,返回一个值给Driver程序. 例如,map就是一种转换,它将数据集每 ...

  4. Spark RDD概念学习系列之RDD是什么?(四)

       RDD是什么? 通俗地理解,RDD可以被抽象地理解为一个大的数组(Array),但是这个数组是分布在集群上的.详细见  Spark的数据存储 Spark的核心数据模型是RDD,但RDD是个抽象类 ...

  5. Spark RDD概念学习系列之RDD的依赖关系(宽依赖和窄依赖)(三)

    RDD的依赖关系?   RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency). 1)窄依赖指的是每 ...

  6. Spark RDD概念学习系列之RDD的缺点(二)

        RDD的缺点? RDD是Spark最基本也是最根本的数据抽象,它具备像MapReduce等数据流模型的容错性,并且允许开发人员在大型集群上执行基于内存的计算. 为了有效地实现容错,(详细见ht ...

  7. Spark RDD概念学习系列之rdd的依赖关系彻底解密(十九)

    本期内容: 1.RDD依赖关系的本质内幕 2.依赖关系下的数据流视图 3.经典的RDD依赖关系解析 4.RDD依赖关系源码内幕 1.RDD依赖关系的本质内幕 由于RDD是粗粒度的操作数据集,每个Tra ...

  8. Spark RDD概念学习系列之RDD的创建(六)

    RDD的创建  两种方式来创建RDD: 1)由一个已经存在的Scala集合创建 2)由外部存储系统的数据集创建,包括本地文件系统,还有所有Hadoop支持的数据集,比如HDFS.Cassandra.H ...

  9. Spark RDD概念学习系列之RDD的5大特点(五)

      RDD的5大特点  1)有一个分片列表,就是能被切分,和Hadoop一样,能够切分的数据才能并行计算. 一组分片(partition),即数据集的基本组成单位,对于RDD来说,每个分片都会被一个计 ...

随机推荐

  1. 在winform中添加普通右键菜单

    显示水平滚动条:点击GridControl的Run Designer在弹出的对话框中选择Views,将右侧属性窗口中OptionsView下的ColumnAutoWidth设置成false: 可以选择 ...

  2. iOS-NSURLCache内存缓存

    在IOS应用程序开发中,为了减少与服务端的交互次数,加快用户的响应速度,一般都会在IOS设备中加一个缓存的机制.使用缓存的目的是为了使用的应用程序能更快速的响应用户输入,是程序高效的运行.有时候我们需 ...

  3. HDOJ/HDU 1039 Easier Done Than Said?(字符串处理~)

    Problem Description Password security is a tricky thing. Users prefer simple passwords that are easy ...

  4. LeeCode-Delete Node in a Linked List

    Write a function to delete a node (except the tail) in a singly linked list, given only access to th ...

  5. npm常用指令

    安装: npm install <name> npm install <name> 安装依赖包,默认安装最新版本,也可在后面加上版本号,并且将安装信息加入项目的package. ...

  6. 启动Mysql数据库报错误:-bash&colon; &period;&sol;start&period;sh&colon; Permission denied

    linux下安装好Mysql数据库后,输入启动命令: cd /home/homework/mysql && ./start.sh 回车后报如下错误: 原因是:该文件未有相关执行权限 解 ...

  7. Filebeat&plus;Kafka&plus;Logstash&plus;ElasticSearch&plus;Kibana 日志采集方案

    前言 Elastic Stack 提供 Beats 和 Logstash 套件来采集任何来源.任何格式的数据.其实Beats 和 Logstash的功能差不多,都能够与 Elasticsearch 产 ...

  8. C&plus;&plus;与C的区别

    在最开始C++只是C加上了一些面向对象的特性,C++最初的名字为C with Classes.后来C++又提出了一些不同于Class的特性:Exceptions(异常).templates(模板).S ...

  9. 20155339平措卓玛 Exp2 后门原理与实践

    20155339平措卓玛Exp2 后门原理与实践 基础问题 (1)例举你能想到的一个后门进入到你系统中的可能方式? 答:下载并安装某个程序,这个程序可以正常的并且完整的为我们提供服务,但是在开发改程序 ...

  10. proto3 笔记1

    定义 Message 类型, 例子如下: syntax = "proto3"; message SearchRequest { string query = 1; int32 pa ...