日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

走近RDD

發布時間:2025/3/20 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 走近RDD 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  RDD(Resilient Distributed Datasets)彈性分布式數據集。RDD可以看成是一個簡單的"數組",對其進行操作也只需要調用有限的"數組"中的方法即可,但它與一般數組的區別在于:RDD是分布式存儲,可以跟好的利用現有的云數據平臺,并在內存中進行。此處的彈性指的是數據的存儲方式,及數據在節點中進行存儲的時候,既可以使用內存也可以使用磁盤。此外,RDD還具有很強的容錯性,在spark運行計算的過程中,不會因為某個節點錯誤而使得整個任務失敗;不通節點中并發運行的數據,如果在某個節點發生錯誤時,RDD會自動將其在不同的節點中重試。

  RDD一大特性是延遲計算,即一個完整的RDD運行任務被分成2部分:Transformation和Action。

  Transformation用于對RDD的創建。在spark中,RDD只能使用Transformation來創建,同時Transformation還提供了大量的操作方法。RDD還可以利用Transformation來生成新的RDD,這樣可以在有限的內存空間中生成竟可能多的數據對象。無論發生了多少次Transformation,此時,在RDD中真正數據計算運行的操作Action都沒真正的開始運行。

?

?

  Action是數據的執行部分,其也提供了大量的方法去執行數據的計算操作部分。

?

?  RDD可以將其看成一個分布在不同節點中的分布式數據集,并將數據以數據塊(Block)的形式存儲在各個節點的計算機中。每個BlockMaster管理著若干個BlockSlave,而每個BlockSlave又管理著若干個BlockNode。當BlockSlave獲得了每個Node節點的地址,又會反向向BlockMaster注冊每個Node的基本信息,這樣就形成了分層管理。

?

  RDD依賴

窄依賴 (narrowdependencies) 和寬依賴 (widedependencies) 。窄依賴是指 父 RDD 的每個分區都只被子 RDD 的一個分區所使用,例如map、filter。相應的,那么寬依賴就是指父 RDD 的分區被多個子 RDD 的分區所依賴,例如groupByKey、reduceByKey等操作。如果父RDD的一個Partition被一個子RDD的Partition所使用就是窄依賴,否則的話就是寬依賴。   這種劃分有兩個用處。首先,窄依賴支持在一個結點上管道化執行。例如基于一對一的關系,可以在 filter 之后執行 map 。其次,窄依賴支持更高效的故障還原。因為對于窄依賴,只有丟失的父 RDD 的分區需要重新計算。而對于寬依賴,一個結點的故障可能導致來自所有父 RDD 的分區丟失,因此就需要完全重新執行。因此對于寬依賴,Spark 會在持有各個父分區的結點上,將中間數據持久化來簡化故障還原,就像 MapReduce 會持久化 map 的輸出一樣。對于join操作有兩種情況,如果join操作的使用每個partition僅僅和已知的Partition進行join,此時的join操作就是窄依賴;其他情況的join操作就是寬依賴;因為是確定的Partition數量的依賴關系,所以就是窄依賴,得出一個推論,窄依賴不僅包含一對一的窄依賴,還包含一對固定個數的窄依賴(也就是說對父RDD的依賴的Partition的數量不會隨著RDD數據規模的改變而改變)                  下面就是RDD API
  1、parallelize   def parallelize[T](seq : scala.Seq[T], numSlices : scala.Int = { /* compiled code */ }) //第一個參數是數據,同時還有一個帶有默認數值的參數,改參數為1,該參數表示的是將數據分布在多少個數據節點中存放。   2、aggregate   def aggregate[U](zeroValue : U)(seqOp : scala.Function2[U, T, U], combOp : scala.Function2[U, U, U]) //seqOp 是給定的計算方法,combOp 是合并方法,將第一個計算方法得出的結果與源碼中的zeroValue進行合并。實例: import org.apache.spark.{SparkConf, SparkContext}object test {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("test")val sc=new SparkContext(conf)val arr=sc.parallelize(Array(1,2,3,4,5,6,7,8))//parallelize將內存數據讀入Spark系統中,作為整體數據集val result=arr.aggregate(0)(math.max(_,_),_+_)//_+_ 對傳遞的第一個方法的結果集進行進一步處理 println(result)} }

結果為8

import org.apache.spark.{SparkConf, SparkContext}object test {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("test")val sc=new SparkContext(conf)val arr=sc.parallelize(Array("abd","hello world","hello sb"))//parallelize將內存數據讀入Spark系統中,作為整體數據集val result=arr.aggregate("")((value,word)=>value+word,_+_)//_+_ 對傳遞的第一個方法的結果集進行進一步處理 println(result)} }

結果為abdhello worldhello sb

  3、cache是將數據內容計算并保存在計算節點的內存中

  4、cartesion是用于對不同的數組進行笛卡爾操作,要求是數組的長度必須相同

import org.apache.spark.{SparkConf, SparkContext}object test {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("test")val sc=new SparkContext(conf)val arr1=sc.parallelize(Array(1,2,3,4))//parallelize將內存數據讀入Spark系統中,作為整體數據集val arr2=sc.parallelize(Array(4,3,2,1))val res=arr1.cartesian(arr2)res.foreach(print)} }

結果:(1,4)(1,3)(1,2)(1,1)(2,4)(2,3)(2,2)(2,1)(3,4)(3,3)(3,2)(3,1)(4,4)(4,3)(4,2)(4,1)

  5、Coalesce是將已經存儲的數據重新分片后再進行存儲(repartition與Coalesce類似)

import org.apache.spark.{SparkConf, SparkContext}object test {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("test")val sc=new SparkContext(conf)val arr1=sc.parallelize(Array(1,2,3,4,5,6))//parallelize將內存數據讀入Spark系統中,作為整體數據集val arr2=arr1.coalesce(2,true)val res1=arr1.aggregate(0)(math.max(_,_),_+_)println(res1)val res2=arr2.aggregate(0)(math.max(_,_),_+_)println(res2)} }

結果為6 ? ?11

  6、countByValue是計算數據集中某個數據出現的個數,并將其以map的形式返回

  7、countByKey是計算數據集中元數據鍵值對key出現的個數

import org.apache.spark.{SparkConf, SparkContext}object test {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("test")val sc=new SparkContext(conf)val arr1=sc.parallelize(Array((1,"a"),(2,'b'),(1,'c'),(1,'d'),(2,'a')))//parallelize將內存數據讀入Spark系統中,作為整體數據集val res1=arr1.countByValue()res1.foreach(println)val res2=arr1.countByKey()res2.foreach(println)} } //結果:((1,c),1) ((2,a),1) ((1,a),1) ((1,d),1) ((2,b),1) (1,3) (2,2) View Code

  8、filter是對數據集進行過濾

  9、flatMap是對RDD中的數據進行整體操作的一個特殊方法,其在定義時就是針對數據集進行操作

  10、map可以對RDD中的數據集進行逐個操作,其與flatmap不同得是,flatmap是將數據集中的數據作為一個整體去處理,之后再對其中的數據做計算,而map則直接對數據集中的數據做單獨的處理

  11、groupBy是將傳入的數據進行分組

  12、keyBy是為數據集中的每個個體數據添加一個key,從而形成鍵值對

  13、reduce同時對2個數據進行處理,主要是對傳入的數據進行合并處理

  14、sortBy是對已有的RDD進行重新排序

import org.apache.spark.{SparkConf, SparkContext}object test {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local").setAppName("test")val sc=new SparkContext(conf)val arr1=sc.parallelize(Array((1,"a"),(2,"c"),(3,"b"),(4,"x"),(5,"f")))//parallelize將內存數據讀入Spark系統中,作為整體數據集val res1=arr1.sortBy(word=>word._1,true)val res2=arr1.sortBy(word=>word._2,true)res1.foreach(println)res2.foreach(println)} }

  15、zip可以將若干個RDD壓縮成一個新的RDD

?

轉載于:https://www.cnblogs.com/czx1/p/7497303.html

總結

以上是生活随笔為你收集整理的走近RDD的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。