spark数据分析引擎

时间:2024-04-14 19:21:17

简介

  • spark是专为大规模数据处理而设计的快速通用的计算引擎 .
  • spark既可以批处理也可以做流式处理
  • spark运行速度比mapreduce快大约10倍 . 在机器学习 ,人工智能的逻辑回归迭代算法场景下spark运行速度比mapreduce大约快100多倍 .
  • mapreduce在计算过程中涉及到本地磁盘的读写 , spark的数据流转都是在内存中完成的 . spark涉及到DAG(有向无环图)来切分任务的执行的先后顺序的思想 , 也是快的原因之一 .
  • spark可以使用java,scala,python,R语言进行开发 , 使用scala来开发 , 非常的简单轻便 .
  • spark上可以直接写SQL
  • spark实现好了很多机器学习库 , 逻辑回归 ,线性回归 ,决策树 …
  • spark似乎想实现一站式开发 , 来让用户不必再使用mr , storm , hive …
  • hadoop在国内是13年左右开始火的 , spark则是15年左右 .
技术栈讲解

spark数据分析引擎
- hdfs分布式文件系统 , 存储数据 .
- Mesos资源管理调度(没有hdfs出名)
- Tachyon基于内存的文件系统(相比于hdfs烧钱)
- SparkStrem流式处理
- MLbase机器学习的库,GrophX图计算,sparkSQL解析SQL ,都是基于spark核心底层RDD
- hive数据仓库 , 解析SQL , hive1.x底层支持MR , hive2.X可以底层spark.

spark的运行模式

  • local , 多用于本地测试 , 例如在eclipse , idea 中写程序测试等.
  • Standalone 是spark自带的一个资源调度框架 , 它支持完全分布式.
  • Yarn , Hadoop 生态圈里面的一个资源调度框架,Spark 也是可以基于 Yarn 来计算的。
  • Mesos 资源调度框架。

RDD

  • RDD(Resilient Distributed Dateset),弹性分布式数据集(你可以理解成集合)。
  • 五个特性:
    • 1 . RDD 是由一系列的 partition 组成的。
    • 2 . 函数是作用在每一个 partition(split)上的。
    • 3 . RDD 之间有一系列的依赖关系。
    • 4 . 分区器是作用在 Key,Value 格式的 RDD 上。
    • 5 . RDD 提供一系列最佳的计算位置 , 计算移动数据不移动。

spark数据分析引擎
``

  • textFile 方法底层封装的是读取 MR 读取文件的方式,读取文件之前先 split,默认 split 大小是一个 block 大小, RDD = textFile(path)。 窄依赖
  • parallelize , 创建一个RDD , RDD = parallelize(Array(1,2,3)) . 窄依赖
  • RDD 实际上不存储数据,这里方便理解,暂时理解为存储数据。(计算移动 , 数据不移动)
  • 什么是 K,V 格式的 RDD?
    • 如果 RDD 里面存储的数据都是二元组对象,那么这个 RDD 我们就叫做 K,V 格式的 RDD。
  • 哪里体现 RDD 的弹性(容错)?
    • 弹性 : partition 数量,大小没有限制,体现了 RDD 的弹性。
    • 容错 : RDD 之间依赖关系,可以基于上一个 RDD 重新计算出 RDD.
  • 哪里体现 RDD 的分布式?
    • RDD 是由 Partition 组成,partition 是分布在不同节点上的。
    • RDD 提供计算最佳位置,体现了数据本地化。体现了大数据中“计算移动数据不移动”的理念。

Spark任务执行原理

  • Driver 和 多台Work节点,Master节点组成了主从架构 ,他们都是运行在JVM中的进程 . Driver负责任务分发到Work节点 , 计算结果回收 , 监控任务 ,在任务分发和结果回收的时候 , 如果task的计算结果非常大可以不要回收 . Work节点是Standalone资源调度框架里面资源管理的从节点 , 它会会启动excuter进程执行任务. Master是Standalone资源调度框架里面资源管理的主节点 .

算子

Transformation 转换算子(懒执行 , 任务过程中一定要有行动(触发)算子触发才会执行的算子)

filter : 过滤 ,true 保留,false 过滤掉。 窄依赖 .

map:按照参数函数规则, 将数据组成map/二元组 , 组成新的rdd . 窄依赖: data-> <date , 1>

flatMap:将数据按照传入的函数规则 , 比如说传入"," , 按逗号切分RDD数据组成新的RDD , 窄依赖

sample:随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。 RDD.sample(true , 0.1) //数据抽样 , 抽完后把数据任然放回RDD参与下一轮抽样 , 抽样结果大约是百分之十 , 不精确 .

reduceByKey:相同的key分组 ,value统计聚合 . 宽依赖 : <key,1>,<key,1> -> <key,2>
sortByKey/sortBy: 作用在 K,V 格式的 RDD 上,对 key 进行升序或者降序排序。
groupByKey:按key排序 ,value聚合成一个iterator迭代器 <key , 1><key , 2> =>><key , <1 , 2>> 宽依赖
union:合并 , 窄依赖 .
cogroup:
crossProduct:
mapValues:
sort:排序
partitionBy:分区
join:2和rdd合并一个 , 共同部分保留 , 不同部分去掉 (k ,v1),(k,v2)=>(k,(v1,v2)) 宽依赖
leftOuterJoin: 以左边的RDD为基准 , 它没有的也包含, 但是值为空 .
leftOuterJoin: 右边为基准
fullOuterJoin: 全部保留
distinct: 去重 , key和value都一样的数据会被去除 .

Action 行动算子

count统计元素数。会在结果计算完成后回收到 Driver 端。
countByKey: 统计key的数量 , <key , 1> , <key , 5> ->> <key , 2>

take(n)返回一个包含数据集前 n 个元素的集合。

first, first=take(1),返回数据集中的第一个元素。

foreach循环遍历数据集中的每个元素,运行相应的逻辑。

collect将RDD的元素返回成一个array集合 .
reduce 数据累加
save数据保存

控制算子
  • 控制算子有三种 : cache , persist , checkpoint , 控制算子的目的是将RDD持久化 , 持久化的单位是partition . cache , persist是懒执行的 , 需要触发算子触发 . checkpoint不仅能将RDD持久化到磁盘 ,还可以切断RDD之间的依赖关系 .
  • cache , 执行一个job , RDD1 -> RDD2 -> RDD3 -> RDD4 , 第二个job , RDD1 -> RDD2 -> RDD4 -> RDD5 . 如果在第一个job执行到RDD2的过程中 , cache把RDD2持久化到内存中 , 执行第二个job的时候就可以直接从RDD2开始执行 .(RDD.cache())
  • persist : 可以指定持久化级别 , 最常用是MEMORY_ONLY(只保存内存)和MEMORY_AND_DISK(内存够放内存 , 内存不够放到磁盘) . DISK_ONLY(只保存磁盘) . (RDD.persist(StorageLevel.MEMORY_ONLY))
    spark数据分析引擎

参数说明:
_useDisk : 保存到内存还是磁盘
_useMemoru :是否要使用内存
_useOffHeap :是否使用堆外内存
_deserialized :是否要反序列化 , true:不序列化
_replication :副本数, 默认1

  • checkpoint , 以cache算子中的例子为例 , 执行RDD2.checkpoint()之后 , 第一个job会照常执行 , RDD2会在第一个job执行完成之后从RDD4 -> RDD3 -> RDD2(找到RDD2调用checkpoint算子的标识) -> RDD1(从头开始计算) -> RDD2(持久化数据) . 所以如果在第一个job执行过程中出现失误 , job再次启动任然是从头开始的 . checkpoint之前先cache的话 , 可以进行一定的优化 : RDD2会在第一个job执行完成之后从内存中找到RDD2, 然后直接持久化数据 , 就不用回溯了 .


宽窄依赖

我们将RDD之间的依赖关系分为以下两种:

  • 窄依赖
    父RDD与子RDD的patition之间的关系是一对一或者多对一 , 这种情况下是不会产生shuffle的 . 也就不会产生磁盘读写操作 . 快 .

  • 宽依赖
    父RDD和子RDD之间的关系是一对多 , 会有shuffle的产生 .会有磁盘操作 , 性能就没有那么快 . 所以纯粹的说spark完完全全是基于内存计算也不一定 .

  • Stage
    RDD之间的依赖关系 , 会形成一个DAG有向无环图 , DAG会提交给DAGScheduler , 它会把DAG划分成多个stage , 划分的标准就是宽窄依赖 , DAGScheduler遇到宽依赖 , 就将之前在所有的窄依赖划分成一个stage . stage是由一组并行的task组成 , 每个stage包含一个或多个task任务 , DAGScheduler会将这些task以taskSet的形式提交给TaskScheduler运行 . task的个数是由stage的finalRDD的partition决定的 . spark中数据的流转不同于mapreduce , 比如在一个stage中 , 有2个数据 , 先执行一次map算子 , 在执行一个filter算子 , 第一个数据会先map , 然后直接filter , 遇到宽依赖 , 等待第二个数据 . 第二个数据map , filter , 完成了 , 再一起继续执行 .
    spark数据分析引擎


spark的四种运行方式

  • Standalone-client和Yarn-client的运行方式见spark安装教程
  • Standalone-cliene , 在哪台机器上面提交任务 , 就在哪台机器上面启动driver , 如果提交100个任务 , 就会有100个driver , 会造成网卡流量激增 , 甚至会卡掉其他的进程 . 所以一般在测试环境使用 .
    命令示例:./spark-submit --master spark://node1:7077 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 1000
    spark数据分析引擎
    Standalone:Standalone启动时 , 客户端看不到执行情况 , driver进程随机分布 , 解决了client模式的流量激增问题 , 这种方式适用于生产环境 . 命令示例:./spark-submit --master spark://node1:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100 , 多了deploy-mode cluster .
    spark数据分析引擎
    yarn-client: yarn-client这种方式适用于测试环境,同样造成流量激增问题.ApplicationMaster的功能:申请启动application的资源 . 连接nodeMaster,启动excutor进程 . 命令示例:./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
    spark数据分析引擎
    yarn-clustor: 适用于生产环境 , 没有流量激增问题. ApplicationMaster的功能: 申请application资源 ,启动excutor , 分发task任务 , 回收结果 . 监控任务执行情况 . 命令示例:./spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
    spark数据分析引擎

spark资源调度和任务调度

spark数据分析引擎

  • 启动集群后,Worker 节点会向 Master 节点汇报资源情况 , Master 掌握了集群资源情况。
  • 当 Spark 提交一个 Application 后,根据 RDD 之间的依赖关系将 Application 形成一个 DAG 有向无环图。
  • 任务提交后,Spark 会在Driver 端创建两个对象:DAGScheduler 和TaskScheduler .
  • DAGScheduler是任务调度的高层调度器,是一个对象。DAGScheduler 的主要作用就是将DAG 根据 RDD 之间的宽窄依赖关系划分为一个个的 Stage,然后将这些
    Stage 以 TaskSet 的形式提交给 TaskScheduler(TaskScheduler 是任务调
    度的低层调度器,这里 TaskSet 其实就是一个集合,里面封装的就是一个个的 task 任务,也就是 stage 中的并行度 task 任务).
  • TaskSchedule 会遍历TaskSet 集合,拿到每个 task 后会将 task 发送到计算节点 Executor 中去执行(其实就是发送到 Executor 中的线程池 ThreadPool 去执行)。task 在Executor 线程池中的运行情况会向 TaskScheduler 反馈,当 task 执行失败时,则由 TaskScheduler 负责重试,将 task 重新发送给 Executor 去执行,
    默认重试 3 次。如果重试 3 次依然失败,那么这个 task 所在的 stage 就失败了。
  • stage 失败了则由 DAGScheduler 来负责重试,重新发送 TaskSet 到
    TaskSchdeuler,Stage 默认重试 4 次。如果重试 4 次以后依然失败,那么这个 job 就失败了。job 失败了,Application 就失败了。
  • TaskScheduler 不仅能重试失败的 task,还会重试 straggling(落后,缓慢) task(也就是执行速度比其他 task 慢太多的 task)。如果有运行缓慢的 task 那么 TaskScheduler 会启动一个新的 task 来与这个运行缓慢的 task 执行相
    同的处理逻辑。两个 task 哪个先执行完,就以哪个 task 的执行结果为准。这就是 Spark 的推测执行机制。在 Spark 中推测执行默认是关闭的。推测执行可以通过 spark.speculation 属性来配置。 注意:
    • 对于 ETL 类型要入数据库的业务要关闭推测执行机制,这样就不会有重复的数据入库。
    • 如果遇到数据倾斜的情况,开启推测执行则有可能导致一直会有task 重新启动处理相同的逻辑,任务可能一直处于处理不完的状态。

sparkSubmit常用的提交参数说明

  • master : API中的setMaster(“local”) ,代码API的优先级高于submit提交
  • deploy-mode : 运行方式 ,client / clustor
  • class : 整个应用程序的主入口
  • name : 应用程序的名字 API中的setAPPName(),代码API的优先级高于submit提交.
  • jar : java代码中可以不把外部引用的一些jar包一起打包 , jar命令会添加第三方的依赖 .
  • files : 一些代码需要的文件项目打包没有打的话可以用该命令应用 .
  • conf 参数名 = 参数值 : 例如 “–conf spark.executor.userClassPathFirst=true”;
  • driver-memory MEN : driver的进程执行内存 , 默认是1024M
  • executor-memory MEN : 每个executor进程多少内存 , 默认1G
  • driver-cores NUM: driver进程默认分配多少个核 , 默认1个
  • supervise : driver进程挂掉了会自动重启
  • kill SUBMISSION_ID : 杀掉driver进程 , 进而会导致executor也会挂
  • total-executor-cores NUM : 所有executors进程最多使用多少个核
  • executor-cores NUM :一个executors使用多少个核
    (yarn-only)
  • queue QUEUE_NAME : 资源分配 , yarn分很多个队列1,2,3… , 改命令的作用是指明使用哪个队列的资源 , 其他的队列就使用不了
  • num-executors NUM : 指定executor进程数量执行任务 , yarn默认是2个
  • principal PRINCIPAL 和 keytab KEYTAB : 权限 .

sparkShell的使用

sparkshell是spark自带的快速原型开发工具 ,可以做一些调试 , 在spark的bin目录下通过spark-shell命令启动 , 如果配置了如下配置 , 需要先启动hadoop
spark数据分析引擎

  • 默认初始化了SparkContext对象sc . 可以直接进行操作 .
    spark数据分析引擎
    sc.textFile("hdfs://Credi/math.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreach(println)通过命令处理hdfs上的数据 .

master HA高可用配置

  • 配置spark集群中备用master的spark-env.sh文件之后将spark-env.sh文件发送给其他work节点 , 备份master节点将master节点ip地址指向自己.
    spark数据分析引擎

广播变量和累加器

  • 广播变量的使用:
    • 使用SparkContext对象的broadcast方法可以将数据广播出去:val list = new ArrayList() ; Broadcast broadcast = new SparkContext(conf).broadcast(list) ;
    • broadcast.value() 可以获得广播的数据 .
  • 累加器的使用:
    • 声明累加器: Accumulator accumulator = javaSparkContext.accumulator(0);
    • 累加器的使用RDD.foreach(x -> accumulator.add(1)) ; System.out.println(accumulator.value())

spark shuffle

宽依赖的算子对RDD进行操作会产生shuffle , reduceByKey会将上一个RDD中的每一个key对应的所有value聚合成一个value , 生成一个新的RDD<key , <key,value>> .
在聚合之前 , 每一个key对应的value不一定都是在一个partition中 ,也不太可能在同一个节点上, 因为RDD是分布式的弹性的数据集 , RDD的partition极有可能分布在各个节点上 . 那么我们要如何进行聚合呢?

  • Shuffle Write : 上一个stage的每一个map task就必须保证将自己处理的当前分区的数据相同的key写入一个分区文件中 ,可能会写入多个不同的分区文件中 .
  • Shuffle Read: reduce task会从上一个stage的所有task所在机器上寻找属于自己的那些分区文件 , 这样就可以保证每一个key所对应的value都会汇聚到同一个节点上去进行处理和聚合 .
  • Spark中有两种Shuffle类型 , HashShuffle和SortShuffle , Spark1.2之前是HashShuffle默认的分区器是HashPartitioner , Spark1.2引入SortShuffle默认的分区器是RangePartitioner .
    • HashShuffle
      • 普通机制
        spark数据分析引擎
        每一个 map task 将不同结果写到不同的buffer中 , 每个buffer的大小是32kb . buffer的功能是数据缓存.


        map task的计算结果会根据分区器(默认hashPartitioner)来决定写到哪个磁盘小文件中 . reduceTask会去Map端拉取相应磁盘小文件.


        产生磁盘小文件的个数等于map task的个数 * reduce task的个数


        造成的问题: 容易产生很多磁盘小文件 , 会造成很多的内存使用量 , 进而导致GC频繁垃圾回收 , 如果内存还不够 , 就会导致OOM内存溢出 问题. 数据传输过程会有频繁的网络通信, 频繁的网络通信出现通信故障的可能性增加 , 一旦网络通信鼓掌就会导致DAGScheduler重试Stage .
      • 合并机制
        spark数据分析引擎产生磁盘小文件的个数等于核数*reduce task的数量.
    • SortShuffle
      • 普通机制
        spark数据分析引擎
        map task 的计算结果会写入到一个内存数据结构里面 , 内存数据结构默认是5M


        在shuffle的时候会有一个定时器, 不定期去估算内存结构的大小 , 当内存结构数据超过5M是 , 例如5.01M , 它会申请5.2M==(5.01 * 2 - 5)==内存给数据结构.


        如果申请成功不会进行溢写 , 不成功就会溢写到磁盘.


        在一些之前内存结构中的数据会进行排序


        一些到磁盘的数据以batch的形式去写 , 一个batch是1万条数据 .


        map task执行完成之后 , 会将磁盘小文件合并成打的磁盘文件 , 同时生成一个索引文件.


        reduce task去map端拉取数据时 , 先解析索引文件 , 根据索引文件再去拉取对应数据 .


        磁盘小文件个数 = 2* map task

      • bypass机制
        spark数据分析引擎bypasss运行机制的触发条件: shuffle , reduce , task 的数量小于spark.shuffle.sort.bypassMergeThresshold的参数 , 这个默认值是200 .
        产生磁盘小文件的数量: 2*map task ;

shffle文件寻址和内存管理

spark数据分析引擎