RDD 持久化
文章目錄
- RDD 持久化
- 1、RDD Cache 緩存
- 2、持久化的作用
- 3、RDD ChechPoint 檢查點
- 1) 說明
- 2) 代碼示例
- 4、Cache & Persist & Checkpoint 區(qū)別
RDD 持久化
1、RDD Cache 緩存
RDD 通過 Cache 或者 Persist 方法將前面的計算結(jié)果緩存,默認情況下會把數(shù)據(jù)以緩存在JVM的堆內(nèi)存中。但是并不是這兩個方法被調(diào)用時立即緩存,而是觸發(fā)后面的 action 算子時,該RDD將會被緩存計算節(jié)點的內(nèi)存中,并供后面重新用。
RDD不存儲數(shù),如果一個RDD要重復(fù)使用,那么需要從頭再次執(zhí)行來獲取數(shù)據(jù),RDD對象是可以重用的,但是數(shù)據(jù)無法重用。所以要是想重用那么可以使用,cache() 方法是緩存在內(nèi)存中,persist()方法可以選擇是保存在文件磁盤或者內(nèi)存中。
persist()方法的存儲的級別:
| MEMORY_ONLY | 高 | 低 | 是 | 否 | |
| MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | |
| MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 | 如果數(shù)據(jù)在內(nèi)存中放不下,則溢寫到磁盤上 |
| MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | 如果數(shù)據(jù)在內(nèi)存中放不下,則溢寫到磁盤上。在內(nèi)存中存放序列化后的數(shù)據(jù) |
| DISK_ONLY | 低 | 高 | 否 | 是 | |
| 緩存有可能丟失,或者存儲于內(nèi)存的數(shù)據(jù)由于內(nèi)存不是而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執(zhí)行。通過基于RDD的一系列轉(zhuǎn)換,丟失的數(shù)據(jù)會被重算,由于RDD的各個 Partition 是相對獨立的,因此只需要計算丟失的部分即可,并不需要重算全部 Partition。 |
注意:持久化操作必須是在行動算子執(zhí)行時完成的,因為只有行動算子在執(zhí)行時,才會有數(shù)據(jù),不然數(shù)據(jù)都沒有,要這持久化干什么。
package com.atguigu.bigdata.spark.core.wc.Depimport org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext}//這個是說的持久化,由于內(nèi)容比較少,難得鍵一個新的包了 class Spark02_RDD_chijiu {} object Spark02_RDD_chijiu{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")val context = new SparkContext(conf)val rdd = context.makeRDD(List("hello word", "Hello word", "Hello scala"))val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(num => {println("@@@@@@@@@@@@@@@@@@")(num, 1)})//持久化操作必須是在行動算子執(zhí)行時完成的//mapRDD.cache() // cache()進行緩存,后面RDD執(zhí)行就不會重頭開始了mapRDD.persist(StorageLevel.DISK_ONLY) // cache() 是放在內(nèi)存中,persist是放在文件中 這兩個方法完全一樣val result = mapRDD.reduceByKey(_ + _)result.collect().foreach(println)println("================")val result2 = mapRDD.groupByKey()result2.collect().foreach(println)context.stop()} }2、持久化的作用
RDD對象的持久化操作不一定是為了重用,在數(shù)據(jù)執(zhí)行較長,或數(shù)據(jù)比較重要的場和也可以采用持久化操作。
3、RDD ChechPoint 檢查點
所謂的檢查點其實就是通過將RDD中間結(jié)果寫入磁盤,由于血緣依賴過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果檢查點之后有節(jié)點出現(xiàn)問題,可以從檢查點開始重做血緣,減少了開銷。對RDD進行 chechpoint 操作并不會馬上被執(zhí)行,必須執(zhí)行 Action 操作才能觸發(fā)。
1) 說明
checkpoint 需要落盤,需要指定檢查點保存路徑,檢查點路徑保存的文件,當(dāng)作業(yè)執(zhí)行完畢后,不會被刪除,一般保存路徑都是在分布式存儲系統(tǒng):比如說hdfs。checkpoint 有個路徑參數(shù),需要有一個保存檢查點的文件路徑,SparkContext.setCheckpointDir()使用這個方法來保存路徑,里面的參數(shù)是路徑。
它的使用方法和上面的那個持久化的位置是一樣的,他們的功能也是比較像,都是將數(shù)據(jù)保存下來,然后要重新執(zhí)行RDD,可以直接從這里開始。
2) 代碼示例
package com.atguigu.bigdata.spark.core.wc.Depimport org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.storage.StorageLevel// 持久化的檢查點 class Spark02_RDD_chijiu2 {} object Spark02_RDD_chijiu2{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")val context = new SparkContext(conf)context.setCheckpointDir("cp") // setCheckpointDir()這個是下面的檢查點的保存路徑val rdd = context.makeRDD(List("hello word", "Hello word", "Hello scala"))val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(num => {println("@@@@@@@@@@@@@@@@@@")(num, 1)})//checkpoint 需要落盤,需要指定檢查點保存路徑//檢查點路徑保存的文件,當(dāng)作業(yè)執(zhí)行完畢后,不會被刪除//一般保存路徑都是在分布式存儲系統(tǒng):比如說hdfsmapRDD.checkpoint() //檢查點val result = mapRDD.reduceByKey(_ + _)result.collect().foreach(println)println("================")val result2 = mapRDD.groupByKey()result2.collect().foreach(println)context.stop()} }4、Cache & Persist & Checkpoint 區(qū)別
cache:將數(shù)據(jù)臨時存儲在內(nèi)存中進行數(shù)據(jù)重用。會在血緣關(guān)系中添加新的依賴。一旦出現(xiàn)問題,可以從頭讀取數(shù)據(jù)。
persist:將數(shù)據(jù)臨時存儲在磁盤文件中,進行數(shù)據(jù)重用,涉及到磁盤IO,性能較低,但是數(shù)據(jù)安全,如果作業(yè)執(zhí)行完畢,臨時保存的數(shù)據(jù)文件就會丟失。
checkpoint:將數(shù)據(jù)長久的保存在磁盤文件中進行數(shù)據(jù)重用,涉及到磁盤IO,性能較低,但是數(shù)據(jù)安全,為了保證數(shù)據(jù)安全,所以一般情況下,會獨立執(zhí)行作業(yè),為了能夠提高效率,一般情況下,是需要和cache聯(lián)合使用,先cache 然后再行執(zhí)行 checkpoint 操作。執(zhí)行過程中,會切斷血緣關(guān)系,重新建立新的血緣關(guān)系。
注意:checkpoint 等同于改變數(shù)據(jù)源,相當(dāng)于檢查點作為了新的數(shù)據(jù)源。所以直接切斷了之前的血緣關(guān)系,重新開了一脈,自己這里開始。
總結(jié)
- 上一篇: F. Kirei and the Lin
- 下一篇: 使用 FreeBSD 作为桌面