日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark优化总结

發(fā)布時(shí)間:2024/4/14 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark优化总结 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>

1. RDD

1.1 RDD持久化

????對(duì)多次使用的RDD進(jìn)行持久化。此時(shí)Spark就會(huì)根據(jù)你的持久化策略,將RDD中的數(shù)據(jù)保存到內(nèi)存或者磁盤中。以后每次對(duì)這個(gè)RDD進(jìn)行算子操作時(shí),都會(huì)直接從內(nèi)存或磁盤中提取持久化的RDD數(shù)據(jù),然后執(zhí)行算子,而不會(huì)從源頭處重新計(jì)算一遍這個(gè)RDD。

1.2 調(diào)整RDD partition數(shù)

????spark一般按照HDFS上加載block個(gè)數(shù)決定并行度,也就是為每個(gè)block(如128M)創(chuàng)建一個(gè)RDDpartition,這個(gè)數(shù)據(jù)量在很多時(shí)候是偏多的;而當(dāng)我們使用filter算子過濾掉較多數(shù)據(jù)后,后續(xù)的計(jì)算并行度不變這在有時(shí)又是偏高的。

????spark提供了兩種方法對(duì)操作并行度進(jìn)行調(diào)優(yōu)。第一種,在數(shù)據(jù)混洗時(shí)使用參數(shù)為混洗后的RDD制定并行度;第二種,對(duì)于已有的RDD可進(jìn)行重新分區(qū)以獲取更多或更少的分區(qū)。repartition()會(huì)把RDD隨機(jī)打亂并重新分成制定分區(qū)數(shù)。如果要減少RDD可使用coalesce()不打亂數(shù)據(jù)更高效。

????其他方法手動(dòng)textFile()、parallelize()等方法的第二個(gè)參數(shù)來設(shè)置并行度,也可以使用spark.default.parallelism參數(shù),來設(shè)置統(tǒng)一的并行度

2. shuffle

????shuffle過程中上游的task會(huì)根據(jù)所處理數(shù)據(jù)的KEY,來產(chǎn)生多個(gè)不同KEY值的中間文件。下游的task根據(jù)自己所要處理的KEY值,去上游拉取相應(yīng)的中間文件。這個(gè)過程中涉及到大量的磁盤IO以及大量的網(wǎng)絡(luò)傳輸;且下游的某些task拉取到比其他task多得多的數(shù)據(jù),導(dǎo)致數(shù)據(jù)傾斜從而產(chǎn)生嚴(yán)重的性能問題。

2.1 map join消除shuffle

????傳統(tǒng)的join操作會(huì)導(dǎo)致shuffle操作導(dǎo)致性能問題。如果join兩端的數(shù)據(jù)一個(gè)大一個(gè)小,則可以廣播小RDD全量數(shù)據(jù),driver和每個(gè)Executor內(nèi)存中都會(huì)駐留一份小RDD的全量數(shù)據(jù)。此時(shí)就不會(huì)發(fā)生shuffle操作,也就不會(huì)發(fā)生數(shù)據(jù)傾斜。

2.2 預(yù)聚合減少shuffle量

????map-side預(yù)聚合指上游task對(duì)相同的key進(jìn)行一次聚合,類似于MapReduce中的本地combiner。預(yù)聚合之后每個(gè)task下每個(gè)key都只有一條,大大減少下游task要拉取的數(shù)據(jù)數(shù)量,從而也就減少了磁盤IO以及網(wǎng)絡(luò)傳輸開銷。

????通常來說,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。因?yàn)閞educeByKey和aggregateByKey算子都會(huì)使用用戶自定義的函數(shù)對(duì)每個(gè)節(jié)點(diǎn)本地的相同key進(jìn)行預(yù)聚合。而groupByKey算子是不會(huì)進(jìn)行預(yù)聚合的,全量的中間數(shù)據(jù)各個(gè)節(jié)點(diǎn)之間分發(fā)和傳輸,性能相對(duì)較差。

2.3 復(fù)用shuffle中間文件

????默認(rèn)情況下shuffle中間文件數(shù)目為map tasks * reduce tasks,通過設(shè)置spark.shuffle.consolidateFiles為true,來合并shuffle中間文件。每個(gè)計(jì)算核心只產(chǎn)生一批文件,同一個(gè)核心后面運(yùn)行的task復(fù)用前面task產(chǎn)生的中間文件,此時(shí)文件數(shù)為cores * reduce tasks數(shù)目,大大減少了數(shù)據(jù)拉取的文件數(shù)。

????設(shè)置方法:new SparkConf().set("spark.shuffle.consolidateFiles", "true")

2.4?spark.local.dirs多目錄

????spark需要使用本地磁盤存儲(chǔ)數(shù)據(jù)混洗的中間數(shù)據(jù),以及溢寫到磁盤總的RDD分區(qū)數(shù)據(jù)。因此設(shè)置更多的本地磁盤會(huì)分擔(dān)IO壓力,提升IO吞吐量效率。

????獨(dú)立模式下修改spark-env.sh文件中的SPARK_LOCAL_DIRSyarn、mesos模式下修改spark.local.dir來實(shí)現(xiàn)

2.5?使用kryo序列化

????當(dāng)spark需要進(jìn)行網(wǎng)絡(luò)傳輸數(shù)據(jù)或?qū)?shù)據(jù)溢寫到磁盤的時(shí)候,spark需要將數(shù)據(jù)序列化成二進(jìn)制格式。默認(rèn)情況下spark會(huì)使用java的內(nèi)建序列化庫,spark也支持地方三房的序列化庫Kryo,提供比java序列化工具更短的序列化時(shí)間和更高的壓縮比。

第一步: 在sparkConf中設(shè)置一個(gè)屬性。

set(“spark.serializer”,"org.apache.spark.serializer.KryoSerializer")

第二步: 注冊(cè)你使用到的,需要通過Kryo序列化,一些自定義的類。

.registerKryoClasses(new Class[]{CategorySortKey.class});

Kryo不是默認(rèn)序列化類庫的原因:如果要達(dá)到它的最佳性能的話,那么就一定要注冊(cè)你自定義的類(比如,你的算子函數(shù)中使用到了外部自定義類型的對(duì)象變量,這時(shí)就要求必須注冊(cè)你的類,否則達(dá)不到最佳性能)

2.6 提升shuffle并行度

????增加shuffle read task的數(shù)量,可以讓原本分配給一個(gè)task的多個(gè)key分配給多個(gè)task,從而讓每個(gè)task處理比原來更少的數(shù)據(jù)。但該方法只是緩解了數(shù)據(jù)傾斜而已,沒有徹底根除問題。

3. 分配資源

3.1 num-executors

  參數(shù)說明:該參數(shù)用于設(shè)置Spark作業(yè)總共要用多少個(gè)Executor進(jìn)程來執(zhí)行。Driver在向YARN集群管理器申請(qǐng)資源時(shí),YARN集群管理器會(huì)盡可能按照你的設(shè)置來在各個(gè)工作節(jié)點(diǎn)啟動(dòng)相應(yīng)數(shù)量的Executor。如果不設(shè)置的話,默認(rèn)只會(huì)給你啟動(dòng)少量的Executor進(jìn)程,此時(shí)Spark作業(yè)的運(yùn)行速度是非常慢的。

  參數(shù)調(diào)優(yōu)建議:每個(gè)Spark作業(yè)的運(yùn)行一般設(shè)置50~100個(gè)左右的Executor進(jìn)程比較合適,設(shè)置太少或太多的Executor進(jìn)程都不好。設(shè)置的太少,無法充分利用集群資源;設(shè)置的太多的話,隊(duì)列其他任務(wù)可能無法給予充分的資源。

3.2 executor-memory

  參數(shù)說明:該參數(shù)用于設(shè)置每個(gè)Executor進(jìn)程的內(nèi)存。Executor內(nèi)存的大小,很多時(shí)候直接決定了Spark作業(yè)的性能,而且跟常見的JVM OOM異常,也有直接的關(guān)聯(lián)。

  參數(shù)調(diào)優(yōu)建議:每個(gè)Executor進(jìn)程的內(nèi)存設(shè)置4G~8G較為合適。num-executors乘以executor-memory就代表Spark作業(yè)申請(qǐng)到的總內(nèi)存量,這個(gè)量是不能超過隊(duì)列的最大內(nèi)存量的。此外,如果與其他人共享這個(gè)資源隊(duì)列,為避免獨(dú)占則申請(qǐng)的總內(nèi)存量最好不要超過資源隊(duì)列總內(nèi)存的1/3~1/2。

3.3 executor-cores

  參數(shù)說明:該參數(shù)用于設(shè)置每個(gè)Executor進(jìn)程的CPU core數(shù)量。這個(gè)參數(shù)決定了每個(gè)Executor進(jìn)程并行執(zhí)行task線程的能力。

  參數(shù)調(diào)優(yōu)建議:Executor的CPU core數(shù)量設(shè)置為2~4個(gè)較為合適。num-executors * executor-cores就代表了你的Spark作業(yè)申請(qǐng)到的總CPU核心數(shù),這個(gè)量是不能超過隊(duì)列的最大核心數(shù)。此外,如果是跟他人共享這個(gè)隊(duì)列,不要超過隊(duì)列總CPU core的1/3~1/2。

3.4 driver-memory

  參數(shù)說明:該參數(shù)用于設(shè)置Driver進(jìn)程的內(nèi)存。

  參數(shù)調(diào)優(yōu)建議:Driver的內(nèi)存通常來說不設(shè)置,或者設(shè)置1G左右應(yīng)該就夠了。唯一需要注意的一點(diǎn)是,如果需要使用collect算子將RDD的數(shù)據(jù)全部拉取到Driver上進(jìn)行處理,那么必須確保Driver的內(nèi)存足夠大,否則會(huì)出現(xiàn)OOM內(nèi)存溢出的問題。

3.5 spark.default.parallelism

  參數(shù)說明:該參數(shù)用于設(shè)置每個(gè)stage的默認(rèn)task數(shù)量。這個(gè)參數(shù)極為重要,如果不設(shè)置可能會(huì)直接影響你的Spark作業(yè)性能。

  參數(shù)調(diào)優(yōu)建議:Spark作業(yè)的默認(rèn)task數(shù)量為500~1000個(gè)較為合適。若不去設(shè)置這個(gè)參數(shù),那么Spark會(huì)根據(jù)底層HDFS的block數(shù)量來設(shè)置task的數(shù)量,默認(rèn)是一個(gè)HDFS block對(duì)應(yīng)一個(gè)task。通常來說,Spark默認(rèn)設(shè)置的數(shù)量是偏少的。? 如果task數(shù)量偏少的話,就會(huì)導(dǎo)致設(shè)置好的Executor的參數(shù)都前功盡棄,無論Executor有多少個(gè),內(nèi)存和CPU有多大,但是task只有1個(gè)或者10個(gè),那么90%的Executor進(jìn)程可能根本就沒有task執(zhí)行,也就是白白浪費(fèi)了資源!因此Spark官網(wǎng)建議的設(shè)置原則是,設(shè)置該參數(shù)為num-executors * executor-cores的2~3倍較為合適,比如Executor的總CPU core數(shù)量為300個(gè),那么設(shè)置1000個(gè)task是可以的。

3.6 spark.storage.memoryFraction

  參數(shù)說明:該參數(shù)用于設(shè)置RDD持久化數(shù)據(jù)在Executor內(nèi)存中能占的比例,默認(rèn)是0.6。也就是說,默認(rèn)Executor 60%的內(nèi)存,可以用來保存持久化的RDD數(shù)據(jù)。根據(jù)你選擇的不同的持久化策略,如果內(nèi)存不夠時(shí),可能數(shù)據(jù)就不會(huì)持久化,或者數(shù)據(jù)會(huì)寫入磁盤。

  參數(shù)調(diào)優(yōu)建議:如果Spark作業(yè)中,有較多的RDD持久化操作,該參數(shù)的值可以適當(dāng)提高一些,保證持久化的數(shù)據(jù)能夠容納在內(nèi)存中。避免內(nèi)存不夠緩存所有的數(shù)據(jù),導(dǎo)致數(shù)據(jù)只能寫入磁盤中,降低了性能。但是如果Spark作業(yè)中的shuffle類操作比較多,而持久化操作比較少,那么這個(gè)參數(shù)的值適當(dāng)降低一些比較合適。此外,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運(yùn)行緩慢(通過spark web ui可以觀察到作業(yè)的gc耗時(shí)),意味著task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個(gè)參數(shù)的值。

3.7 spark.shuffle.memoryFraction

  參數(shù)說明:該參數(shù)用于設(shè)置shuffle過程中一個(gè)task拉取到上個(gè)stage的task的輸出后,進(jìn)行聚合操作時(shí)能夠使用的Executor內(nèi)存的比例,默認(rèn)是0.2。也就是說,Executor默認(rèn)只有20%的內(nèi)存用來進(jìn)行該操作。shuffle操作在進(jìn)行聚合時(shí),如果發(fā)現(xiàn)使用的內(nèi)存超出了這個(gè)20%的限制,那么多余的數(shù)據(jù)就會(huì)溢寫到磁盤文件中去,此時(shí)就會(huì)極大地降低性能。

  參數(shù)調(diào)優(yōu)建議:如果Spark作業(yè)中的RDD持久化操作較少,shuffle操作較多時(shí),建議降低持久化操作的內(nèi)存占比,提高shuffle操作的內(nèi)存占比比例,避免shuffle過程中數(shù)據(jù)過多時(shí)內(nèi)存不夠用,必須溢寫到磁盤上,降低了性能。此外,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運(yùn)行緩慢,意味著task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個(gè)參數(shù)的值。
資源參數(shù)的調(diào)優(yōu),沒有一個(gè)固定的值,需要同學(xué)們根據(jù)自己的實(shí)際情況(包括Spark作業(yè)中的shuffle操作數(shù)量、RDD持久化操作數(shù)量以及spark web ui中顯示的作業(yè)gc情況),同時(shí)參考本篇文章中給出的原理以及調(diào)優(yōu)建議,合理地設(shè)置上述參數(shù)。

4. 計(jì)算邏輯

4.1 mapPartitions

mapPartitions、foreachPartitions等算子替代map、foreach,函數(shù)一次調(diào)用批量處理一個(gè)partition的數(shù)據(jù),而不是一次函數(shù)調(diào)用處理一條,性能相對(duì)來說會(huì)高一些。但要注意使用mapPartitions類算子易出現(xiàn)OOM(內(nèi)存溢出)的問題,因?yàn)槔厥諘r(shí)是無法回收掉太多對(duì)象的,很可能出現(xiàn)OOM異常。

轉(zhuǎn)載于:https://my.oschina.net/puwenchao/blog/1554473

總結(jié)

以上是生活随笔為你收集整理的spark优化总结的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。