缓存机制的产生
当 Spakr 在执行过程中,其中有某个机器宕机,造成数据丢失,在只有血缘机制的情况下,它会去从产生 RDD 再开始计算,因为没有数据的缓存
。所以当血缘关系特别长的时候,就需要加入缓存机制.
缓存机制
RDD通过persist方法
或cache方法
可以将前面的计算结果缓存
,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。
但是并不是这两个方法被调用时立即缓存,而是触发后面
的action
时,该RDD
将会被缓存在计算节点的内存
中,并供后面重用
。
persist() & cache()
persist() 源码
这个方法可以传入参数,定义缓存保存的级别
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
cache() 源码
可以发现 cache()
底层调用了 persist()
方法。
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): this.type = persist()
缓存级别
org/apache/spark/storage/StorageLevel.scala
里面
数字代表缓存份数,可缓存到内存、磁盘。
object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
缓存机制的体现
不做缓存,多次打印,发现结果都不一样
遇到行动算子才会计算
val rdd = sc.makeRDD(Array("bigdataboy")) val nocache = rdd.map(_ + System.currentTimeMillis) println(nocache.collect().mkString(",")) println(nocache.collect().mkString(",")) println(nocache.collect().mkString(",")) ---------------------------------------- bigdataboy1582002800524 bigdataboy1582002800634 bigdataboy1582002800689
加上缓存,发现是一样的
val rdd = sc.makeRDD(Array("bigdataboy")) // 加上这个RRD的缓存 val cache = rdd.map(_ + System.currentTimeMillis).cache println(cache.collect().mkString(",")) println(cache.collect().mkString(",")) println(cache.collect().mkString(",")) println(cache.toDebugString) ---------------------------- bigdataboy1582003008270 bigdataboy1582003008270 bigdataboy1582003008270 (16) MapPartitionsRDD[1] at map at RDD缓存.scala:18 [Memory Deserialized 1x Replicated] //这就是血缘里的缓存 | CachedPartitions: 16; MemorySize: 352.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B | ParallelCollectionRDD[0] at makeRDD at RDD缓存.scala:17 [Memory Deserialized 1x Replicated]
版权声明:《 Spark RDD 的缓存机制 》为明妃原创文章,转载请注明出处!
最后编辑:2020-2-18 14:02:07