日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RDD 持久化

發(fā)布時間:2024/1/18 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RDD 持久化 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

文章目錄

  • RDD 持久化
    • 1、RDD Cache 緩存
    • 2、持久化的作用
    • 3、RDD ChechPoint 檢查點
      • 1) 說明
      • 2) 代碼示例
    • 4、Cache & Persist & Checkpoint 區(qū)別

RDD 持久化

1、RDD Cache 緩存

RDD 通過 Cache 或者 Persist 方法將前面的計算結(jié)果緩存,默認情況下會把數(shù)據(jù)以緩存在JVM的堆內(nèi)存中。但是并不是這兩個方法被調(diào)用時立即緩存,而是觸發(fā)后面的 action 算子時,該RDD將會被緩存計算節(jié)點的內(nèi)存中,并供后面重新用。
RDD不存儲數(shù),如果一個RDD要重復(fù)使用,那么需要從頭再次執(zhí)行來獲取數(shù)據(jù),RDD對象是可以重用的,但是數(shù)據(jù)無法重用。所以要是想重用那么可以使用,cache() 方法是緩存在內(nèi)存中,persist()方法可以選擇是保存在文件磁盤或者內(nèi)存中。
persist()方法的存儲的級別

級別使用的空間CPU時間是否在內(nèi)存中是否在磁盤上備注
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK中等部分部分如果數(shù)據(jù)在內(nèi)存中放不下,則溢寫到磁盤上
MEMORY_AND_DISK_SER部分部分如果數(shù)據(jù)在內(nèi)存中放不下,則溢寫到磁盤上。在內(nèi)存中存放序列化后的數(shù)據(jù)
DISK_ONLY
緩存有可能丟失,或者存儲于內(nèi)存的數(shù)據(jù)由于內(nèi)存不是而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執(zhí)行。通過基于RDD的一系列轉(zhuǎn)換,丟失的數(shù)據(jù)會被重算,由于RDD的各個 Partition 是相對獨立的,因此只需要計算丟失的部分即可,并不需要重算全部 Partition。

注意:持久化操作必須是在行動算子執(zhí)行時完成的,因為只有行動算子在執(zhí)行時,才會有數(shù)據(jù),不然數(shù)據(jù)都沒有,要這持久化干什么。

package com.atguigu.bigdata.spark.core.wc.Depimport org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext}//這個是說的持久化,由于內(nèi)容比較少,難得鍵一個新的包了 class Spark02_RDD_chijiu {} object Spark02_RDD_chijiu{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")val context = new SparkContext(conf)val rdd = context.makeRDD(List("hello word", "Hello word", "Hello scala"))val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(num => {println("@@@@@@@@@@@@@@@@@@")(num, 1)})//持久化操作必須是在行動算子執(zhí)行時完成的//mapRDD.cache() // cache()進行緩存,后面RDD執(zhí)行就不會重頭開始了mapRDD.persist(StorageLevel.DISK_ONLY) // cache() 是放在內(nèi)存中,persist是放在文件中 這兩個方法完全一樣val result = mapRDD.reduceByKey(_ + _)result.collect().foreach(println)println("================")val result2 = mapRDD.groupByKey()result2.collect().foreach(println)context.stop()} }

2、持久化的作用

RDD對象的持久化操作不一定是為了重用,在數(shù)據(jù)執(zhí)行較長,或數(shù)據(jù)比較重要的場和也可以采用持久化操作。

3、RDD ChechPoint 檢查點

所謂的檢查點其實就是通過將RDD中間結(jié)果寫入磁盤,由于血緣依賴過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果檢查點之后有節(jié)點出現(xiàn)問題,可以從檢查點開始重做血緣,減少了開銷。對RDD進行 chechpoint 操作并不會馬上被執(zhí)行,必須執(zhí)行 Action 操作才能觸發(fā)。

1) 說明

checkpoint 需要落盤,需要指定檢查點保存路徑,檢查點路徑保存的文件,當(dāng)作業(yè)執(zhí)行完畢后,不會被刪除,一般保存路徑都是在分布式存儲系統(tǒng):比如說hdfs。checkpoint 有個路徑參數(shù),需要有一個保存檢查點的文件路徑,SparkContext.setCheckpointDir()使用這個方法來保存路徑,里面的參數(shù)是路徑。
它的使用方法和上面的那個持久化的位置是一樣的,他們的功能也是比較像,都是將數(shù)據(jù)保存下來,然后要重新執(zhí)行RDD,可以直接從這里開始。

2) 代碼示例

package com.atguigu.bigdata.spark.core.wc.Depimport org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.storage.StorageLevel// 持久化的檢查點 class Spark02_RDD_chijiu2 {} object Spark02_RDD_chijiu2{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")val context = new SparkContext(conf)context.setCheckpointDir("cp") // setCheckpointDir()這個是下面的檢查點的保存路徑val rdd = context.makeRDD(List("hello word", "Hello word", "Hello scala"))val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(num => {println("@@@@@@@@@@@@@@@@@@")(num, 1)})//checkpoint 需要落盤,需要指定檢查點保存路徑//檢查點路徑保存的文件,當(dāng)作業(yè)執(zhí)行完畢后,不會被刪除//一般保存路徑都是在分布式存儲系統(tǒng):比如說hdfsmapRDD.checkpoint() //檢查點val result = mapRDD.reduceByKey(_ + _)result.collect().foreach(println)println("================")val result2 = mapRDD.groupByKey()result2.collect().foreach(println)context.stop()} }

4、Cache & Persist & Checkpoint 區(qū)別

cache:將數(shù)據(jù)臨時存儲在內(nèi)存中進行數(shù)據(jù)重用。會在血緣關(guān)系中添加新的依賴。一旦出現(xiàn)問題,可以從頭讀取數(shù)據(jù)。
persist:將數(shù)據(jù)臨時存儲在磁盤文件中,進行數(shù)據(jù)重用,涉及到磁盤IO,性能較低,但是數(shù)據(jù)安全,如果作業(yè)執(zhí)行完畢,臨時保存的數(shù)據(jù)文件就會丟失。
checkpoint:將數(shù)據(jù)長久的保存在磁盤文件中進行數(shù)據(jù)重用,涉及到磁盤IO,性能較低,但是數(shù)據(jù)安全,為了保證數(shù)據(jù)安全,所以一般情況下,會獨立執(zhí)行作業(yè),為了能夠提高效率,一般情況下,是需要和cache聯(lián)合使用,先cache 然后再行執(zhí)行 checkpoint 操作。執(zhí)行過程中,會切斷血緣關(guān)系,重新建立新的血緣關(guān)系。
注意:checkpoint 等同于改變數(shù)據(jù)源,相當(dāng)于檢查點作為了新的數(shù)據(jù)源。所以直接切斷了之前的血緣關(guān)系,重新開了一脈,自己這里開始。

package com.atguigu.bigdata.spark.core.wc.Depimport org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.storage.StorageLevel// 持久化的檢查點 class Spark02_RDD_chijiu2 {} object Spark02_RDD_chijiu2{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")val context = new SparkContext(conf)context.setCheckpointDir("cp") // setCheckpointDir()這個是下面的檢查點的保存路徑val rdd = context.makeRDD(List("hello word", "Hello word", "Hello scala"))val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(num => {println("@@@@@@@@@@@@@@@@@@")(num, 1)})//checkpoint 需要落盤,需要指定檢查點保存路徑//檢查點路徑保存的文件,當(dāng)作業(yè)執(zhí)行完畢后,不會被刪除//一般保存路徑都是在分布式存儲系統(tǒng):比如說hdfsmapRDD.cache().checkpoint()//mapRDD.checkpoint() //檢查點val result = mapRDD.reduceByKey(_ + _)result.collect().foreach(println)println("================")val result2 = mapRDD.groupByKey()result2.collect().foreach(println)context.stop()} }

總結(jié)

以上是生活随笔為你收集整理的RDD 持久化的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。