RDD(弹性分布式数据集)
1、什么是RDD
RDD(分布式彈性數(shù)據(jù)集)是對(duì)分布式計(jì)算的抽象,代表要處理的數(shù)據(jù),一個(gè)數(shù)據(jù)集,RDD是只讀分區(qū)的集合。數(shù)據(jù)被分片,分成若干個(gè)數(shù)據(jù)分片,存儲(chǔ)到不同的節(jié)點(diǎn)中,可以被并行的操作,所以叫分布式數(shù)據(jù)集。計(jì)算時(shí)優(yōu)先考慮放于內(nèi)存中,如果放不下把一部分放在磁盤上保存。
RDD(分布式彈性數(shù)據(jù)集)是整個(gè)Spark抽象的基石,是基于工作集的應(yīng)用抽象。Spark的各個(gè)子框架,Spark SQL、Spark Streaming、SparkR、GraphX、ML等,其底層封裝的都是RDD。(也就是說(shuō)①RDD提供了通用的抽象;②開發(fā)者可用根據(jù)自己所在的領(lǐng)域進(jìn)行建模,開發(fā)出相應(yīng)的子框架)。
RDD本身會(huì)有一系列的數(shù)據(jù)分片,RDD在邏輯上抽象的代表了底層的一個(gè)輸入文件,可能是一個(gè)文件夾,但是實(shí)際上是按分區(qū)partition分為多個(gè)分區(qū),分區(qū)會(huì)放在Spark集群中不同的機(jī)器節(jié)點(diǎn)上,假設(shè)有1億條數(shù)據(jù),可能每臺(tái)機(jī)器上放10萬(wàn)條,需要1000臺(tái)機(jī)器,而且這1000臺(tái)機(jī)器上的10萬(wàn)條數(shù)據(jù)是按照partition為單位去管理的。所謂partition就是特定規(guī)模的數(shù)據(jù)大小,就是數(shù)據(jù)集合。Spark中一切操作皆RDD。
?
2、工作集與數(shù)據(jù)集
基于工作集和基于數(shù)據(jù)集都提供一些特征如位置感知(具體數(shù)據(jù)在哪里,只不過是不同的實(shí)現(xiàn)),容錯(cuò),負(fù)載均衡。
(1)基于數(shù)據(jù)集的工作方式:從物理存儲(chǔ)加載數(shù)據(jù),然后操作數(shù)據(jù),然后寫入物理存儲(chǔ)設(shè)備。如Hadoop的MapReduce是基于數(shù)據(jù)集的。基于數(shù)據(jù)集的有幾種場(chǎng)景不太適用:①不適合大量的迭代,如機(jī)器學(xué)習(xí),算法比較復(fù)雜的時(shí)候②不適合交互式查詢,每次的查詢都需要從磁盤讀取數(shù)據(jù),然后再查詢寫會(huì)數(shù)據(jù)結(jié)果,每一次都這樣。(重點(diǎn)是基于數(shù)據(jù)集的方式不能改復(fù)用曾的結(jié)果或計(jì)算中間結(jié)果)
(2)基于工作集的工作方式,具有基于數(shù)據(jù)集的工作方式的優(yōu)點(diǎn)即自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性,同時(shí)夠?qū)诠ぷ骷挠?jì)算任務(wù)也具有良好的描述能力,即支持中間結(jié)果的復(fù)用場(chǎng)景。
3、RDD的彈性表現(xiàn)在哪幾個(gè)方面
Spark的RDD是基于工作集的,不僅具具有基于數(shù)據(jù)集的特點(diǎn),而且RDD本身還有其特點(diǎn):Resilient(彈性).
(1)自動(dòng)進(jìn)行內(nèi)存和磁盤的數(shù)據(jù)存儲(chǔ)的切換:RDD代表一系列數(shù)據(jù)分片在不同的節(jié)點(diǎn)中存儲(chǔ),默認(rèn)優(yōu)先考慮在內(nèi)存中,如果放不下把一部分放在磁盤上保存,而這一切對(duì)用戶來(lái)說(shuō)是透明的,不用關(guān)心RDD的partition放在哪里,只要針對(duì)RDD計(jì)算處理就行了。所以說(shuō)RDD本身會(huì)自動(dòng)的進(jìn)行磁盤和內(nèi)存的切換;
(2)自動(dòng)Lineage血統(tǒng)的高效容錯(cuò):在運(yùn)行階段,會(huì)有一系列的RDD,以用于容錯(cuò)恢復(fù),假設(shè)一個(gè)計(jì)算鏈條有900個(gè)步驟,假設(shè)第888步出錯(cuò),由于有血統(tǒng)關(guān)系,可以從第887個(gè)步驟恢復(fù),不需要從第一個(gè)步驟開始計(jì)算,這極大的提升了錯(cuò)誤恢復(fù)的速度;
(3)task失敗會(huì)自動(dòng)進(jìn)行特定次數(shù)的重試,默認(rèn)4次:假設(shè)900個(gè)計(jì)算步驟的任務(wù)作為一個(gè)task,進(jìn)行容錯(cuò),恢復(fù)的時(shí)候從第800個(gè)步驟開始恢復(fù),恢復(fù)好幾次都沒有成功這個(gè)task就失敗了,調(diào)度器底層會(huì)自動(dòng)進(jìn)行容錯(cuò)。
(4)Stage(一個(gè)計(jì)算階段)如果失敗,會(huì)自動(dòng)進(jìn)行特定次數(shù)的重試,只計(jì)算失敗的數(shù)據(jù)分片,默認(rèn)3次:就是task底層嘗試好幾次都失敗,這個(gè)時(shí)候整個(gè)階段就會(huì)失敗,整個(gè)階段會(huì)有很多并行的數(shù)據(jù)分片,他們計(jì)算邏輯一樣只是處理的數(shù)據(jù)分片不一樣。是再次提交Stage的時(shí)候如果這個(gè)Stage中假設(shè)有100個(gè)數(shù)據(jù)分片只是3,5個(gè)失敗,再次提交Stage的時(shí)候會(huì)看看其他成功的任務(wù)有沒有輸出,有的話就不會(huì)第二次提交的時(shí)候把這100個(gè)任務(wù)再次提交,只會(huì)提交失敗的那幾個(gè)。
(5)checkpoint和persist:checkpoint(每次對(duì)RDD的操作都會(huì)產(chǎn)生新的RDD,除了action觸發(fā)job以外,有時(shí)處理鏈條比較長(zhǎng),計(jì)算比較笨重時(shí),需要考慮將數(shù)據(jù)落地);persist:(內(nèi)存、磁盤的復(fù)用[效率和容錯(cuò)的延伸])
(6)數(shù)據(jù)調(diào)度彈性:DAG? TASK 和資源管理無(wú)關(guān)
(7)數(shù)據(jù)分片的高度彈性:repartition和coalesce①在數(shù)據(jù)計(jì)算時(shí),會(huì)產(chǎn)生很多分片,這時(shí)如果partition非常小,每個(gè)分片每次都消耗一個(gè)線程進(jìn)行處理,會(huì)降低處理效率,但是如果把把幾個(gè)partition合并成一個(gè)比較大的partition,會(huì)提高效率②如果每個(gè)partition的block比較大可能內(nèi)存不足,這時(shí)會(huì)考慮將其變成更小的分片,這時(shí)Spark會(huì)出現(xiàn)更多的處理批次,避險(xiǎn)出現(xiàn)OOM
注意:repartition內(nèi)部調(diào)用的是coalesce,傳進(jìn)的shuffle為true。coalesce默認(rèn)shuffle為fasle。所以數(shù)據(jù)分片由多變?yōu)樯俚挠胏oalesce不進(jìn)行shuffle,如果數(shù)據(jù)分片由少到多不經(jīng)過shuffle是不行的,使用repartition。
4、RDD的lazy特性
由于RDD是只讀分區(qū)的集合,那么每次的操作都會(huì)改變數(shù)據(jù),會(huì)產(chǎn)生中間結(jié)果,這時(shí)就采用lazy的級(jí)別,對(duì)數(shù)據(jù)不進(jìn)行計(jì)算。
RDD的核心之一就是他的lazy級(jí)別,因?yàn)椴凰?#xff0c;開始的時(shí)候只對(duì)數(shù)據(jù)處理做標(biāo)記,包括textfile根本不從磁盤讀數(shù)據(jù),faltMap根本就沒開始計(jì)算,他只不過是產(chǎn)生了一個(gè)操作的標(biāo)記而已。
上圖為flatMap的源碼,flatMap產(chǎn)生了一個(gè)new MapPartitionRDD,但是看它的構(gòu)造,第一個(gè)參數(shù)是this,this是當(dāng)前對(duì)象,指父RDD,即生成的RDD所依賴的RDD。這樣,Spark的RDD是只讀的,且是lazy級(jí)別的,每次構(gòu)建的新的RDD時(shí),都是將其父RDD作為第一個(gè)參數(shù)傳遞進(jìn)來(lái)生成新的RDD,這樣就構(gòu)成了一個(gè)鏈條結(jié)構(gòu)
5、常規(guī)的容錯(cuò)方式
常規(guī)的容錯(cuò)方式:數(shù)據(jù)檢查點(diǎn)和記錄數(shù)據(jù)更新的方式。
5.1 數(shù)據(jù)檢查點(diǎn)
分布式的計(jì)算數(shù)據(jù)檢查點(diǎn)的基本工作方式就是:通過數(shù)據(jù)中心的網(wǎng)絡(luò)連接,不同的機(jī)器每次操作的時(shí)候都要負(fù)責(zé)整個(gè)數(shù)據(jù)集,就相當(dāng)于每次都有一個(gè)拷貝,這個(gè)是需要網(wǎng)絡(luò)的,復(fù)制到其他機(jī)器上,而網(wǎng)絡(luò)帶寬就是分布式的瓶頸。每次拷貝對(duì)存儲(chǔ)資源也是非常大的消耗。
5.2 記錄數(shù)據(jù)更新
記錄數(shù)據(jù)更新的工作方式:每次數(shù)據(jù)變化我們就記錄一下,這個(gè)方式不需要重新拷貝一份數(shù)據(jù),但是這種方式復(fù)雜,而且更新的話就變成數(shù)據(jù)可更新,那很多操作全局?jǐn)?shù)據(jù)容易失控,原子性對(duì)分布式來(lái)說(shuō)太可怕了第一復(fù)雜第二耗性能。
因?yàn)镽DD是從后往前的鏈條依賴關(guān)系,所以容錯(cuò)的開銷非常低
5.3 Spark的RDD的容錯(cuò)方式
Spark就是記錄數(shù)據(jù)更新的方式,原因又2點(diǎn):①、RDD是不可變的+lazy(因?yàn)椴豢勺儾淮嬖谌中薷牡膯栴},控制難度就極大的下降,在這基礎(chǔ)上有計(jì)算鏈條,假設(shè)901個(gè)步驟錯(cuò)了,從900個(gè)步驟開始恢復(fù)(這個(gè)前提是要持久化persit/checkpoint或者上一個(gè)Stage結(jié)束))。②RDD是粗粒度的操作,為了效率,每次操作的時(shí)候作用所有數(shù)據(jù)集合(所謂的粗粒度就是每次操作都作用于全部的數(shù)據(jù)集)。如果更新力度太細(xì)記錄成本就會(huì)高效率就低了。對(duì)RDD的所有寫或者修改都是粗粒度的,通過元數(shù)據(jù)記錄數(shù)據(jù)更新是寫操作,我們?cè)谶@邊說(shuō)RDD是粗粒度的指的是RDD的寫操作是粗粒度的,但是RDD的讀操作即可是粗粒度的也可以是細(xì)粒度的(例如通過RDD讀取數(shù)據(jù)庫(kù)可以讀取一條記錄)。RDD的寫操作是粗粒度的限制了他的使用場(chǎng)景,例如說(shuō)網(wǎng)絡(luò)爬蟲就不適合,但是現(xiàn)實(shí)世界中,大多數(shù)的場(chǎng)景是粗粒度的
5.4 RDD中的幾個(gè)核心方法及屬性
(1)partitioner:分區(qū)器,類似MapReduce的的Partitioner接口,控制key到哪個(gè)reduce
?
(2)compute:compute方法是針對(duì)RDD的每個(gè)Partition進(jìn)行計(jì)算的
所有的RDD操作返回的都是一個(gè)迭代器,這個(gè)好處就是假設(shè)用spark sql提取出數(shù)據(jù)后產(chǎn)生新的RDD,機(jī)器學(xué)習(xí)訪問這個(gè)RDD不用關(guān)心他是不是sparksql,因?yàn)槭腔趇terator,那就可以用hasNext看下有么有下個(gè)元素,用next讀取下個(gè)元素,這就讓所有框架無(wú)縫集成。
compute傳進(jìn)的第一個(gè)參數(shù)split是Partition類型的,Partition是RDD并行的劃分單元,其在Spark中的抽象定義十分簡(jiǎn)單如下:
它定義了一個(gè)index唯一表示這個(gè)partition,它更像一個(gè)指針指向?qū)嶓w數(shù)據(jù),Partition的具體實(shí)現(xiàn)有很多,包括HadoopPartition, JdbcPartition, ParallelCollectionPartition等。
(3)getPartition:getPartitions返回的是一系列partitions的集合,即一個(gè)Partition類型的數(shù)組。是在partitions方法中調(diào)用getPartition方法的。
(4)getDependencies:獲取所有依賴關(guān)系
(5)getPreferredLocations:輸入?yún)?shù)是Partition類型的split分片,輸出結(jié)果是一組優(yōu)先的節(jié)點(diǎn)位置。
5.5?HadoopRDD
(1)getPartition實(shí)現(xiàn)
?
首先getJobConf():用來(lái)獲取job Configuration,獲取配置方式有clone和非clone方式,clone方式是線程不安全的,,非clone方式可以從cache中獲取,如cache中沒有那就創(chuàng)建一個(gè)新的,然后再放到cache中;然后獲得InputFormat實(shí)例對(duì)象;調(diào)用getSplits方法來(lái)計(jì)算分片,然后把分片HadoopPartition包裝到到array里面返回
(2)compute實(shí)現(xiàn)
輸入值是一個(gè)Partition,返回是一個(gè)Iterator[(K, V)]類型的數(shù),compute方法是通過分片來(lái)獲得Iterator接口,以遍歷分片的數(shù)據(jù)把Partition轉(zhuǎn)成HadoopPartition
通過InputSplit創(chuàng)建一個(gè)RecordReader
重寫Iterator的getNext方法,通過創(chuàng)建的reader調(diào)用next方法讀取下一個(gè)值
(3)getPreferredLocations
調(diào)用InputSplit的getLocations方法獲得所在的位置
6、RDD的生命周期
6.1 創(chuàng)建RDD
Spark程序中創(chuàng)建的第一個(gè)RDD代表了Spark應(yīng)用程序輸入數(shù)據(jù)的來(lái)源。通過Transformation來(lái)對(duì)RDD進(jìn)行各種算子的轉(zhuǎn)換,實(shí)現(xiàn)算法。
常見的創(chuàng)建初始RDD的方式①使用程序中的集合,②使用本地文件系統(tǒng)創(chuàng)建RDD,③使用HDFS創(chuàng)建RDD,④基于DB創(chuàng)建RDD,⑤基于NOSQL創(chuàng)建RDD,⑥基于S3創(chuàng)建RDD,⑦基于數(shù)據(jù)流創(chuàng)建RDD
6.2 構(gòu)建執(zhí)行計(jì)劃
RDD 在調(diào)用Transformation算子和action 算子后構(gòu)成一個(gè)RDD鏈條,即血緣,然后DAGScheduler 會(huì)根據(jù) RDD 之間的依賴關(guān)系劃分Stage ,最后終封裝成 TaskSetManager 根據(jù)不同的調(diào)度模型加入不同的調(diào)度隊(duì)列。
6.3 調(diào)度任務(wù)執(zhí)行
由 TaskScheduler和TaskSetManager 對(duì)TaskSet進(jìn)行進(jìn)一步資源封裝和最佳位置計(jì)算,然后進(jìn)行調(diào)度到相應(yīng)的Executor上去執(zhí)行。
6.4 結(jié)果返回
將最終的執(zhí)行結(jié)果返回給 Driver 或者輸出到指定的位置。
7、RDD的操作類型
RDD本身有3種操作類型Transformation和Action和Controller。
Transformation進(jìn)行數(shù)據(jù)狀態(tài)的轉(zhuǎn)換,根據(jù)已有的RDD創(chuàng)建一個(gè)新的RDD;Action觸發(fā)具體的作業(yè),主要是對(duì)RDD進(jìn)行最后取結(jié)果的一種操作;Controller(是控制算子,包括cache,persist,checkpoint)對(duì)性能,效率還有容錯(cuò)方面的支持。
Transformation級(jí)別的RDD是lazy的,也就是說(shuō)使用Transformation只是標(biāo)記對(duì)我們的數(shù)據(jù)進(jìn)行操作,不會(huì)真正的執(zhí)行,這是算法的描述,當(dāng)我們遇到Action或者checkpoint的時(shí)候他才會(huì)真正的操作。通過這種lazy特性,底層就可以對(duì)我們spark應(yīng)用程序優(yōu)化,因?yàn)橐恢笔茄舆t執(zhí)行,spark框架可以看見很多步驟,看見步驟越多優(yōu)化的空間就越大。
8、常用的算子
81 map
map:使用自定義的函數(shù)f,對(duì)其中的每個(gè)元素進(jìn)行處理,產(chǎn)生U類型的結(jié)果,傳入的RDD的元素類型為T類型,生成的RDD元素類型為U類型
withScope{body}?是為了確保運(yùn)行body代碼塊產(chǎn)生的所有RDDs都在同一個(gè)scope里面。首先調(diào)用了SparkContext的clean方法,實(shí)際上調(diào)用了ClosureCleaner的clean方法,這里一再清除閉包中的不能序列化的變量,防止RDD在網(wǎng)絡(luò)傳輸過程中反序列化失敗。(scala支持閉包(jvm上的閉包當(dāng)然也是一個(gè)對(duì)像),閉包會(huì)把它對(duì)外的引用(閉包里面引用了閉包外面的對(duì)像)保存到自己內(nèi)部,?這個(gè)閉包就可以被單獨(dú)使用了,而不用擔(dān)心它脫離了當(dāng)前的作用域;?但是在spark這種分布式環(huán)境里,這種作法會(huì)帶來(lái)問題,如果對(duì)外部的引用是不可serializable的,它就不能正確被發(fā)送到worker節(jié)點(diǎn)上去了;?還有一些引用,可能根本沒有用到,這些沒有使用到的引用是不需要被發(fā)到worker上的; ClosureCleaner.clean()就是用來(lái)完成這個(gè)事的;?ClosureCleaner.clean()通過遞歸遍歷閉包里面的引用,檢查不能serializable的, 去除unused的引用;?這個(gè)方法在SparkContext中用得很多,對(duì)rpc方法,只要傳入的是閉包,基本都會(huì)使用這個(gè)方法,它可以降低網(wǎng)絡(luò)io,提高executor的內(nèi)存效率)然后new了一個(gè)MapPartitionsRDD,還把清除閉包中的不能序列化的變量的匿名函數(shù)f傳進(jìn)去。MapPartitionsRDD源碼如下
MapPartitionsRDD繼承RDD[U](prev),他的源碼如下。它把RDD復(fù)制給了deps,這個(gè)OneToOneDependency是一個(gè)窄依賴,子RDD直接依賴于父RDD。
MapPartitionsRDD重寫了Partitioner,getPartitions,compute和clearDependencies,發(fā)現(xiàn)大量出現(xiàn)firstParent[T]源碼如下,返回第一個(gè)父RDD
所以partitioner和它的第一個(gè)parent RDD的partitioner保持一致(如果需要保留partitioner的話),它的partitions就是它的firstParent的partitions。它的compute函數(shù)只是調(diào)用了flatMap實(shí)例化它時(shí)輸入的函數(shù),compute函數(shù)是在父RDD遍歷每一行數(shù)據(jù)時(shí)只是調(diào)用了flatMap實(shí)例化它時(shí)輸入的函數(shù)。
看compute實(shí)際傳遞的函數(shù)和調(diào)用它的代碼,iter:Iterator[T]是一個(gè)Partition上的元素迭代器,用來(lái)遍歷RDD[T]的第pid個(gè)partition上的所有元素。?firstParent[T].iterator(split, context)?就是返回parentRDD的對(duì)應(yīng)partition的迭代器iter:Iterator[T]: 如果已經(jīng)保存了就直接讀取,否則重新計(jì)算(可以跳轉(zhuǎn)看它的實(shí)現(xiàn))。有了這個(gè)迭代器iter之后,然后用?iter.flatMap(cleanF)?來(lái)產(chǎn)生新的迭代器,返回類型是Iterator[U],這個(gè)就是最終返回的RDD: RDD[U]的partition的迭代器。
compute函數(shù)作用:在沒有依賴的條件下,根據(jù)分片的信息生成遍歷數(shù)據(jù)的Iterable接口;在有前置依賴的條件下,在父RDD的Iterable接口上給遍歷每個(gè)元素的時(shí)候再套上一個(gè)方法
8.2 flatMap
flatMap:使用自定義的函數(shù)f,對(duì)其中的每個(gè)元素進(jìn)行處理,將產(chǎn)生的結(jié)果合并成一個(gè)大的集合。
flatMap和map函數(shù)區(qū)別主要在于:map調(diào)用的是迭代器的map方法,flatMap調(diào)用的是迭代器的flatMap方法是針對(duì)RDD的每個(gè)元素利用函數(shù)f生成多個(gè)元素,然后把這些結(jié)果全部串聯(lián)起來(lái)
8.3?reduceByKey
reduceByKey這個(gè)方法不是在RDD中的,而是在PairRDDFunctions里面,因?yàn)樵赗DD的伴生對(duì)象里面已經(jīng)導(dǎo)入了,RDD內(nèi)部會(huì)發(fā)生隱式轉(zhuǎn)換,轉(zhuǎn)換為PairRDDFunctions,然后再調(diào)用這個(gè)方法。
reduceByKey內(nèi)部調(diào)用的是combineByKey
底層是基于combineByKeyWithClassTag的,combineByKey是combineByKeyWithClassTag的簡(jiǎn)寫的版本
require方法首先判斷mergeCombiners(定義兩個(gè)C類型數(shù)據(jù)的組合函數(shù))是否定義,沒有則拋異常
然后keyClass.isArray判斷如果key是Array類型,是不支持在map端合并的(mapSideCombine默認(rèn)為true即進(jìn)行本地預(yù)聚合),也不支持HashPartitioner(要想進(jìn)行Map段合并和Hash分區(qū),那么Key就必須可以通過比較內(nèi)容是否相同來(lái)確定Key是否相等以及通過內(nèi)容計(jì)算hash值,進(jìn)而進(jìn)行合并和分區(qū),然而數(shù)組判斷相等和計(jì)算hash值并不是根據(jù)它里面的內(nèi)容,而是根據(jù)數(shù)組在堆棧中的信息來(lái)實(shí)現(xiàn)的。);
然后?Aggregator創(chuàng)建一個(gè)聚合器,用于對(duì)數(shù)據(jù)進(jìn)行聚合,對(duì)參數(shù)函數(shù)執(zhí)行clean方法保證是可以被序列化的。Aggregator是核心,聚合全是交給它來(lái)完成的
Aggregator的三個(gè)泛型,第一個(gè)K,這個(gè)是你進(jìn)行combineByKey也就是聚合的條件Key,可以是任意類型。后面的V,C兩個(gè)泛型是需要聚合的值的類型,和聚合后的值的類型,兩個(gè)類型是可以一樣,也可以不一樣,例如,Spark中用的多的reduceByKey這個(gè)方法,若聚合前的值為long,那么聚合后仍為long。再比如groupByKey,若聚合前為String,那么聚合后為Iterable<String>。再看三個(gè)自定義方法:①createCombiner:這個(gè)方法會(huì)在每個(gè)分區(qū)上都執(zhí)行的,而且只要在分區(qū)里碰到在本分區(qū)里沒有處理過的Key,就會(huì)執(zhí)行該方法。執(zhí)行的結(jié)果就是在本分區(qū)里得到指定Key的聚合類型C(可以是數(shù)組,也可以是一個(gè)值,具體還是得看方法的定義了。) ②?mergeValue:這方法也會(huì)在每個(gè)分區(qū)上都執(zhí)行的,和createCombiner不同,它主要是在分區(qū)里碰到在本分區(qū)內(nèi)已經(jīng)處理過的Key才執(zhí)行該方法,執(zhí)行的結(jié)果就是將目前碰到的Key的值聚合到已有的聚合類型C中。其實(shí)方法1和2放在一起看,就是一個(gè)if判斷條件,進(jìn)來(lái)一個(gè)Key,就去判斷一下若以前沒出現(xiàn)過就執(zhí)行方法1,否則執(zhí)行方法2.?③mergeCombiner:前兩個(gè)方法是實(shí)現(xiàn)分區(qū)內(nèi)部的相同Key值的數(shù)據(jù)合并,而這個(gè)方法主要用于分區(qū)間的相同Key值的數(shù)據(jù)合并,形成最終的結(jié)果。
然后看下他的三個(gè)方法:①combineValuesByKey:實(shí)現(xiàn)的就是分區(qū)內(nèi)部的數(shù)據(jù)合并②combineCombinersByKey:主要是實(shí)現(xiàn)分區(qū)間的數(shù)據(jù)合并,也就是合并combineValuesByKey的結(jié)果③updateMetrics:刷磁盤有關(guān),就是記錄下,當(dāng)前是否刷了磁盤,刷了多少
回到combineByKeyWithClassTag方法中,?實(shí)例化Aggregator后,接著就是判斷,是否需要重新分區(qū)(shuffle)。然后self.partitioner == Some(partitioner)判斷分區(qū)器是否相同如果分區(qū)器相同,self.partitioner是指A這個(gè)RDD的partitioner,它指明了A這個(gè)RDD中的每個(gè)key在哪個(gè)partition中。而等號(hào)右邊的partitioner,指明了B這個(gè)RDD的每個(gè)key在哪個(gè)partition中。當(dāng)二者==時(shí),就會(huì)用self.mapPartitions生成MapPartitionsRDD, 這和map這種transformation生成的RDD是一樣的,此時(shí)reduceByKey不會(huì)引發(fā)shuffle。
①當(dāng)self.partitioner == Some(partitioner)時(shí),也就是分區(qū)實(shí)例是同一個(gè)的時(shí)候,就不需要分區(qū)了,因此只需要對(duì)先用的分區(qū)進(jìn)行combineValuesByKey操作就好了,沒有分區(qū)間的合并了,也不需要shuffle了。②兩個(gè)分區(qū)器不一樣,需要對(duì)現(xiàn)在分區(qū)的零散數(shù)據(jù)按Key重新分區(qū),目的就是在于將相同的Key匯集到同一個(gè)分區(qū)上,由于數(shù)據(jù)分布的不確定性,因此有可能現(xiàn)在的每個(gè)分區(qū)的數(shù)據(jù)是由重新分區(qū)后的所有分區(qū)的部分?jǐn)?shù)據(jù)構(gòu)成的(寬依賴),因此需要shuffle,則構(gòu)建ShuffledRDD
combineByKey的關(guān)鍵在于分區(qū)器partitioner,它是針對(duì)分區(qū)的一個(gè)操作,分區(qū)器的選擇就決定了執(zhí)行combineByKey后的結(jié)果,如果所給的分區(qū)器不能保證相同的Key值被分區(qū)到同一個(gè)分區(qū),那么最終的合并的結(jié)果可能存在多個(gè)分區(qū)里有相同的Key。Shuffle的目的就是將零散于所有分區(qū)的數(shù)據(jù)按Key分區(qū)并集中。
8.4 join
join就是sql中的inner join。join也是PairRDDFunctions中的方法,sparkcore中支持的連接有:笛卡爾積、內(nèi)連接join,外連接(左leftOuterJoin、右rightOuterJoin、全fullOuterJoin)
不指定分區(qū)函數(shù)時(shí)默認(rèn)使用HashPartitioner;提供numPartitions參數(shù)時(shí),其內(nèi)部的分區(qū)函數(shù)是HashPartitioner(numPartitions)
我們發(fā)現(xiàn)join的內(nèi)部其實(shí)是調(diào)用cogroup。即rdd1.join(rdd2) => rdd1.cogroup(rdd2,partitioner) => flatMapValues(遍歷兩個(gè)value的迭代器)。
返回值的是(key,(v1,v2))這種形式的元組
8.5 cogroup
首先先判斷一下如果使用HashPartitioner分區(qū),并且key是數(shù)組的話拋異常。然后構(gòu)造一個(gè)CoGroupedRDD其鍵值對(duì)中的value要求是Iterable[V]和Iterable[W]類型。
重寫的RDD的getDependencies: 如果rdd和給定分區(qū)函數(shù)相同就是窄依賴,否則就是寬依賴
這里返回一個(gè)帶有Partitioner.numPartitions個(gè)分區(qū)類型為CoGroupPartition的數(shù)組
總結(jié):cogroup算子,根據(jù)rdd1,rdd2創(chuàng)建了一個(gè)CoGroupedRDD;分析了CoGroupedRDD的依賴關(guān)系,看到如果兩個(gè)rdd的分區(qū)函數(shù)相同,那么生成的rdd分區(qū)數(shù)不變,它們之間是一對(duì)一依賴,也就是窄依賴,從而可以減少依次shuffle;CoGroupedRDD的分區(qū)函數(shù)就是將兩個(gè)rdd的相同分區(qū)索引的分區(qū)合成一個(gè)新的分區(qū),并且通過NarrowCoGroupSplitDep這個(gè)類實(shí)現(xiàn)了序列化。
join返回的類型是 RDD[(K, (V, W))],CoGroup返回的是RDD[(K, (Iterable[V], Iterable[W]))]
8.6 reduce
reduce函數(shù):對(duì)RDD中的所有元素進(jìn)行聚合操作,將最終的結(jié)果返回給Driver。同時(shí)元素之間還要符合結(jié)合律和交換律[原因:在進(jìn)行reduce的操作時(shí),并不知道那個(gè)數(shù)據(jù)先過來(lái),所有要符合交換律,在交換律的基礎(chǔ)上,滿足結(jié)合律才能進(jìn)行reduce]
8.7?collect
collect方法是匯總所有節(jié)點(diǎn)中的計(jì)算結(jié)果到Driver端,collect后得到的是數(shù)組,Array中就是一個(gè)元素,只不過這個(gè)元素是一個(gè)Tuple,Array即為元組數(shù)組。返回的是一個(gè)數(shù)組,包含了所有程序運(yùn)行結(jié)果的數(shù)組,其中使用concat(results:?_*)方法將各個(gè)節(jié)點(diǎn)的數(shù)據(jù)加入到數(shù)組中。
8.8?saveAsTextFile
該函數(shù)將數(shù)據(jù)輸出,以文本文件的形式寫入本地文件系統(tǒng)或者HDFS等。Spark將對(duì)每個(gè)元素調(diào)用toString方法,將數(shù)據(jù)元素轉(zhuǎn)換為文本文件中的一行記錄。若將文件保存到本地文件系統(tǒng),那么只會(huì)保存在executor所在機(jī)器的本地目錄
總結(jié)
以上是生活随笔為你收集整理的RDD(弹性分布式数据集)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql 混合模式,SQLServer
- 下一篇: c 语言输出后不关闭_穿书+娱乐圈 |再