日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark _02SparkCore_RDD

發布時間:2024/2/28 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark _02SparkCore_RDD 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1、Spark Java-Scala 混編Maven開發

(1)IDEA創建Maven 項目

  • 創建項目

  • 配置名稱,點擊下一步配置Maven及本地Maven倉庫地址。
  • 配置項目名稱和位置,并創建。
  • 更新替換Maven pom.xml文件,注意groupId,artifactId,version不要更新替換。
  • pom.xml見

https://blog.csdn.net/qq_41946557/article/details/102639605

  • 在main 目錄下創建目錄。名稱任意。

  • ?將main下的java和scala指定為源目錄:

2、SparkCore

  • RDD
  • 概念

RDD(Resilient Distributed Dateset),彈性分布式數據集。

  • RDD的五大特性:
* Internally, each RDD is characterized by five main properties: * * - A list of partitions * - A function for computing each split * - A list of dependencies on other RDDs * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for * an HDFS file)
  • RDD是由一系列的partition組成的。
  • 函數是作用在每一個partition(split)上的。
  • RDD之間有一系列的依賴關系。
  • 分區器是作用在K,V格式的RDD上。
  • RDD提供一系列最佳的計算位置。
    • RDD完美理解圖:

    • 注意:
  • textFile方法底層封裝的是讀取MR讀取文件的方式,讀取文件之前先split,默認split大小是一個block大小。
  • RDD實際上不存儲數據,這里方便理解,暫時理解為存儲數據。
  • 什么是K,V格式的RDD?
    • 如果RDD里面存儲的數據都是二元組對象,那么這個RDD我們就叫做K,V格式的RDD。
  • 哪里體現RDD的彈性(容錯)?
    • partition數量,大小沒有限制,體現了RDD的彈性。
    • RDD之間依賴關系,可以基于上一個RDD重新計算出RDD。
  • 哪里體現RDD的分布式?
    • RDD是由Partition組成,partition是分布在不同節點上的。
    • RDD提供計算最佳位置,體現了數據本地化。體現了大數據中“計算移動數據不移動”的理念。

    完美總結圖:::

    ?

    • Spark任務執行原理

    以上圖中有四個機器節點,Driver和Worker是啟動在節點上的進程,運行在JVM中的進程。

    • Driver與集群節點之間有頻繁的通信。
    • Driver負責任務(tasks)的分發和結果的回收。任務的調度。如果task的計算結果非常大就不要回收了。會造成oom。
    • Worker是Standalone資源調度框架里面資源管理的從節點。也是JVM進程。
    • Master是Standalone資源調度框架里面資源管理的主節點。也是JVM進程。

    Spark代碼流程

    • 創建SparkConf對象
  • 可以設置Application name。
  • 可以設置運行模式及資源需求。
    • 創建SparkContext對象
    • 基于Spark的上下文創建一個RDD,對RDD進行處理。
    • 應用程序中要有Action類算子來觸發Transformation類算子執行。
    • 關閉Spark上下文對象SparkContext。

    【注,后面有代碼實例】


    Transformations轉換算子

    • 概念:

    Transformations類算子是一類算子(函數)叫做轉換算子,如map,flatMap,reduceByKey等。Transformations算子是延遲執行,也叫懶加載執行。

    • Transformation類算子:
    ?filter 過濾符合條件的記錄數,true保留,false過濾掉。?map 將一個RDD中的每個數據項,通過map中的函數映射變為一個新的元素。 特點:輸入一條,輸出一條數據。?flatMap 先map后flat。與map類似,每個輸入項可以映射為0到多個輸出項。?sample 隨機抽樣算子,根據傳進去的小數按比例進行又放回或者無放回的抽樣。?reduceByKey 將相同的Key根據相應的邏輯進行處理。 ?sortByKey/sortBy 作用在K,V格式的RDD上,對key進行升序或者降序排序。

    Action行動算子

    • 概念:

    Action類算子也是一類算子(函數)叫做行動算子,如foreach,collect,count等。Transformations類算子是延遲執行,Action類算子是觸發執行。一個application應用程序中有幾個Action類算子執行,就有幾個job運行。

    • Action類算子
    ?count 返回數據集中的元素數。會在結果計算完成后回收到Driver端。?take(n) 返回一個包含數據集前n個元素的集合。?first first=take(1),返回數據集中的第一個元素。?foreach 循環遍歷數據集中的每個元素,運行相應的邏輯。?collect 將計算結果回收到Driver端。

    代碼演示:見下篇博客:

    https://blog.csdn.net/qq_41946557/article/details/102646935

    ?



    ?

    控制算子

    • 概念:

    控制算子有三種,cache,persist,checkpoint,以上算子都可以將RDD持久化,持久化的單位是partition。cache和persist都是懶執行的。必須有一個action類算子觸發執行。checkpoint算子不僅能將RDD持久化到磁盤,還能切斷RDD之間的依賴關系。

    • cache

    默認將RDD的數據持久化到內存中。cache是懶執行。

    • 注意:chche () = persist()=persist(StorageLevel.Memory_Only)
    • 測試cache文件:

    測試代碼:

    package ddd.henu.persistentimport org.apache.spark.{SparkConf, SparkContext}object CacheTest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("cache")val sc = new SparkContext(conf)sc.setLogLevel("error")var lines = sc.textFile("./data/persistData.txt")lines = lines.cache()val startTime1 = System.currentTimeMillis()val result1 = lines.count() //當第一次運行時,從磁盤讀取。val endTime1 = System.currentTimeMillis()println(s"條數: $result1,磁盤time:${endTime1-startTime1}")val startTime2 = System.currentTimeMillis()val result2 = lines.count() //第二次,從緩存val endTime2 = System.currentTimeMillis()println(s"條數: $result2,內存time:${endTime2-startTime2}")/*結果:條數: 5138965,磁盤time:6085條數: 5138965,內存time:111*/sc.stop()} }

    【注】System.currentTimeMillis()小知識:存在性能問題?

    https://blog.csdn.net/qq_41946557/article/details/102647865


    ?

    • persist:

    可以指定持久化的級別。最常用的是MEMORY_ONLY和MEMORY_AND_DISK?!盻2”表示有副本數。

    持久化級別如下:

    【注】MEMORY_AND_DISK指先存內存,存不下后,存入磁盤,會序列化,雖說寫的false.

    代碼部分演示:?

    ?

    • cache和persist的注意事項:
  • cache和persist都是懶執行,必須有一個action類算子觸發執行。
  • cache和persist算子的返回值可以賦值給一個變量,在其他job中直接使用這個變量就是使用持久化的數據了。持久化的單位是partition。
  • cache和persist算子后不能立即緊跟action算子。
  • cache和persist算子持久化的數據當applilcation執行完成之后會被清除。
  • 錯誤:rdd.cache().count() 返回的不是持久化的RDD,而是一個數值了。


    • checkpoint

    checkpoint將RDD持久化到磁盤,還可以切斷RDD之間的依賴關系。checkpoint目錄數據當application執行完之后不會被清除。

    • checkpoint 的執行原理:
  • 當RDD的job執行完畢后,會從finalRDD從后往前回溯。
  • 當回溯到某一個RDD調用了checkpoint方法,會對當前的RDD做一個標記。
  • Spark框架會自動啟動一個新的job,重新計算這個RDD的數據,將數據持久化到HDFS上。
    • 優化:對RDD執行checkpoint之前,最好對這個RDD先執行cache,這樣新啟動的job只需要將內存中的數據拷貝到HDFS上就可以,省去了重新計算這一步。
    • 使用:

    ?

    SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("checkpoint");JavaSparkContext sc = new JavaSparkContext(conf);sc.setCheckpointDir("./checkpoint");JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));parallelize.checkpoint();parallelize.count();sc.stop();

    ?

    總結

    以上是生活随笔為你收集整理的Spark _02SparkCore_RDD的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。