Spark Core
Spark Core
?? ?DAG概念
?? ??? ?有向無(wú)環(huán)圖
?? ??? ?Spark會(huì)根據(jù)用戶提交的計(jì)算邏輯中的RDD的轉(zhuǎn)換(變換方法)和動(dòng)作(action方法)來(lái)生成RDD之間的依賴關(guān)系,同時(shí)這個(gè)計(jì)算鏈也就生成了邏輯上的DAG。
?? ??? ?RDD之間的關(guān)系可以從兩個(gè)維度來(lái)理解:一個(gè)是RDD是從哪些RDD轉(zhuǎn)換而來(lái),也就是RDD的parent?RDD(s)是什么;還有就是依賴于parent?RDD(s)的哪些Partition(s)。這個(gè)關(guān)系,就是RDD之間的依賴,org.apache.spark.Dependency。根據(jù)依賴于parent?RDD(s)的Partitions的不同情況,Spark將這種依賴分為兩種,一種是寬依賴,一種是窄依賴。
?? ?DAG的生成與Stage的劃分
?? ??? ?DAG的生成
?? ??? ??? ?原始的RDD(s)通過(guò)一系列轉(zhuǎn)換就形成了DAG。RDD之間的依賴關(guān)系,包含了RDD由哪些Parent?RDD(s)轉(zhuǎn)換而來(lái)和它依賴parent?RDD(s)的哪些Partitions,是DAG的重要屬性。
?? ??? ??? ?借助這些依賴關(guān)系,DAG可以認(rèn)為這些RDD之間形成了Lineage(血統(tǒng),血緣關(guān)系)。借助Lineage,能保證一個(gè)RDD被計(jì)算前,它所依賴的parent?RDD都已經(jīng)完成了計(jì)算;同時(shí)也實(shí)現(xiàn)了RDD的容錯(cuò)性,即如果一個(gè)RDD的部分或者全部的計(jì)算結(jié)果丟失了,那么就需要重新計(jì)算這部分丟失的數(shù)據(jù)。
?? ??? ?Spark的Stage(階段)
?? ??? ??? ?Spark在執(zhí)行任務(wù)(job)時(shí),首先會(huì)根據(jù)依賴關(guān)系,將DAG劃分為不同的階段(Stage)
?? ??? ??? ?處理流程是:
?? ??? ??? ??? ?1)Spark在執(zhí)行Transformation類型操作時(shí)都不會(huì)立即執(zhí)行,而是懶執(zhí)行(計(jì)算)
?? ??? ??? ??? ?2)執(zhí)行若干步的Transformation類型的操作后,一旦遇到Action類型操作時(shí),才會(huì)真正觸發(fā)執(zhí)行(計(jì)算)
?? ??? ??? ??? ?3)執(zhí)行時(shí),從當(dāng)前Action方法向前回溯,如果遇到的是窄依賴則應(yīng)用流水線優(yōu)化,繼續(xù)向前找,直到碰到某一個(gè)寬依賴
?? ??? ??? ??? ?4)因?yàn)閷捯蕾嚤仨氁M(jìn)行shuffle,無(wú)法實(shí)現(xiàn)優(yōu)化,所以將這一次段執(zhí)行過(guò)程組裝為一個(gè)stage
?? ??? ??? ??? ?5)再?gòu)漠?dāng)前寬依賴開(kāi)始繼續(xù)向前找。重復(fù)剛才的步驟,從而將這個(gè)DAG還分為若干的stage
?? ??? ??? ?在stage內(nèi)部可以執(zhí)行流水線優(yōu)化,而在stage之間沒(méi)辦法執(zhí)行流水線優(yōu)化,因?yàn)橛衧huffle。但是這種機(jī)制已經(jīng)盡力的去避免了shuffle
?? ??? ?Spark的Job和Task
?? ??? ??? ?原始的RDD經(jīng)過(guò)一系列轉(zhuǎn)換后(一個(gè)DAG),會(huì)在最后一個(gè)RDD上觸發(fā)一個(gè)動(dòng)作,這個(gè)動(dòng)作會(huì)生成一個(gè)Job。
?? ??? ??? ?所以可以這樣理解:一個(gè)DAG對(duì)應(yīng)一個(gè)Spark的Job。
?? ??? ??? ?在Job被劃分為一批計(jì)算任務(wù)(Task)后,這批Task會(huì)被提交到集群上的計(jì)算節(jié)點(diǎn)去計(jì)算Spark的Task分為兩種:
?? ??? ??? ??? ?1)org.apache.spark.scheduler.ShuffleMapTask
?? ??? ??? ??? ?2)org.apache.spark.scheduler.ResultTask
?? ??? ??? ?簡(jiǎn)單來(lái)說(shuō),DAG的最后一個(gè)階段會(huì)為每個(gè)結(jié)果的Partition生成一個(gè)ResultTask,其余所有的階段都會(huì)生成ShufffleMapTask。
?? ?RDD
?? ??? ?RDD就是帶有分區(qū)的集合類型
?? ??? ??? ?RDD是分布式的,彈性的,容錯(cuò)的數(shù)據(jù)結(jié)構(gòu)
?? ??? ??? ?彈性分布式數(shù)據(jù)集(RDD),特點(diǎn)是可以并行操作,并且是容錯(cuò)的。有兩種方法可以創(chuàng)建RDD:
?? ??? ??? ??? ?1)執(zhí)行Transform操作(變換操作),
?? ??? ??? ??? ?2)讀取外部存儲(chǔ)系統(tǒng)的數(shù)據(jù)集,如HDFS,HBase,或任何與Hadoop有關(guān)的數(shù)據(jù)源。
?? ??? ??? ??? ?注:創(chuàng)建RDD的方式有多種,比如案例一中是基于一個(gè)基本的集合類型(Array)轉(zhuǎn)換而來(lái),像parallelize這樣的方法還有很多此外,我們也可以在讀取數(shù)據(jù)集時(shí)就創(chuàng)建RDD。
?? ??? ??? ?分區(qū)概念
?? ??? ??? ??? ?可以在不同的機(jī)器上并行處理
?? ??? ??? ?它是spark提供的一個(gè)特殊集合類。諸如普通的集合類型,如傳統(tǒng)的Array:(1,2,3,4,5)是一個(gè)整體,但轉(zhuǎn)換成RDD后,我們可以對(duì)數(shù)據(jù)進(jìn)行Partition(分區(qū))處理,這樣做的目的就是為了分布式。
?? ??? ??? ??? ?你可以讓這個(gè)RDD有兩個(gè)分區(qū),那么有可能是這個(gè)形式:RDD(1,2) (3,4)。
?? ??? ??? ??? ?這樣設(shè)計(jì)的目的在于:可以進(jìn)行分布式運(yùn)算。
?? ??? ?RDD操作
?? ??? ??? ?針對(duì)RDD的操作,分兩種,一種是Transformation(變換),一種是Actions(執(zhí)行)。
?? ??? ??? ?Transformation(變換)操作屬于懶操作(算子),不會(huì)真正觸發(fā)RDD的處理計(jì)算。
?? ??? ??? ?變換方法的共同點(diǎn):1.不會(huì)馬上觸發(fā)計(jì)算 2.每當(dāng)調(diào)用一次變換方法,都會(huì)產(chǎn)生一個(gè)新的RDD,Actions(執(zhí)行)操作才會(huì)真正觸發(fā)。
?? ??? ?RDD的依賴關(guān)系
?? ??? ??? ?RDD和它依賴的parent?RDD(s)的關(guān)系有兩種不同的類型,即窄依賴(narrow?dependency)和寬依賴(wide?dependency)。
?? ??? ??? ?1)窄依賴指的是每一個(gè)parent?RDD的Partition最多被子RDD的一個(gè)Partition使用
?? ??? ??? ??? ?對(duì)于窄依賴操作,它們只是將Partition的數(shù)據(jù)根據(jù)轉(zhuǎn)換的規(guī)則進(jìn)行轉(zhuǎn)化,并不涉及其他的處理,可以簡(jiǎn)單地認(rèn)為只是將數(shù)據(jù)從一個(gè)形式轉(zhuǎn)換到另一個(gè)形式。
?? ??? ??? ??? ?所以對(duì)于窄依賴,并不會(huì)引入昂貴的Shuffle。所以執(zhí)行效率非常高。如果整個(gè)DAG中存在多個(gè)連續(xù)的窄依賴,則可以將這些連續(xù)的窄依賴整合到一起連續(xù)執(zhí)行,中間不執(zhí)行shuffle 從而提高效率,這樣的優(yōu)化方式稱之為流水線優(yōu)化。
?? ??? ??? ??? ?此外,針對(duì)窄依賴,如果子RDD某個(gè)分區(qū)數(shù)據(jù)丟失,只需要找到父RDD對(duì)應(yīng)依賴的分區(qū),恢復(fù)即可。但如果是寬依賴,當(dāng)分區(qū)丟失時(shí),最糟糕的情況是要重算所有父RDD的所有分區(qū)。
?? ??? ??? ?2)寬依賴指的是多個(gè)子RDD的Partition會(huì)依賴同一個(gè)parent?RDD的Partition。
?? ??? ??? ??? ?對(duì)于groupByKey這樣的操作,子RDD的所有Partition(s)會(huì)依賴于parent?RDD的所有Partition(s),子RDD的Partition是parent?RDD的所有Partition?Shuffle的結(jié)果。
?? ??? ??? ?Shuffle概述
?? ??? ??? ??? ?spark中一旦遇到寬依賴就需要進(jìn)行shuffle的操作,所謂的shuffle的操作的本質(zhì)就是將數(shù)據(jù)匯總后重新分發(fā)的過(guò)程
?? ??? ??? ??? ?這個(gè)過(guò)程數(shù)據(jù)要匯總到一起,數(shù)據(jù)量可能很大所以不可避免的需要進(jìn)行數(shù)據(jù)落磁盤的操作,會(huì)降低程序的性能,所以spark并不是完全內(nèi)存不讀寫(xiě)磁盤,只能說(shuō)它盡力避免這樣的過(guò)程來(lái)提高效率 。
?? ??? ??? ??? ?spark中的shuffle,在早期的版本中,會(huì)產(chǎn)生多個(gè)臨時(shí)文件,但是這種多臨時(shí)文件的策略造成大量文件的同時(shí)的讀寫(xiě),磁盤的性能被分?jǐn)偨o多個(gè)文件,每個(gè)文件讀寫(xiě)效率都不高,影響spark的執(zhí)行效率。所以在后續(xù)的spark中(1.2.0之后的版本)的shuffle中,只會(huì)產(chǎn)生一個(gè)文件,并且數(shù)據(jù)會(huì)經(jīng)過(guò)排序再附加索引信息,減少了文件的數(shù)量并通過(guò)排序索引的方式提升了性能。
?? ??? ?RDD容錯(cuò)機(jī)制
?? ??? ??? ?分布式系統(tǒng)通常在一個(gè)機(jī)器集群上運(yùn)行,同時(shí)運(yùn)行的幾百臺(tái)機(jī)器中某些出問(wèn)題的概率大大增加,所以容錯(cuò)設(shè)計(jì)是分布式系統(tǒng)的一個(gè)重要能力。
?? ??? ??? ?Spark以前的集群容錯(cuò)處理模型,像MapReduce,將計(jì)算轉(zhuǎn)換為一個(gè)有向無(wú)環(huán)圖(DAG)的任務(wù)集合,這樣可以通過(guò)重復(fù)執(zhí)行DAG里的一部分任務(wù)來(lái)完成容錯(cuò)恢復(fù)。但是由于主要的數(shù)據(jù)存儲(chǔ)在分布式文件系統(tǒng)中,沒(méi)有提供其他存儲(chǔ)的概念,容錯(cuò)過(guò)程需要在網(wǎng)絡(luò)上進(jìn)行數(shù)據(jù)復(fù)制,從而增加了大量的消耗。所以,分布式編程中經(jīng)常需要做檢查點(diǎn),即將某個(gè)時(shí)機(jī)的中間數(shù)據(jù)寫(xiě)到存儲(chǔ)(通常是分布式文件系統(tǒng))中。
?? ??? ??? ?RDD也是一個(gè)DAG,每一個(gè)RDD都會(huì)記住創(chuàng)建該數(shù)據(jù)集需要哪些操作,跟蹤記錄RDD的繼承關(guān)系,這個(gè)關(guān)系在Spark里面叫l(wèi)ineage(血緣關(guān)系)。當(dāng)一個(gè)RDD的某個(gè)分區(qū)丟失時(shí),RDD是有足夠的信息記錄其如何通過(guò)其他RDD進(jìn)行計(jì)算,且只需重新計(jì)算該分區(qū),這是Spark的一個(gè)創(chuàng)新。
?? ??? ?RDD的緩存
?? ??? ??? ?相比Hadoop?MapReduce來(lái)說(shuō),Spark計(jì)算具有巨大的性能優(yōu)勢(shì),其中很大一部分原因是Spark對(duì)于內(nèi)存的充分利用,以及提供的緩存機(jī)制
?? ??? ??? ?RDD持久化(緩存)
?? ??? ??? ??? ?持久化在早期被稱作緩存(cache),但緩存一般指將內(nèi)容放在內(nèi)存中。雖然持久化操作在絕大部分情況下都是將RDD緩存在內(nèi)存中,但一般都會(huì)在內(nèi)存不夠時(shí)用磁盤頂上去(比操作系統(tǒng)默認(rèn)的磁盤交換性能高很多)。當(dāng)然,也可以選擇不使用內(nèi)存,而是僅僅保存到磁盤中。所以,現(xiàn)在Spark使用持久化(persistence)這一更廣泛的名稱。
?? ??? ??? ?默認(rèn)情況下,RDD只使用一次,用完即扔,再次使用時(shí)需要重新計(jì)算得到,而持久化(緩存)操作避免了這里的重復(fù)計(jì)算,實(shí)際測(cè)試也顯示持久化對(duì)性能提升明顯,這也是Spark剛出現(xiàn)時(shí)被人稱為內(nèi)存計(jì)算框架的原因。
?? ??? ??? ?持久化的方法是調(diào)用persist()函數(shù),除了持久化至內(nèi)存中,還可以在persist()中指定storage?level參數(shù)使用其他的類型,具體如下:
?? ??? ??? ??? ?1)MEMORY_ONLY : 將 RDD 以反序列化的 Java 對(duì)象的形式存儲(chǔ)在 JVM 中. 如果內(nèi)存空間不夠,部分?jǐn)?shù)據(jù)分區(qū)將不會(huì)被緩存,在每次需要用到這些數(shù)據(jù)時(shí)重新進(jìn)行計(jì)算. 這是默認(rèn)的級(jí)別。
?? ??? ??? ??? ?cache()方法對(duì)應(yīng)的級(jí)別就是MEMORY_ONLY級(jí)別
?? ??? ??? ??? ?2)MEMORY_AND_DISK:將 RDD 以反序列化的 Java 對(duì)象的形式存儲(chǔ)在 JVM 中。如果內(nèi)存空間不夠,將未緩存的數(shù)據(jù)分區(qū)存儲(chǔ)到磁盤,在需要使用這些分區(qū)時(shí)從磁盤讀取。
?? ??? ??? ??? ?3)MEMORY_ONLY_SER :將 RDD 以序列化的 Java 對(duì)象的形式進(jìn)行存儲(chǔ)(每個(gè)分區(qū)為一個(gè) byte 數(shù)組)。這種方式會(huì)比反序列化對(duì)象的方式節(jié)省很多空間,尤其是在使用 fast serialize時(shí)會(huì)節(jié)省更多的空間,但是在讀取時(shí)會(huì)使得 CPU 的 read 變得更加密集。如果內(nèi)存空間不夠,部分?jǐn)?shù)據(jù)分區(qū)將不會(huì)被緩存,在每次需要用到這些數(shù)據(jù)時(shí)重新進(jìn)行計(jì)算。
?? ??? ??? ??? ?4)MEMORY_AND_DISK_SER :類似于 MEMORY_ONLY_SER ,但是溢出的分區(qū)會(huì)存儲(chǔ)到磁盤,而不是在用到它們時(shí)重新計(jì)算。如果內(nèi)存空間不夠,將未緩存的數(shù)據(jù)分區(qū)存儲(chǔ)到磁盤,在需要使用這些分區(qū)時(shí)從磁盤讀取。
?? ??? ??? ??? ?5)DISK_ONLY:只在磁盤上緩存 RDD。
?? ??? ??? ??? ?6)MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. :與上面的級(jí)別功能相同,只不過(guò)每個(gè)分區(qū)在集群中兩個(gè)節(jié)點(diǎn)上建立副本。
?? ??? ??? ??? ?7)OFF_HEAP 將數(shù)據(jù)存儲(chǔ)在 off-heap memory 中。使用堆外內(nèi)存,這是Java虛擬機(jī)里面的概念,堆外內(nèi)存意味著把內(nèi)存對(duì)象分配在Java虛擬機(jī)的堆以外的內(nèi)存,這些內(nèi)存直接受操作系統(tǒng)管理(而不是虛擬機(jī))。使用堆外內(nèi)存的好處:可能會(huì)利用到更大的內(nèi)存存儲(chǔ)空間。但是對(duì)于數(shù)據(jù)的垃圾回收會(huì)有影響,需要程序員來(lái)處理
?? ??? ??? ??? ?注意,可能帶來(lái)一些GC回收問(wèn)題。
?? ??? ??? ?緩存數(shù)據(jù)的清除
?? ??? ??? ??? ?Spark?會(huì)自動(dòng)監(jiān)控每個(gè)節(jié)點(diǎn)上的緩存數(shù)據(jù),然后使用?least-recently-used?(LRU)?機(jī)制來(lái)處理舊的緩存數(shù)據(jù)。如果你想手動(dòng)清理這些緩存的?RDD?數(shù)據(jù)而不是去等待它們被自動(dòng)清理掉,
?? ??? ??? ??? ?可以使用?RDD.unpersist(?)?方法。
?? ??? ??? ?Spark?也會(huì)自動(dòng)持久化一些在?shuffle?操作過(guò)程中產(chǎn)生的臨時(shí)數(shù)據(jù)(比如?reduceByKey),即便是用戶并沒(méi)有調(diào)用持久化的方法。這樣做可以避免當(dāng)?shuffle?階段時(shí)如果一個(gè)節(jié)點(diǎn)掛掉了就得重新計(jì)算整個(gè)數(shù)據(jù)的問(wèn)題。如果用戶打算多次重復(fù)使用這些數(shù)據(jù),我們?nèi)匀唤ㄗh用戶自己調(diào)用持久化方法對(duì)數(shù)據(jù)進(jìn)行持久化。
?? ?Spark框架核心概念
?? ??? ?1.RDD。彈性分布式數(shù)據(jù)集,是Spark最核心的數(shù)據(jù)結(jié)構(gòu)。有分區(qū)機(jī)制,所以可以分布式進(jìn)行處理。有容錯(cuò)機(jī)制,通過(guò)RDD之間的依賴關(guān)系來(lái)恢復(fù)數(shù)據(jù)。
?? ??? ?2.依賴關(guān)系。RDD的依賴關(guān)系是通過(guò)各種Transformation(變換)來(lái)得到的。父RDD和子RDD之間的依賴關(guān)系分兩種:①窄依賴? ②寬依賴
?? ??? ??? ?①針對(duì)窄依賴:父RDD的分區(qū)和子RDD的分區(qū)關(guān)系是:一對(duì)一
?? ??? ??? ?窄依賴不會(huì)發(fā)生Shuffle,執(zhí)行效率高,spark框架底層會(huì)針對(duì)多個(gè)連續(xù)的窄依賴執(zhí)行流水線優(yōu)化,從而提高性能。例如 map? flatMap等方法都是窄依賴方法
?? ??? ??? ?②針對(duì)寬依賴:父RDD的分區(qū)和子RDD的分區(qū)關(guān)系是:一對(duì)多
?? ??? ??? ?寬依賴會(huì)產(chǎn)生shuffle,會(huì)產(chǎn)生磁盤讀寫(xiě),無(wú)法優(yōu)化。
?? ??? ?3.DAG。有向無(wú)環(huán)圖,當(dāng)一整條RDD的依賴關(guān)系形成之后,就形成了一個(gè)DAG。一般來(lái)說(shuō),一個(gè)DAG,最后都至少會(huì)觸發(fā)一個(gè)Action操作,觸發(fā)執(zhí)行。一個(gè)Action對(duì)應(yīng)一個(gè)Job任務(wù)。
?? ??? ?4.Stage。一個(gè)DAG會(huì)根據(jù)RDD之間的依賴關(guān)系進(jìn)行Stage劃分,流程是:以Action為基準(zhǔn),向前回溯,遇到寬依賴,就形成一個(gè)Stage。遇到窄依賴,則執(zhí)行流水線優(yōu)化(將多個(gè)連續(xù)的窄依賴放到一起執(zhí)行)
?? ??? ?5.task。任務(wù)。一個(gè)分區(qū)對(duì)應(yīng)一個(gè)task??梢赃@樣理解:一個(gè)Stage是一組Task的集合
?? ??? ?6.RDD的Transformation(變換)操作:懶執(zhí)行,并不會(huì)立即執(zhí)行
?? ??? ?7.RDD的Action(執(zhí)行)操作:觸發(fā)真正的執(zhí)行
?? ?Spark Shuffle詳解
?? ??? ?Shuffle,翻譯成中文就是洗牌。之所以需要Shuffle,還是因?yàn)榫哂心撤N共同特征的一類數(shù)據(jù)需要最終匯聚(aggregate)到一個(gè)計(jì)算節(jié)點(diǎn)上進(jìn)行計(jì)算。這些數(shù)據(jù)分布在各個(gè)存儲(chǔ)節(jié)點(diǎn)上并且由不同節(jié)點(diǎn)的計(jì)算單元處理。
?? ??? ?數(shù)據(jù)重新打亂然后匯聚到不同節(jié)點(diǎn)的過(guò)程就是Shuffle。但是實(shí)際上,Shuffle過(guò)程可能會(huì)非常復(fù)雜:
?? ??? ??? ?1)數(shù)據(jù)量會(huì)很大,比如單位為TB或PB的數(shù)據(jù)分散到幾百甚至數(shù)千、數(shù)萬(wàn)臺(tái)機(jī)器上。
?? ??? ??? ?2)為了將這個(gè)數(shù)據(jù)匯聚到正確的節(jié)點(diǎn),需要將這些數(shù)據(jù)放入正確的Partition,因?yàn)閿?shù)據(jù)大小已經(jīng)大于節(jié)點(diǎn)的內(nèi)存,因此這個(gè)過(guò)程中可能會(huì)發(fā)生多次硬盤續(xù)寫(xiě)。
?? ??? ??? ?3)為了節(jié)省帶寬,這個(gè)數(shù)據(jù)可能需要壓縮,如何在壓縮率和壓縮解壓時(shí)間中間做一個(gè)比較好的選擇?
?? ??? ??? ?4)數(shù)據(jù)需要通過(guò)網(wǎng)絡(luò)傳輸,因此數(shù)據(jù)的序列化和反序列化也變得相對(duì)復(fù)雜。
?? ??? ??? ?一般來(lái)說(shuō),每個(gè)Task處理的數(shù)據(jù)可以完全載入內(nèi)存(如果不能,可以減小每個(gè)Partition的大小),因此Task可以做到在內(nèi)存中計(jì)算。但是對(duì)于Shuffle來(lái)說(shuō),如果不持久化這個(gè)中間結(jié)果,一旦數(shù)據(jù)丟失,就需要重新計(jì)算依賴的全部RDD,因此有必要持久化這個(gè)中間結(jié)果。所以這就是為什么Shuffle過(guò)程會(huì)產(chǎn)生文件的原因。
?? ??? ??? ?如果Shuffle過(guò)程不落地,①可能會(huì)造成內(nèi)存溢出 ②當(dāng)某分區(qū)丟失時(shí),會(huì)重新計(jì)算所有父分區(qū)數(shù)據(jù)
?? ??? ?Shuffle Write
?? ??? ??? ?Shuffle?Write,即數(shù)據(jù)是如何持久化到文件中,以使得下游的Task可以獲取到其需要處理的數(shù)據(jù)的(即Shuffle?Read)。在Spark 0.8之前,Shuffle Write是持久化到緩存的,但后來(lái)發(fā)現(xiàn)實(shí)際應(yīng)用中,shuffle過(guò)程帶來(lái)的數(shù)據(jù)通常是巨量的,所以經(jīng)常會(huì)發(fā)生內(nèi)存溢出的情況,所以在Spark?0.8以后,Shuffle?Write會(huì)將數(shù)據(jù)持久化到硬盤,再之后Shuffle?Write不斷進(jìn)行演進(jìn)優(yōu)化,但是數(shù)據(jù)落地到本地文件系統(tǒng)的實(shí)現(xiàn)并沒(méi)有改變。
?? ??? ??? ?1)Hash?Based?Shuffle?Write
?? ??? ??? ??? ?在Spark?1.0以前,Spark只支持Hash?Based?Shuffle。因?yàn)樵诤芏噙\(yùn)算場(chǎng)景中并不需要排序,因此多余的排序只能使性能變差,比如Hadoop的Map?Reduce就是這么實(shí)現(xiàn)的,也就是Reducer拿到的數(shù)據(jù)都是已經(jīng)排好序的。實(shí)際上Spark的實(shí)現(xiàn)很簡(jiǎn)單:每個(gè)Shuffle?Map?Task根據(jù)key的哈希值,計(jì)算出每個(gè)key需要寫(xiě)入的Partition然后將數(shù)據(jù)單獨(dú)寫(xiě)入一個(gè)文件,這個(gè)Partition實(shí)際上就對(duì)應(yīng)了下游的一個(gè)Shuffle?Map?Task或者Result?Task。因此下游的Task在計(jì)算時(shí)會(huì)通過(guò)網(wǎng)絡(luò)(如果該Task與上游的Shuffle?Map?Task運(yùn)行在同一個(gè)節(jié)點(diǎn)上,那么此時(shí)就是一個(gè)本地的硬盤讀寫(xiě))讀取這個(gè)文件并進(jìn)行計(jì)算。
?? ??? ??? ??? ?Hash?Based?Shuffle?Write存在的問(wèn)題
?? ??? ??? ??? ??? ?1)每個(gè)節(jié)點(diǎn)可能會(huì)同時(shí)打開(kāi)多個(gè)文件,每次打開(kāi)文件都會(huì)占用一定內(nèi)存。假設(shè)每個(gè)Write?Handler的默認(rèn)需要100KB的內(nèi)存,那么同時(shí)打開(kāi)這些文件需要50GB的內(nèi)存,對(duì)于一個(gè)集群來(lái)說(shuō),還是有一定的壓力的。尤其是如果Shuffle?Map?Task和下游的Task同時(shí)增大10倍,那么整體的內(nèi)存就增長(zhǎng)到5TB。
?? ??? ??? ??? ??? ?2)從整體的角度來(lái)看,打開(kāi)多個(gè)文件對(duì)于系統(tǒng)來(lái)說(shuō)意味著隨機(jī)讀,尤其是每個(gè)文件比較小但是數(shù)量非常多的情況。而現(xiàn)在機(jī)械硬盤在隨機(jī)讀方面的性能特別差,非常容易成為性能的瓶頸。如果集群依賴的是固態(tài)硬盤,也許情況會(huì)改善很多,但是隨機(jī)寫(xiě)的性能肯定不如順序?qū)懙摹?br />?? ??? ??? ??? ?Hash?Based?Shuffle的每個(gè)Mapper都需要為每個(gè)Reducer寫(xiě)一個(gè)文件,供Reducer讀取,即需要產(chǎn)生M*R個(gè)數(shù)量的文件,如果Mapper和Reducer的數(shù)量比較大,產(chǎn)生的文件數(shù)會(huì)非常多。
?? ??? ??? ?2)Sort?Based Shuffle?Write
?? ??? ??? ??? ?Spark?Core的一個(gè)重要的升級(jí)就是將默認(rèn)的Hash?Based?Shuffle換成了Sort?Based?Shuffle,即spark.shuffle.manager從Hash換成了Sort
?? ??? ??? ??? ?對(duì)應(yīng)的實(shí)現(xiàn)類分別是
?? ??? ??? ??? ??? ?org.apache.spark.shuffle.hash.HashShuffleManager
?? ??? ??? ??? ??? ?org.apache.spark.shuffle.sort.SortShuffleManager。
?? ??? ??? ??? ?Sort?Based?Shuffle的模式是:每個(gè)Shuffle?Map?Task不會(huì)為每個(gè)Reducer生成一個(gè)單獨(dú)的文件;相反,它會(huì)將所有的結(jié)果寫(xiě)到一個(gè)文件里,同時(shí)會(huì)生成一個(gè)Index文件,
?? ??? ??? ??? ?Reducer可以通過(guò)這個(gè)Index文件取得它需要處理的數(shù)據(jù)。避免產(chǎn)生大量文件的直接收益就是節(jié)省了內(nèi)存的使用和順序Disk?IO帶來(lái)的低延時(shí)。節(jié)省內(nèi)存的使用可以減少GC的風(fēng)險(xiǎn)和頻率。而減少文件的數(shù)量可以避免同時(shí)寫(xiě)多個(gè)文件給系統(tǒng)帶來(lái)的壓力。
?? ??? ??? ??? ?Sort?Based?Write實(shí)現(xiàn)詳解
?? ??? ??? ??? ??? ?Shuffle?Map?Task會(huì)按照key相對(duì)應(yīng)的Partition?ID進(jìn)行Sort,其中屬于同一個(gè)Partition的key不會(huì)Sort。因?yàn)閷?duì)于不需要Sort的操作來(lái)說(shuō),這個(gè)Sort是負(fù)收益的;要知道之前Spark剛開(kāi)始使用Hash?Based的Shuffle而不是Sort?Based就是為了避免Hadoop?Map?Reduce對(duì)于所有計(jì)算都會(huì)Sort的性能損耗。對(duì)于那些需要Sort的運(yùn)算,
?? ??? ??? ??? ??? ?比如sortByKey,這個(gè)Sort在Spark?1.2.0里還是由Reducer完成的。
?? ??? ??? ??? ??? ?①答出shuffle的定義
?? ??? ??? ??? ??? ?②spark shuffle的特點(diǎn)
?? ??? ??? ??? ??? ?③spark shuffle的目的
?? ??? ??? ??? ??? ?④spark shuffel的實(shí)現(xiàn)類,即對(duì)應(yīng)優(yōu)缺點(diǎn)
?? ??? ?Shuffle 相關(guān)參數(shù)配置
?? ??? ??? ?Shuffle是Spark?Core比較復(fù)雜的模塊,它也是非常影響性能的操作之一。
?? ??? ??? ?1)spark.shuffle.manager
?? ??? ??? ??? ?兩種方式的Shuffle 即Hash?Based?Shuffle和Sort?Based?Shuffle
?? ??? ??? ?2)spark.shuffle.spill
?? ??? ??? ??? ?這個(gè)參數(shù)的默認(rèn)值是true,用于指定Shuffle過(guò)程中如果內(nèi)存中的數(shù)據(jù)超過(guò)閾值(參考spark.shuffle.memoryFraction的設(shè)置)時(shí)是否需要將部分?jǐn)?shù)據(jù)臨時(shí)寫(xiě)入外部存儲(chǔ)。
?? ??? ??? ??? ?如果設(shè)置為false,那么這個(gè)過(guò)程就會(huì)一直使用內(nèi)存,會(huì)有內(nèi)存溢出的風(fēng)險(xiǎn)。因此只有在確定內(nèi)存足夠使用時(shí),才可以將這個(gè)選項(xiàng)設(shè)置為false。
?? ??? ??? ?3)spark.shuffle.memoryFraction
?? ??? ??? ??? ?在啟用spark.shuffle.spill的情況下,spark.shuffle.memoryFraction決定了當(dāng)Shuffle過(guò)程中使用的內(nèi)存達(dá)到總內(nèi)存多少比例的時(shí)候開(kāi)始spill。在Spark?1.2.0里,這個(gè)值是0.2
?? ??? ??? ??? ?此參數(shù)可以適當(dāng)調(diào)大,可以控制在0.4~0.6。
?? ??? ??? ??? ?通過(guò)這個(gè)參數(shù)可以設(shè)置Shuffle過(guò)程占用內(nèi)存的大小,它直接影響了寫(xiě)入到外部存儲(chǔ)的頻率和垃圾回收的頻率。
?? ??? ??? ??? ?可以適當(dāng)調(diào)大此值,可以減少磁盤I/O次數(shù)。
?? ??? ??? ?4)spark.shuffle.blockTransferService
?? ??? ??? ??? ?在Spark?1.2.0中這個(gè)配置的默認(rèn)值是netty,而在之前的版本中是nio。它主要是用于在各個(gè)Executor之間傳輸Shuffle數(shù)據(jù)。netty的實(shí)現(xiàn)更加簡(jiǎn)潔,但實(shí)際上用戶不用太關(guān)心這個(gè)選項(xiàng)。除非有特殊需求,否則采用默認(rèn)配置即可。
?? ??? ??? ?5)spark.shuffle.consolidateFiles
?? ??? ??? ??? ?這個(gè)配置的默認(rèn)值是false。主要是為了解決在Hash?Based?Shuffle過(guò)程中產(chǎn)生過(guò)多文件的問(wèn)題。如果配置選項(xiàng)為true,那么對(duì)于同一個(gè)Core上運(yùn)行的Shuffle?Map?Task不會(huì)產(chǎn)生一個(gè)新的Shuffle文件而是重用原來(lái)的
?? ??? ??? ?6)spark.shuffle.compress和spark.shuffle.spill.compress
?? ??? ??? ??? ?這兩個(gè)參數(shù)的默認(rèn)配置都是true。都是用來(lái)設(shè)置Shuffle過(guò)程中是否對(duì)Shuffle數(shù)據(jù)進(jìn)行壓縮
?? ??? ??? ??? ?前者針對(duì)最終寫(xiě)入本地文件系統(tǒng)的輸出文件
?? ??? ??? ??? ?后者針對(duì)在處理過(guò)程需要寫(xiě)入到外部存儲(chǔ)的中間數(shù)據(jù),即針對(duì)最終的shuffle輸出文件。
?? ??? ??? ?7)spark.reducer.maxMbInFlight
?? ??? ??? ??? ?這個(gè)參數(shù)用于限制一個(gè)Result?Task向其他的Executor請(qǐng)求Shuffle數(shù)據(jù)時(shí)所占用的最大內(nèi)存數(shù),默認(rèn)是64MB。尤其是如果網(wǎng)卡是千兆和千兆以下的網(wǎng)卡時(shí)。默認(rèn)值是 設(shè)置這個(gè)值需要綜合考慮網(wǎng)卡帶寬和內(nèi)存。
?? ?Spark調(diào)優(yōu)
?? ??? ?更好的序列化實(shí)現(xiàn)
?? ??? ??? ?Spark用到序列化的地方
?? ??? ??? ??? ?1)Shuffle時(shí)需要將對(duì)象寫(xiě)入到外部的臨時(shí)文件。
?? ??? ??? ??? ?2)每個(gè)Partition中的數(shù)據(jù)要發(fā)送到worker上,spark先把RDD包裝成task對(duì)象,將task通過(guò)網(wǎng)絡(luò)發(fā)給worker。
?? ??? ??? ??? ?3)RDD如果支持內(nèi)存+硬盤,只要往硬盤中寫(xiě)數(shù)據(jù)也會(huì)涉及序列化。
?? ??? ??? ?默認(rèn)使用的是java的序列化。但java的序列化有兩個(gè)問(wèn)題,一個(gè)是性能相對(duì)比較低,另外它序列化完二進(jìn)制的內(nèi)容長(zhǎng)度也比較大,造成網(wǎng)絡(luò)傳輸時(shí)間拉長(zhǎng)。業(yè)界現(xiàn)在有很多更好的實(shí)現(xiàn),如kryo,比java的序列化快10倍以上。而且生成內(nèi)容長(zhǎng)度也短。時(shí)間快,空間小,自然選擇它了。
?? ??? ?通過(guò)代碼使用Kryo
?? ??? ?配置多臨時(shí)文件目錄
?? ??? ??? ?spark.local.dir參數(shù)。當(dāng)shuffle、歸并排序(sort、merge)時(shí)都會(huì)產(chǎn)生臨時(shí)文件。這些臨時(shí)文件都在這個(gè)指定的目錄下。那這個(gè)文件夾有很多臨時(shí)文件,如果都發(fā)生讀寫(xiě)操作,有的線程在讀這個(gè)文件,有的線程在往這個(gè)文件里寫(xiě),磁盤I/O性能就非常低。
?? ??? ??? ?可以創(chuàng)建多個(gè)文件夾,每個(gè)文件夾都對(duì)應(yīng)一個(gè)真實(shí)的硬盤。假如原來(lái)是3個(gè)程序同時(shí)讀寫(xiě)一個(gè)硬盤,效率肯定低,現(xiàn)在讓三個(gè)程序分別讀取3個(gè)磁盤,這樣沖突減少,效率就提高了。這樣就有效提高外部文件讀和寫(xiě)的效率。怎么配置呢?只需要在這個(gè)配置時(shí)配置多個(gè)路徑就可以。中間用逗號(hào)分隔。
?? ??? ??? ?spark.local.dir=/home/tmp,/home/tmp2
?? ??? ?啟用推測(cè)執(zhí)行機(jī)制
?? ??? ??? ?可以設(shè)置spark.speculation? true
?? ??? ??? ?開(kāi)啟后,spark會(huì)檢測(cè)執(zhí)行較慢的Task,并復(fù)制這個(gè)Task在其他節(jié)點(diǎn)運(yùn)行,最后哪個(gè)節(jié)點(diǎn)先運(yùn)行完,就用其結(jié)果,然后將慢Task 殺死
?? ??? ?collect速度慢
?? ??? ??? ?collect只適合在測(cè)試時(shí),因?yàn)榘呀Y(jié)果都收集到Driver服務(wù)器上,數(shù)據(jù)要跨網(wǎng)絡(luò)傳輸,同時(shí)要求Driver服務(wù)器內(nèi)存大,所以收集過(guò)程慢。解決辦法就是直接輸出到分布式文件系統(tǒng)中。
?? ??? ?有些情況下,RDD操作使用MapPartitions替代map
?? ??? ??? ?map方法對(duì)RDD的每一條記錄逐一操作。mapPartitions是對(duì)RDD里的每個(gè)分區(qū)操作
?? ??? ??? ?rdd.map{ x=>conn=getDBConn.conn;write(x.toString);conn close;}
?? ??? ??? ?這樣頻繁的鏈接、斷開(kāi)數(shù)據(jù)庫(kù),效率差。
?? ??? ??? ?rdd.mapPartitions{(record:=>conn.getDBConn;for(item<-recorders;write(item.toString);conn close;}
?? ??? ??? ?這樣就一次鏈接一次斷開(kāi),中間批量操作,效率提升。
?? ??? ?Spark的GC調(diào)優(yōu)
?? ??? ??? ?由于Spark立足于內(nèi)存計(jì)算,常常需要在內(nèi)存中存放大量數(shù)據(jù),因此也更依賴JVM的垃圾回收機(jī)制(GC)。并且同時(shí),它也支持兼容批處理和流式處理,對(duì)于程序吞吐量和延遲都有較高要求,因此GC參數(shù)的調(diào)優(yōu)在Spark應(yīng)用實(shí)踐中顯得尤為重要。
?? ??? ??? ?主要有兩種策略——Parallel?GC(吞吐量?jī)?yōu)先)和CMS?GC(低延遲響應(yīng))。
?? ??? ??? ?GC算法原理
?? ??? ??? ??? ?對(duì)于內(nèi)存較大的環(huán)境非常友好。因?yàn)镚1 GC對(duì)于內(nèi)存的使用率特別高,內(nèi)存越大,此優(yōu)勢(shì)越明顯。
?? ??? ??? ?選擇垃圾收集器
?? ??? ??? ??? ?park默認(rèn)使用的是Parallel?GC。經(jīng)調(diào)研我們發(fā)現(xiàn),Parallel?GC常常受困于Full?GC,而每次Full?GC都給性能帶來(lái)了較大的下降。而Parallel?GC可以進(jìn)行參數(shù)調(diào)優(yōu)的空間也非常有限,我們只能通過(guò)調(diào)節(jié)一些基本參數(shù)來(lái)提高性能,如各年代分區(qū)大小比例、進(jìn)入老年代前的拷貝次數(shù)等。而且這些調(diào)優(yōu)策略只能推遲Full?GC的到來(lái),如果是長(zhǎng)期運(yùn)行的應(yīng)用,Parallel?GC調(diào)優(yōu)的意義就非常有限了。
?? ??? ??? ?將InitiatingHeapOccupancyPercent參數(shù)調(diào)低(默認(rèn)值是45),可以使G1 GC收集器更早開(kāi)始Mixed GC(Minor GC);但另一方面,會(huì)增加GC發(fā)生頻率。(啟動(dòng)并發(fā)GC周期時(shí)的堆內(nèi)存占用百分比. G1之類的垃圾收集器用它來(lái)觸發(fā)并發(fā)GC周期,基于整個(gè)堆的使用率,而不只是某一代內(nèi)存的使用比. 值為 0 則表示"一直執(zhí)行GC循環(huán)". 默認(rèn)值為 45.)降低此值,會(huì)提高M(jìn)inor GC的頻率,但是會(huì)推遲Full GC的到來(lái)。
?? ??? ??? ?提高ConcGCThreads的值,在Mixed GC階段投入更多的并發(fā)線程,爭(zhēng)取提高每次暫停的效率。但是此參數(shù)會(huì)占用一定的有效工作線程資源。
?? ??? ??? ?調(diào)試這兩個(gè)參數(shù)可以有效降低Full GC出現(xiàn)的概率。Full GC被消除之后,最終的性能獲得了大幅提升。
?? ??? ?Spark的內(nèi)存管理
?? ??? ??? ?Spark的核心概念是RDD,實(shí)際運(yùn)行中內(nèi)存消耗都與RDD密切相關(guān)。Spark允許用戶將應(yīng)用中重復(fù)使用的RDD數(shù)據(jù)持久化緩存起來(lái),從而避免反復(fù)計(jì)算的開(kāi)銷,而RDD的持久化形態(tài)之一就是將全部或者部分?jǐn)?shù)據(jù)緩存在JVM的Heap中。當(dāng)我們觀察到GC延遲影響效率時(shí),應(yīng)當(dāng)先檢查Spark應(yīng)用本身是否有效利用有限的內(nèi)存空間。RDD占用的內(nèi)存空間比較少的話,程序運(yùn)行的heap空間也會(huì)比較寬松,GC效率也會(huì)相應(yīng)提高;而RDD如果占用大量空間的話,則會(huì)帶來(lái)巨大的性能損失
?? ??? ?總結(jié)
?? ??? ??? ?對(duì)于大量依賴于內(nèi)存計(jì)算的Spark應(yīng)用,GC調(diào)優(yōu)顯得尤為重要。在發(fā)現(xiàn)GC問(wèn)題的時(shí)候,不要著急調(diào)試GC。而是先考慮是否存在Spark進(jìn)程內(nèi)存管理的效率問(wèn)題,例如RDD緩存的持久化和釋放。至于GC參數(shù)的調(diào)試,首先我們比較推薦使用G1 GC來(lái)運(yùn)行Spark應(yīng)用。相較于傳統(tǒng)的垃圾收集器,隨著G1的不斷成熟,需要配置的選項(xiàng)會(huì)更少,能同時(shí)滿足高吞吐量和低延遲的尋求。當(dāng)然,GC的調(diào)優(yōu)不是絕對(duì)的,不同的應(yīng)用會(huì)有不同應(yīng)用的特性,掌握根據(jù)GC日志進(jìn)行調(diào)優(yōu)的方法,才能以不變應(yīng)萬(wàn)變。最后,也不能忘了先對(duì)程序本身的邏輯和代碼編寫(xiě)進(jìn)行考量,例如減少中間變量的創(chuàng)建或者復(fù)制,控制大對(duì)象的創(chuàng)建,將長(zhǎng)期存活對(duì)象放在Off-heap中等等。
?? ?Checkpoint機(jī)制
?? ??? ?checkpoint的意思就是建立檢查點(diǎn),類似于快照,例如在spark計(jì)算里面 計(jì)算流程DAG特別長(zhǎng),服務(wù)器需要將整個(gè)DAG計(jì)算完成得出結(jié)果,但是如果在這很長(zhǎng)的計(jì)算流程中突然中間算出的數(shù)據(jù)丟失了,spark又會(huì)根據(jù)RDD的依賴關(guān)系從頭到尾計(jì)算一遍,這樣子就很費(fèi)性能,當(dāng)然我們可以將中間的計(jì)算結(jié)果通過(guò)cache或者persist放到內(nèi)存或者磁盤中,但是這樣也不能保證數(shù)據(jù)完全不會(huì)丟失,存儲(chǔ)的這個(gè)內(nèi)存出問(wèn)題了或者磁盤壞了,也會(huì)導(dǎo)致spark從頭再根據(jù)RDD計(jì)算一遍,所以就有了checkpoint,其中checkpoint的作用就是將DAG中比較重要的中間數(shù)據(jù)做一個(gè)檢查點(diǎn)將結(jié)果存儲(chǔ)到一個(gè)高可用的地方
?? ??? ?總結(jié):Spark的CheckPoint機(jī)制很重要,也很常用,尤其在機(jī)器學(xué)習(xí)中的一些迭代算法中很常見(jiàn)。比如一個(gè)算法迭代10000次,如果不適用緩沖機(jī)制,如果某分區(qū)數(shù)據(jù)丟失,會(huì)導(dǎo)致整個(gè)計(jì)算鏈重新計(jì)算,所以引入緩存機(jī)制。但是光引入緩存,也不完全可靠,比如緩存丟失或緩存存儲(chǔ)不下,也會(huì)導(dǎo)致重新計(jì)算,所以使用CheckPoint機(jī)制再做一層保證。
?? ??? ?補(bǔ)充:檢查目錄的路徑,一般都是設(shè)置到HDFS上
?? ??? ?Spark懶執(zhí)行的意義
?? ??? ??? ?Spark中,Transformation方法都是懶操作方法,比如map,flatMap,reduceByKey等。當(dāng)觸發(fā)某個(gè)Action操作時(shí)才真正執(zhí)行。
?? ??? ??? ?懶操作的意義:
?? ??? ??? ??? ?①不運(yùn)行job就觸發(fā)計(jì)算,避免了大量的無(wú)意義的計(jì)算,即避免了大量的無(wú)意義的中間結(jié)果的產(chǎn)生,即避免產(chǎn)生無(wú)意義的磁盤I/O及網(wǎng)絡(luò)傳輸
?? ??? ??? ??? ?②更深層次的意義在于,執(zhí)行運(yùn)算時(shí),看到之前的計(jì)算操作越多,執(zhí)行優(yōu)化的可能性就越高
?? ?Spark共享變量
?? ??? ?Spark程序的大部分操作都是RDD操作,通過(guò)傳入函數(shù)給RDD操作函數(shù)來(lái)計(jì)算。這些函數(shù)在不同的節(jié)點(diǎn)上并發(fā)執(zhí)行,但每個(gè)內(nèi)部的變量有不同的作用域,不能相互訪問(wèn),所以有時(shí)會(huì)不太方便,Spark提供了兩類共享變量供編程使用——廣播變量和計(jì)數(shù)器
?? ??? ?1.?廣播變量
?? ??? ??? ?這是一個(gè)只讀對(duì)象,在所有節(jié)點(diǎn)上都有一份緩存,創(chuàng)建方法是SparkContext.broadcast()
?? ??? ??? ?注意,廣播變量是只讀的,所以創(chuàng)建之后再更新它的值是沒(méi)有意義的,一般用val修飾符來(lái)定義廣播變量。
?? ??? ?2.?計(jì)數(shù)器
?? ??? ??? ?計(jì)數(shù)器只能增加,是共享變量,用于計(jì)數(shù)或求和。
?? ??? ??? ?計(jì)數(shù)器變量的創(chuàng)建方法是SparkContext.accumulator(v,?name),其中v是初始值,name是名稱。
?? ?spark解決數(shù)據(jù)傾斜問(wèn)題
?? ??? ?將少量的數(shù)據(jù)轉(zhuǎn)化為Map進(jìn)行廣播,廣播會(huì)將此 Map 發(fā)送到每個(gè)節(jié)點(diǎn)中,如果不進(jìn)行廣播,每個(gè)task執(zhí)行時(shí)都會(huì)去獲取該Map數(shù)據(jù),造成了性能浪費(fèi)。
轉(zhuǎn)載于:https://www.cnblogs.com/Striverchen/p/10557905.html
總結(jié)
以上是生活随笔為你收集整理的Spark Core的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: linux01
- 下一篇: 计算机考研2017真题408,2017计