Spark RDD 的缓存机制

缓存机制的产生

当 Spakr 在执行过程中,其中有某个机器宕机,造成数据丢失,在只有血缘机制的情况下,它会去从产生 RDD 再开始计算,因为没有数据的缓存所以当血缘关系特别长的时候,就需要加入缓存机制.

缓存机制

RDD通过persist方法cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。
但是并不是这两个方法被调用时立即缓存,而是触发后面action时,该RDD将会被缓存在计算节点的内存中,并供后面重用

persist() & cache()

persist() 源码

这个方法可以传入参数,定义缓存保存的级别

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
 def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

cache() 源码

可以发现 cache() 底层调用了 persist() 方法。

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
 def cache(): this.type = persist()

缓存级别

org/apache/spark/storage/StorageLevel.scala里面

数字代表缓存份数,可缓存到内存、磁盘。

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

缓存机制的体现

不做缓存,多次打印,发现结果都不一样

遇到行动算子才会计算

val rdd = sc.makeRDD(Array("bigdataboy"))
val nocache = rdd.map(_ + System.currentTimeMillis)
println(nocache.collect().mkString(","))
println(nocache.collect().mkString(","))
println(nocache.collect().mkString(","))
----------------------------------------
bigdataboy1582002800524
bigdataboy1582002800634
bigdataboy1582002800689

加上缓存,发现是一样的

val rdd = sc.makeRDD(Array("bigdataboy"))
// 加上这个RRD的缓存
val cache = rdd.map(_ + System.currentTimeMillis).cache  
println(cache.collect().mkString(","))
println(cache.collect().mkString(","))
println(cache.collect().mkString(","))
println(cache.toDebugString)
----------------------------
bigdataboy1582003008270
bigdataboy1582003008270
bigdataboy1582003008270
(16) MapPartitionsRDD[1] at map at RDD缓存.scala:18 [Memory Deserialized 1x Replicated]
          //这就是血缘里的缓存
 |        CachedPartitions: 16; MemorySize: 352.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |   ParallelCollectionRDD[0] at makeRDD at RDD缓存.scala:17 [Memory Deserialized 1x Replicated]
发表评论 / Comment

用心评论~