Spark _02SparkCore_RDD
1、Spark Java-Scala 混編Maven開發
(1)IDEA創建Maven 項目
- 創建項目
- 配置名稱,點擊下一步配置Maven及本地Maven倉庫地址。
- 配置項目名稱和位置,并創建。
- 更新替換Maven pom.xml文件,注意groupId,artifactId,version不要更新替換。
- pom.xml見
https://blog.csdn.net/qq_41946557/article/details/102639605
- 在main 目錄下創建目錄。名稱任意。
- ?將main下的java和scala指定為源目錄:
2、SparkCore
- RDD
- 概念
RDD(Resilient Distributed Dateset),彈性分布式數據集。
- RDD的五大特性:
- RDD完美理解圖:
- 注意:
- 如果RDD里面存儲的數據都是二元組對象,那么這個RDD我們就叫做K,V格式的RDD。
- partition數量,大小沒有限制,體現了RDD的彈性。
- RDD之間依賴關系,可以基于上一個RDD重新計算出RDD。
- RDD是由Partition組成,partition是分布在不同節點上的。
- RDD提供計算最佳位置,體現了數據本地化。體現了大數據中“計算移動數據不移動”的理念。
完美總結圖:::
?
- Spark任務執行原理
以上圖中有四個機器節點,Driver和Worker是啟動在節點上的進程,運行在JVM中的進程。
- Driver與集群節點之間有頻繁的通信。
- Driver負責任務(tasks)的分發和結果的回收。任務的調度。如果task的計算結果非常大就不要回收了。會造成oom。
- Worker是Standalone資源調度框架里面資源管理的從節點。也是JVM進程。
- Master是Standalone資源調度框架里面資源管理的主節點。也是JVM進程。
Spark代碼流程
- 創建SparkConf對象
- 創建SparkContext對象
- 基于Spark的上下文創建一個RDD,對RDD進行處理。
- 應用程序中要有Action類算子來觸發Transformation類算子執行。
- 關閉Spark上下文對象SparkContext。
【注,后面有代碼實例】
Transformations轉換算子
- 概念:
Transformations類算子是一類算子(函數)叫做轉換算子,如map,flatMap,reduceByKey等。Transformations算子是延遲執行,也叫懶加載執行。
- Transformation類算子:
Action行動算子
- 概念:
Action類算子也是一類算子(函數)叫做行動算子,如foreach,collect,count等。Transformations類算子是延遲執行,Action類算子是觸發執行。一個application應用程序中有幾個Action類算子執行,就有幾個job運行。
- Action類算子
代碼演示:見下篇博客:
https://blog.csdn.net/qq_41946557/article/details/102646935
?
?
控制算子
- 概念:
控制算子有三種,cache,persist,checkpoint,以上算子都可以將RDD持久化,持久化的單位是partition。cache和persist都是懶執行的。必須有一個action類算子觸發執行。checkpoint算子不僅能將RDD持久化到磁盤,還能切斷RDD之間的依賴關系。
- cache
默認將RDD的數據持久化到內存中。cache是懶執行。
- 注意:chche () = persist()=persist(StorageLevel.Memory_Only)
- 測試cache文件:
測試代碼:
package ddd.henu.persistentimport org.apache.spark.{SparkConf, SparkContext}object CacheTest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("cache")val sc = new SparkContext(conf)sc.setLogLevel("error")var lines = sc.textFile("./data/persistData.txt")lines = lines.cache()val startTime1 = System.currentTimeMillis()val result1 = lines.count() //當第一次運行時,從磁盤讀取。val endTime1 = System.currentTimeMillis()println(s"條數: $result1,磁盤time:${endTime1-startTime1}")val startTime2 = System.currentTimeMillis()val result2 = lines.count() //第二次,從緩存val endTime2 = System.currentTimeMillis()println(s"條數: $result2,內存time:${endTime2-startTime2}")/*結果:條數: 5138965,磁盤time:6085條數: 5138965,內存time:111*/sc.stop()} }【注】System.currentTimeMillis()小知識:存在性能問題?
https://blog.csdn.net/qq_41946557/article/details/102647865
?
- persist:
可以指定持久化的級別。最常用的是MEMORY_ONLY和MEMORY_AND_DISK?!盻2”表示有副本數。
持久化級別如下:
【注】MEMORY_AND_DISK指先存內存,存不下后,存入磁盤,會序列化,雖說寫的false.
代碼部分演示:?
?
- cache和persist的注意事項:
錯誤:rdd.cache().count() 返回的不是持久化的RDD,而是一個數值了。
- checkpoint
checkpoint將RDD持久化到磁盤,還可以切斷RDD之間的依賴關系。checkpoint目錄數據當application執行完之后不會被清除。
- checkpoint 的執行原理:
- 優化:對RDD執行checkpoint之前,最好對這個RDD先執行cache,這樣新啟動的job只需要將內存中的數據拷貝到HDFS上就可以,省去了重新計算這一步。
- 使用:
?
SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("checkpoint");JavaSparkContext sc = new JavaSparkContext(conf);sc.setCheckpointDir("./checkpoint");JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));parallelize.checkpoint();parallelize.count();sc.stop();?
總結
以上是生活随笔為你收集整理的Spark _02SparkCore_RDD的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark _01初识
- 下一篇: Spark _03RDD_Transfo