日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark性能相关参数配置详解

發布時間:2024/1/17 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark性能相关参数配置详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

隨著Spark的逐漸成熟完善,?越來越多的可配置參數被添加到Spark中來,?本文試圖通過闡述這其中部分參數的工作原理和配置思路,?和大家一起探討一下如何根據實際場合對Spark進行配置優化。

?

由于篇幅較長,所以在這里分篇組織,如果要看最新完整的網頁版內容,可以戳這里:http://spark-config.readthedocs.org/,主要是便于更新內容

?

schedule調度相關

?

調度相關的參數設置,大多數內容都很直白,其實無須過多的額外解釋,不過基于這些參數的常用性(大概會是你針對自己的集群第一步就會配置的參數),這里多少就其內部機制做一些解釋。

?

spark.cores.max

?

一個集群最重要的參數之一,當然就是CPU計算資源的數量。spark.cores.max這個參數決定了在Standalone和Mesos模式下,一個Spark應用程序所能申請的CPU Core的數量。如果你沒有并發跑多個Spark應用程序的需求,那么可以不需要設置這個參數,默認會使用spark.deploy.defaultCores的值(而spark.deploy.defaultCores的值默認為Int.Max,也就是不限制的意思)從而應用程序可以使用所有當前可以獲得的CPU資源。

?

針對這個參數需要注意的是,這個參數對Yarn模式不起作用,YARN模式下,資源由Yarn統一調度管理,一個應用啟動時所申請的CPU資源的數量由另外兩個直接配置Executor的數量和每個Executor中core數量的參數決定。(歷史原因造成,不同運行模式下的一些啟動參數個人認為還有待進一步整合)

?

此外,在Standalone模式等后臺分配CPU資源時,目前的實現中,在spark.cores.max允許的范圍內,基本上是優先從每個Worker中申請所能得到的最大數量的CPU core給每個Executor,因此如果人工限制了所申請的Max Core的數量小于Standalone和Mesos模式所管理的CPU數量,可能發生應用只運行在集群中部分節點上的情況(因為部分節點所能提供的最大CPU資源數量已經滿足應用的要求),而不是平均分布在集群中。通常這不會是太大的問題,但是如果涉及數據本地性的場合,有可能就會帶來一定的必須進行遠程數據讀取的情況發生。理論上,這個問題可以通過兩種途徑解決:一是Standalone和Mesos的資源管理模塊自動根據節點資源情況,均勻分配和啟動Executor,二是和Yarn模式一樣,允許用戶指定和限制每個Executor的Core的數量。社區中有一個PR試圖走第二種途徑來解決類似的問題,不過截至我寫下這篇文檔為止(2014.8),還沒有被Merge。

?

spark.task.cpus

?

這個參數在字面上的意思就是分配給每個任務的CPU的數量,默認為1。實際上,這個參數并不能真的控制每個任務實際運行時所使用的CPU的數量,比如你可以通過在任務內部創建額外的工作線程來使用更多的CPU(至少目前為止,將來任務的執行環境是否能通過LXC等技術來控制還不好說)。它所發揮的作用,只是在作業調度時,每分配出一個任務時,對已使用的CPU資源進行計數。也就是說只是理論上用來統計資源的使用情況,便于安排調度。因此,如果你期望通過修改這個參數來加快任務的運行,那還是趕緊換個思路吧。這個參數的意義,個人覺得還是在你真的在任務內部自己通過任何手段,占用了更多的CPU資源時,讓調度行為更加準確的一個輔助手段。

?

?

spark.scheduler.mode

?

這個參數決定了單個Spark應用內部調度的時候使用FIFO模式還是Fair模式。是的,你沒有看錯,這個參數只管理一個Spark應用內部的多個沒有依賴關系的Job作業的調度策略。

?

如果你需要的是多個Spark應用之間的調度策略,那么在Standalone模式下,這取決于每個應用所申請和獲得的CPU資源的數量(暫時沒有獲得資源的應用就Pending在那里了),基本上就是FIFO形式的,誰先申請和獲得資源,誰就占用資源直到完成。而在Yarn模式下,則多個Spark應用間的調度策略由Yarn自己的策略配置文件所決定。

?

那么這個內部的調度邏輯有什么用呢?如果你的Spark應用是通過服務的形式,為多個用戶提交作業的話,那么可以通過配置Fair模式相關參數來調整不同用戶作業的調度和資源分配優先級。

?

?

spark.locality.wait

?

spark.locality.wait和spark.locality.wait.process,spark.locality.wait.node,spark.locality.wait.rack這幾個參數影響了任務分配時的本地性策略的相關細節。

?

Spark中任務的處理需要考慮所涉及的數據的本地性的場合,基本就兩種,一是數據的來源是HadoopRDD;二是RDD的數據來源來自于RDD Cache(即由CacheManager從BlockManager中讀取,或者Streaming數據源RDD)。其它情況下,如果不涉及shuffle操作的RDD,不構成劃分Stage和Task的基準,不存在判斷Locality本地性的問題,而如果是ShuffleRDD,其本地性始終為No Prefer,因此其實也無所謂Locality。

?

在理想的情況下,任務當然是分配在可以從本地讀取數據的節點上時(同一個JVM內部或同一臺物理機器內部)的運行時性能最佳。但是每個任務的執行速度無法準確估計,所以很難在事先獲得全局最優的執行策略,當Spark應用得到一個計算資源的時候,如果沒有可以滿足最佳本地性需求的任務可以運行時,是退而求其次,運行一個本地性條件稍差一點的任務呢,還是繼續等待下一個可用的計算資源已期望它能更好的匹配任務的本地性呢?

?

這幾個參數一起決定了Spark任務調度在得到分配任務時,選擇暫時不分配任務,而是等待獲得滿足進程內部/節點內部/機架內部這樣的不同層次的本地性資源的最長等待時間。默認都是3000毫秒。

?

基本上,如果你的任務數量較大和單個任務運行時間比較長的情況下,單個任務是否在數據本地運行,代價區別可能比較顯著,如果數據本地性不理想,那么調大這些參數對于性能優化可能會有一定的好處。反之如果等待的代價超過帶來的收益,那就不要考慮了。

?

特別值得注意的是:在處理應用剛啟動后提交的第一批任務時,由于當作業調度模塊開始工作時,處理任務的Executors可能還沒有完全注冊完畢,因此一部分的任務會被放置到No Prefer的隊列中,這部分任務的優先級僅次于數據本地性滿足Process級別的任務,從而被優先分配到非本地節點執行,如果的確沒有Executors在對應的節點上運行,或者的確是No Prefer的任務(如shuffleRDD),這樣做確實是比較優化的選擇,但是這里的實際情況只是這部分Executors還沒來得及注冊上而已。這種情況下,即使加大本節中這幾個參數的數值也沒有幫助。針對這個情況,有一些已經完成的和正在進行中的PR通過例如動態調整No Prefer隊列,監控節點注冊比例等等方式試圖來給出更加智能的解決方案。不過,你也可以根據自身集群的啟動情況,通過在創建SparkContext之后,主動Sleep幾秒的方式來簡單的解決這個問題。

?

spark.speculation

?

spark.speculation以及spark.speculation.interval,spark.speculation.quantile, spark.speculation.multiplier等參數調整Speculation行為的具體細節,Speculation是在任務調度的時候,如果沒有適合當前本地性要求的任務可供運行,將跑得慢的任務在空閑計算資源上再度調度的行為,這些參數調整這些行為的頻率和判斷指標,默認是不使用Speculation的。

?

通常來說很難正確的判斷是否需要Speculation,能真正發揮Speculation用處的場合,往往是某些節點由于運行環境原因,比如CPU資源由于某種原因被占用,磁盤損壞導致IO緩慢造成任務執行速度異常的情況,當然前提是你的分區任務不存在僅能被執行一次,或者不能同時執行多個拷貝等情況。Speculation任務參照的指標通常是其它任務的執行時間,而實際的任務可能由于分區數據尺寸不均勻,本來就會有時間差異,加上一定的調度和IO的隨機性,所以如果一致性指標定得過嚴,Speculation可能并不能真的發現問題,反而增加了不必要的任務開銷,定得過寬,大概又基本相當于沒用。

?

個人覺得,如果你的集群規模比較大,運行環境復雜,的確可能經常發生執行異常,加上數據分區尺寸差異不大,為了程序運行時間的穩定性,那么可以考慮仔細調整這些參數。否則還是考慮如何排除造成任務執行速度異常的因數比較靠鋪一些。

?

當然,我沒有實際在很大規模的集群上運行過Spark,所以如果看法有些偏頗,還請有實際經驗的XD指正。

?

壓縮和序列化相關

?

spark.serializer

?

默認為org.apache.spark.serializer.JavaSerializer,可選org.apache.spark.serializer.KryoSerializer,實際上只要是org.apache.spark.serializer的子類就可以了,不過如果只是應用,大概你不會自己去實現一個的。

?

序列化對于spark應用的性能來說,還是有很大影響的,在特定的數據格式的情況下,KryoSerializer的性能可以達到JavaSerializer的10倍以上,當然放到整個Spark程序中來考量,比重就沒有那么大了,但是以Wordcount為例,通常也很容易達到30%以上的性能提升。而對于一些Int之類的基本類型數據,性能的提升就幾乎可以忽略了。KryoSerializer依賴Twitter的Chill庫來實現,相對于JavaSerializer,主要的問題在于不是所有的Java Serializable對象都能支持。

?

需要注意的是,這里可配的Serializer針對的對象是Shuffle數據,以及RDD Cache等場合,而Spark Task的序列化是通過spark.closure.serializer來配置,但是目前只支持JavaSerializer,所以等于沒法配置啦。

?

更多Kryo序列化相關優化配置,可以參考http://spark.apache.org/docs/latest/tuning.html#data-serialization一節

?

?

spark.rdd.compress

?

這個參數決定了RDD Cache的過程中,RDD數據在序列化之后是否進一步進行壓縮再儲存到內存或磁盤上。當然是為了進一步減小Cache數據的尺寸,對于Cache在磁盤上而言,絕對大小大概沒有太大關系,主要是考慮Disk的IO帶寬。而對于Cache在內存中,那主要就是考慮尺寸的影響,是否能夠Cache更多的數據,是否能減小Cache數據對GC造成的壓力等。

?

這兩者,前者通常不會是主要問題,尤其是在RDD Cache本身的目的就是追求速度,減少重算步驟,用IO換CPU的情況下。而后者,GC問題當然是需要考量的,數據量小,占用空間少,GC的問題大概會減輕,但是是否真的需要走到RDDCache壓縮這一步,或許用其它方式來解決可能更加有效。

?

所以這個值默認是關閉的,但是如果在磁盤IO的確成為問題或者GC問題真的沒有其它更好的解決辦法的時候,可以考慮啟用RDD壓縮。

?

?

spark.broadcast.compress

?

是否對Broadcast的數據進行壓縮,默認值為True。

?

Broadcast機制是用來減少運行每個Task時,所需要發送給TASK的RDD所使用到的相關數據的尺寸,一個Executor只需要在第一個Task啟動時,獲得一份Broadcast數據,之后的Task都從本地的BlockManager中獲取相關數據。在1.1最新版本的代碼中,RDD本身也改為以Broadcast的形式發送給Executor(之前的實現RDD本身是隨每個任務發送的),因此基本上不太需要顯式的決定哪些數據需要broadcast了。

?

因為Broadcast的數據需要通過網絡發送,而在Executor端又需要存儲在本地BlockMananger中,加上最新的實現,默認RDD通過Boradcast機制發送,因此大大增加了Broadcast變量的比重,所以通過壓縮減小尺寸,來減少網絡傳輸開銷和內存占用,通常都是有利于提高整體性能的。

?

什么情況可能不壓縮更好呢,大致上個人覺得同樣還是在網絡帶寬和內存不是問題的時候,如果Driver端CPU資源很成問題(畢竟壓縮的動作基本都在Driver端執行),那或許有調整的必要。

?

?

spark.io.compression.codec

?

RDD Cache和Shuffle數據壓縮所采用的算法Codec,默認值曾經是使用LZF作為默認Codec,最近因為LZF的內存開銷的問題,默認的Codec已經改為Snappy。

?

LZF和Snappy相比較,前者壓縮率比較高(當然要看具體數據內容了,通常要高20%左右),但是除了內存問題以外,CPU代價也大一些(大概也差20%~50%?)

?

在用于Shuffle數據的場合下,內存方面,應該主要是在使用HashShuffleManager的時候有可能成為問題,因為如果Reduce分區數量巨大,需要同時打開大量的壓縮數據流用于寫文件,進而在Codec方面需要大量的buffer。但是如果使用SortShuffleManager,由于shuffle文件數量大大減少,不會產生大量的壓縮數據流,所以內存開銷大概不會成為主要問題。

?

剩下的就是CPU和壓縮率的權衡取舍,和前面一樣,取決于CPU/網絡/磁盤的能力和負載,個人認為CPU通常更容易成為瓶頸。所以要調整性能,要不不壓縮,要不使用Snappy可能性大一些?

?

對于RDD Cache的場合來說,絕大多數場合都是內存操作或者本地IO,所以CPU負載的問題可能比IO的問題更加突出,這也是為什么spark.rdd.compress本身默認為不壓縮,如果要壓縮,大概也是Snappy合適一些?

?

Storage相關配置參數

?

spark.local.dir

?

這個看起來很簡單,就是Spark用于寫中間數據,如RDD Cache,Shuffle,Spill等數據的位置,那么有什么可以注意的呢。

?

首先,最基本的當然是我們可以配置多個路徑(用逗號分隔)到多個磁盤上增加整體IO帶寬,這個大家都知道。

?

其次,目前的實現中,Spark是通過對文件名采用hash算法分布到多個路徑下的目錄中去,如果你的存儲設備有快有慢,比如SSD+HDD混合使用,那么你可以通過在SSD上配置更多的目錄路徑來增大它被Spark使用的比例,從而更好地利用SSD的IO帶寬能力。當然這只是一種變通的方法,終極解決方案還是應該像目前HDFS的實現方向一樣,讓Spark能夠感知具體的存儲設備類型,針對性的使用。

?

需要注意的是,在Spark 1.0以后,SPARK_LOCAL_DIRS(Standalone, Mesos) or LOCAL_DIRS (YARN)參數會覆蓋這個配置。比如Spark On YARN的時候,Spark Executor的本地路徑依賴于Yarn的配置,而不取決于這個參數。

?

spark.executor.memory

?

Executor?內存的大小,和性能本身當然并沒有直接的關系,但是幾乎所有運行時性能相關的內容都或多或少間接和內存大小相關。這個參數最終會被設置到Executor的JVM的heap尺寸上,對應的就是Xmx和Xms的值

?

理論上Executor?內存當然是多多益善,但是實際受機器配置,以及運行環境,資源共享,JVM GC效率等因素的影響,還是有可能需要為它設置一個合理的大小。多大算合理,要看實際情況

?

Executor的內存基本上是Executor內部所有任務共享的,而每個Executor上可以支持的任務的數量取決于Executor所管理的CPU Core資源的多少,因此你需要了解每個任務的數據規模的大小,從而推算出每個Executor大致需要多少內存即可滿足基本的需求。

?

如何知道每個任務所需內存的大小呢,這個很難統一的衡量,因為除了數據集本身的開銷,還包括算法所需各種臨時內存空間的使用,而根據具體的代碼算法等不同,臨時內存空間的開銷也不同。但是數據集本身的大小,對最終所需內存的大小還是有一定的參考意義的。

?

通常來說每個分區的數據集在內存中的大小,可能是其在磁盤上源數據大小的若干倍(不考慮源數據壓縮,Java對象相對于原始裸數據也還要算上用于管理數據的數據結構的額外開銷),需要準確的知道大小的話,可以將RDD cache在內存中,從BlockManager的Log輸出可以看到每個Cache分區的大小(其實也是估算出來的,并不完全準確)

?

如: BlockManagerInfo: Added rdd_0_1?on disk on sr438:41134(size: 495.3 MB)

?

反過來說,如果你的Executor的數量和內存大小受機器物理配置影響相對固定,那么你就需要合理規劃每個分區任務的數據規模,例如采用更多的分區,用增加任務數量(進而需要更多的批次來運算所有的任務)的方式來減小每個任務所需處理的數據大小。

?

spark.storage.memoryFraction

?

如前面所說spark.executor.memory決定了每個Executor可用內存的大小,而spark.storage.memoryFraction則決定了在這部分內存中有多少可以用于Memory Store管理RDD Cache數據,剩下的內存用來保證任務運行時各種其它內存空間的需要。

?

spark.executor.memory默認值為0.6,官方文檔建議這個比值不要超過JVM Old Gen區域的比值。這也很容易理解,因為RDD Cache數據通常都是長期駐留內存的,理論上也就是說最終會被轉移到Old Gen區域(如果該RDD還沒有被刪除的話),如果這部分數據允許的尺寸太大,勢必把Old Gen區域占滿,造成頻繁的FULL GC。

?

如何調整這個比值,取決于你的應用對數據的使用模式和數據的規模,粗略的來說,如果頻繁發生Full GC,可以考慮降低這個比值,這樣RDD Cache可用的內存空間減少(剩下的部分Cache數據就需要通過Disk Store寫到磁盤上了),會帶來一定的性能損失,但是騰出更多的內存空間用于執行任務,減少Full GC發生的次數,反而可能改善程序運行的整體性能

?

spark.streaming.blockInterval

?

這個參數用來設置Spark Streaming里Stream Receiver生成Block的時間間隔,默認為200ms。具體的行為表現是具體的Receiver所接收的數據,每隔這里設定的時間間隔,就從Buffer中生成一個StreamBlock放進隊列,等待進一步被存儲到BlockManager中供后續計算過程使用。理論上來說,為了每個StreamingBatch間隔里的數據是均勻的,這個時間間隔當然應該能被Batch的間隔時間長度所整除。總體來說,如果內存大小夠用,Streaming的數據來得及處理,這個blockInterval時間間隔的影響不大,當然,如果數據Cache Level是Memory+Ser,即做了序列化處理,那么BlockInterval的大小會影響序列化后數據塊的大小,對于Java的GC的行為會有一些影響。

?

此外spark.streaming.blockQueueSize決定了在StreamBlock被存儲到BlockMananger之前,隊列中最多可以容納多少個StreamBlock。默認為10,因為這個隊列Poll的時間間隔是100ms,所以如果CPU不是特別繁忙的話,基本上應該沒有問題。

?

Shuffle相關

?

Shuffle操作大概是對Spark性能影響最大的步驟之一(因為可能涉及到排序,磁盤IO,網絡IO等眾多CPU或IO密集的操作),這也是為什么在Spark 1.1的代碼中對整個Shuffle框架代碼進行了重構,將Shuffle相關讀寫操作抽象封裝到Pluggable的Shuffle Manager中,便于試驗和實現不同的Shuffle功能模塊。例如為了解決Hash Based的Shuffle Manager在文件讀寫效率方面的問題而實現的Sort Base的Shuffle Manager。

?

spark.shuffle.manager

?

用來配置所使用的Shuffle Manager,目前可選的Shuffle Manager包括默認的org.apache.spark.shuffle.sort.HashShuffleManager(配置參數值為hash)和新的org.apache.spark.shuffle.sort.SortShuffleManager(配置參數值為sort)。

?

這兩個ShuffleManager如何選擇呢,首先需要了解他們在實現方式上的區別。

?

HashShuffleManager,故名思義也就是在Shuffle的過程中寫數據時不做排序操作,只是將數據根據Hash的結果,將各個Reduce分區的數據寫到各自的磁盤文件中。帶來的問題就是如果Reduce分區的數量比較大的話,將會產生大量的磁盤文件。如果文件數量特別巨大,對文件讀寫的性能會帶來比較大的影響,此外由于同時打開的文件句柄數量眾多,序列化,以及壓縮等操作需要分配的臨時內存空間也可能會迅速膨脹到無法接受的地步,對內存的使用和GC帶來很大的壓力,在Executor內存比較小的情況下尤為突出,例如Spark on Yarn模式。

?

SortShuffleManager,是1.1版本之后實現的一個試驗性(也就是一些功能和接口還在開發演變中)的ShuffleManager,它在寫入分區數據的時候,首先會根據實際情況對數據采用不同的方式進行排序操作,底線是至少按照Reduce分區Partition進行排序,這樣來至于同一個Map任務Shuffle到不同的Reduce分區中去的所有數據都可以寫入到同一個外部磁盤文件中去,用簡單的Offset標志不同Reduce分區的數據在這個文件中的偏移量。這樣一個Map任務就只需要生成一個shuffle文件,從而避免了上述HashShuffleManager可能遇到的文件數量巨大的問題

?

兩者的性能比較,取決于內存,排序,文件操作等因素的綜合影響。

?

對于不需要進行排序的Shuffle操作來說,如repartition等,如果文件數量不是特別巨大,HashShuffleManager面臨的內存問題不大,而SortShuffleManager需要額外的根據Partition進行排序,顯然HashShuffleManager的效率會更高。

?

而對于本來就需要在Map端進行排序的Shuffle操作來說,如ReduceByKey等,使用HashShuffleManager雖然在寫數據時不排序,但在其它的步驟中仍然需要排序,而SortShuffleManager則可以將寫數據和排序兩個工作合并在一起執行,因此即使不考慮HashShuffleManager的內存使用問題,SortShuffleManager依舊可能更快。

?

spark.shuffle.sort.bypassMergeThreshold

?

這個參數僅適用于SortShuffleManager,如前所述,SortShuffleManager在處理不需要排序的Shuffle操作時,由于排序帶來性能的下降。這個參數決定了在這種情況下,當Reduce分區的數量小于多少的時候,在SortShuffleManager內部不使用Merge Sort的方式處理數據,而是與Hash Shuffle類似,直接將分區文件寫入單獨的文件,不同的是,在最后一步還是會將這些文件合并成一個單獨的文件。這樣通過去除Sort步驟來加快處理速度,代價是需要并發打開多個文件,所以內存消耗量增加,本質上是相對HashShuffleMananger一個折衷方案。這個參數的默認值是200個分區,如果內存GC問題嚴重,可以降低這個值。

?

spark.shuffle.consolidateFiles

?

這個配置參數僅適用于HashShuffleMananger的實現,同樣是為了解決生成過多文件的問題,采用的方式是在不同批次運行的Map任務之間重用Shuffle輸出文件,也就是說合并的是不同批次的Map任務的輸出數據,但是每個Map任務所需要的文件還是取決于Reduce分區的數量,因此,它并不減少同時打開的輸出文件的數量,因此對內存使用量的減少并沒有幫助。只是HashShuffleManager里的一個折中的解決方案。

?

需要注意的是,這部分的代碼實現盡管原理上說很簡單,但是涉及到底層具體的文件系統的實現和限制等因素,例如在并發訪問等方面,需要處理的細節很多,因此一直存在著這樣那樣的bug或者問題,導致在例如EXT3上使用時,特定情況下性能反而可能下降,因此從Spark 0.8的代碼開始,一直到Spark 1.1的代碼為止也還沒有被標志為Stable,不是默認采用的方式。此外因為并不減少同時打開的輸出文件的數量,因此對性能具體能帶來多大的改善也取決于具體的文件數量的情況。所以即使你面臨著Shuffle文件數量巨大的問題,這個配置參數是否使用,在什么版本中可以使用,也最好還是實際測試以后再決定。

?

spark.shuffle.spill

?

shuffle的過程中,如果涉及到排序,聚合等操作,勢必會需要在內存中維護一些數據結構,進而占用額外的內存。如果內存不夠用怎么辦,那只有兩條路可以走,一就是out of memory出錯了,二就是將部分數據臨時寫到外部存儲設備中去,最后再合并到最終的Shuffle輸出文件中去。

?

這里spark.shuffle.spill決定是否Spill到外部存儲設備(默認打開),如果你的內存足夠使用,或者數據集足夠小,當然也就不需要Spill,畢竟Spill帶來了額外的磁盤操作。

?

spark.shuffle.memoryFraction/ spark.shuffle.safetyFraction

?

在啟用Spill的情況下,spark.shuffle.memoryFraction(1.1后默認為0.2)決定了當Shuffle過程中使用的內存達到總內存多少比例的時候開始Spill。

?

通過spark.shuffle.memoryFraction可以調整Spill的觸發條件,即Shuffle占用內存的大小,進而調整Spill的頻率和GC的行為。總的來說,如果Spill太過頻繁,可以適當增加spark.shuffle.memoryFraction的大小,增加用于Shuffle的內存,減少Spill的次數。當然這樣一來為了避免內存溢出,對應的可能需要減少RDD cache占用的內存,即減小spark.storage.memoryFraction的值,這樣RDD cache的容量減少,有可能帶來性能影響,因此需要綜合考慮。

?

由于Shuffle數據的大小是估算出來的,一來為了降低開銷,并不是每增加一個數據項都完整的估算一次,二來估算也會有誤差,所以實際暫用的內存可能比估算值要大,這里spark.shuffle.safetyFraction(默認為0.8)用來作為一個保險系數,降低實際Shuffle使用的內存閥值,增加一定的緩沖,降低實際內存占用超過用戶配置值的概率。

?

spark.shuffle.spill.compress/ spark.shuffle.compress

?

這兩個配置參數都是用來設置Shuffle過程中是否使用壓縮算法對Shuffle數據進行壓縮,前者針對Spill的中間數據,后者針對最終的shuffle輸出文件,默認都是True

?

理論上說,spark.shuffle.compress設置為True通常都是合理的,因為如果使用千兆以下的網卡,網絡帶寬往往最容易成為瓶頸。此外,目前的Spark任務調度實現中,以Shuffle劃分Stage,下一個Stage的任務是要等待上一個Stage的任務全部完成以后才能開始執行,所以shuffle數據的傳輸和CPU計算任務之間通常不會重疊,這樣Shuffle數據傳輸量的大小和所需的時間就直接影響到了整個任務的完成速度。但是壓縮也是要消耗大量的CPU資源的,所以打開壓縮選項會增加Map任務的執行時間,因此如果在CPU負載的影響遠大于磁盤和網絡帶寬的影響的場合下,也可能將spark.shuffle.compress設置為False才是最佳的方案

?

對于spark.shuffle.spill.compress而言,情況類似,但是spill數據不會被發送到網絡中,僅僅是臨時寫入本地磁盤,而且在一個任務中同時需要執行壓縮和解壓縮兩個步驟,所以對CPU負載的影響會更大一些,而磁盤帶寬(如果標配12HDD的話)可能往往不會成為Spark應用的主要問題,所以這個參數相對而言,或許更有機會需要設置為False。

?

總之,Shuffle過程中數據是否應該壓縮,取決于CPU/DISK/NETWORK的實際能力和負載,應該綜合考慮。

?

?

原文鏈接:http://spark-config.readthedocs.io/en/latest/

總結

以上是生活随笔為你收集整理的Spark性能相关参数配置详解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。