Spark缓存

时间:2025-05-14 13:06:03
persist和cache方法

在 Spark 中,persist 和 cache 方法都用于将 RDD(弹性分布式数据集)或 DataFrame 持久化,以避免重复计算从而提升性能,但二者存在一些区别。

1. 功能本质

persist:这是一个通用的持久化方法,能够指定多种不同的存储级别。存储级别决定了数据的存储位置(如内存、磁盘)以及存储形式(如是否序列化)。

cache:其实是 persist 方法的一种特殊情况,它等价于调用 persist(StorageLevel.MEMORY_ONLY),也就是将数据以非序列化的 Java 对象形式存储在内存中。

2. 存储级别指定

persist:可以通过传入 StorageLevel 参数来指定不同的持久化级别。常见的持久化级别有:

MEMORY_ONLY:将 RDD 以 Java 对象的形式存储在 JVM 的内存中。若内存不足,部分分区将不会被缓存,需要时会重新计算。

MEMORY_AND_DISK:优先把 RDD 以 Java 对象的形式存储在 JVM 的内存中。若内存不足,会把多余的分区存储到磁盘上。

DISK_ONLY:将 RDD 的数据存储在磁盘上。

MEMORY_ONLY_SER:将 RDD 以序列化的 Java 对象形式存储在内存中,相较于 MEMORY_ONLY,序列化后占用的内存空间更小,但读取时需要进行反序列化操作,会带来一定的性能开销。

MEMORY_AND_DISK_SER:优先将 RDD 以序列化的 Java 对象形式存储在内存中,内存不足时存储到磁盘上。

cache:不能指定存储级别,它固定使用 MEMORY_ONLY 存储级别。

下面我们以DISK_ONLY为例,改写上面的程序,验证它的持久化效果。具体要改动的地方有两个: 指定持久化地址; 把cache改成persist;

import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {
    //打印hello word!
    println("Hello World!")
    //学习sparkRDD中的转换算子
    //1.map
    //2.filter:过滤
    //3.flatMap:flat(扁平化)+map(映射)
    //4.reduceByKey:键值对的数据(world,1),(hello,1)


    val conf = new SparkConf().setMaster("local[*]").setAppName("Test")
    val sc = new SparkContext(conf)
    //创建一个RDD
//    val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

//    val rdd1 = rdd.map(x => x * 2)

    //使用filter算子,保留偶数
//    val rdd1 = rdd.filter(x => x % 2 == 0)

    //有多个句子,每个句子有多个单词,单词之间使用空格隔开
    //目标,把所有单词找出来,放一个数组中
//    val rdd = sc.parallelize(List("hello world", "hello scala"))
//    val rdd1 = rdd.flatMap(x => x.split(" "))

    //hello world hello scala

    //词频统计的例子
    val rdd = sc.parallelize(List("apple","banana","apple","banana","apple"))
//    val rdd1 = rdd.map(x => (x,1))//(apple,1)(banana,1)(apple,1)(banana,1)(apple,1)
//    val rdd3 = rdd1.reduceByKey((x,y) => x+y)//(apple,3)(banana,2)
    rdd.map(x => (x,1)).reduceByKey((x,y) => x+y).collect().foreach(println)

    //collect()行动算子
//    rdd3.collect().foreach(println)
  }
}