Spark内存管理(1)—— 静态内存管理
Spark內(nèi)存管理簡(jiǎn)介
Spark從1.6開始引入了動(dòng)態(tài)內(nèi)存管理模式,即執(zhí)行內(nèi)存和存儲(chǔ)內(nèi)存之間可以相互搶占?
Spark提供了2種內(nèi)存分配模式:
-
靜態(tài)內(nèi)存管理
-
統(tǒng)一內(nèi)存管理
本系列文章將分別對(duì)這兩種內(nèi)存管理模式的優(yōu)缺點(diǎn)以及設(shè)計(jì)原理進(jìn)行分析(主要基于Spark 1.6.1的內(nèi)存管理進(jìn)行分析)?
在本篇文章中,將先對(duì)靜態(tài)內(nèi)存管理進(jìn)行介紹
堆內(nèi)內(nèi)存
在Spark最初采用的靜態(tài)內(nèi)存管理機(jī)制下,存儲(chǔ)內(nèi)存、執(zhí)行內(nèi)存和其它內(nèi)存的大小在Spark應(yīng)用程序運(yùn)行期間均為固定的,但用戶可以在應(yīng)用程序啟動(dòng)前進(jìn)行配置,堆內(nèi)內(nèi)存的分配如下圖所示:?
默認(rèn)情況下,spark內(nèi)存管理采用unified模式,如果要開啟靜態(tài)內(nèi)存管理模式,需要將spark.memory.useLegacyMode參數(shù)調(diào)為true(默認(rèn)為false),1.6.1版本的官網(wǎng)配置如下所示:?
將參數(shù)調(diào)整為true之后,就會(huì)進(jìn)入到靜態(tài)內(nèi)存管理中來(lái),可以通過(guò)SparkEnv.scala中發(fā)現(xiàn):?
如果spark.memory.useLegacyMode為true,就進(jìn)入到StaticMemoryManager(靜態(tài)內(nèi)存管理);如果為false,就進(jìn)入到UnifiedMemoryManager(統(tǒng)一內(nèi)存管理);同時(shí)我們可以發(fā)現(xiàn)該參數(shù)的默認(rèn)值為false,即默認(rèn)情況下就會(huì)調(diào)用統(tǒng)一內(nèi)存管理類
Execution內(nèi)存
可用的Execution內(nèi)存
用于shuffle操作的內(nèi)存,取決于join、sort、aggregation等過(guò)程頻繁的IO需要的Buffer臨時(shí)數(shù)據(jù)存儲(chǔ)?
簡(jiǎn)單來(lái)說(shuō),spark在shuffle write的過(guò)程中,每個(gè)executor會(huì)將數(shù)據(jù)寫到該executor的物理磁盤上,下一個(gè)stage的task會(huì)去上一個(gè)stage拉取其所需要處理的數(shù)據(jù),并且是邊拉取邊進(jìn)行處理的(和MapReduce的拉取合并數(shù)據(jù)基本一樣),這個(gè)時(shí)候就會(huì)用到一個(gè)aggregate的數(shù)據(jù)結(jié)構(gòu),比如hashmap這種邊拉取數(shù)據(jù)邊進(jìn)行聚合。這部分內(nèi)存就被稱為execution內(nèi)存
從StaticMemoryManager.scala中的getMaxExecutionMemory方法中,我們可以發(fā)現(xiàn):?
每個(gè)executor分配給execution的內(nèi)存為:?
ExecutionMemory = systemMaxMemory * memoryFraction * safetyFraction?
默認(rèn)情況下為:systemMaxMemory * 0.2 * 0.8 = 0.16 * systemMaxMemory?
即默認(rèn)為executor最大可用內(nèi)存 * 0.16
Execution內(nèi)存再運(yùn)行的時(shí)候會(huì)被分配給運(yùn)行在JVM上的task;這里不同的是,分配給每個(gè)task的內(nèi)存并不是固定的,而是動(dòng)態(tài)的;spark不是一上來(lái)就分配固定大小的內(nèi)存塊給task,而是允許一個(gè)task占據(jù)JVM所有execution內(nèi)存?
每個(gè)JVM上的task可以最多申請(qǐng)至多1/N的execution內(nèi)存,其中N為active task的個(gè)數(shù),由spark.executor.cores指定;如果task的申請(qǐng)沒(méi)有被批準(zhǔn),它會(huì)釋放一部分內(nèi)存,并且下次申請(qǐng)的時(shí)候,它會(huì)申請(qǐng)更小的一部分內(nèi)存?
注:
-
每個(gè)Executor單獨(dú)運(yùn)行在一個(gè)JVM進(jìn)程中,每個(gè)Task則是運(yùn)行在Executor中的線程
-
spark.executor.cores設(shè)置的是每個(gè)executor的core數(shù)量
-
task的數(shù)量就是partition的數(shù)量
-
一般來(lái)說(shuō),一個(gè)core設(shè)置2~4個(gè)partition,即2~4個(gè)task在一個(gè)core上運(yùn)行
注意:為了防止過(guò)多的spilling數(shù)據(jù),只有當(dāng)一個(gè)task分配到的內(nèi)存達(dá)到execution內(nèi)存1/2N的時(shí)候才會(huì)spill,如果目前空閑的內(nèi)存達(dá)不到1/2N的時(shí)候,內(nèi)存申請(qǐng)會(huì)被阻塞直到其它的task spill掉它們的內(nèi)存;?
如果不這樣限制,假設(shè)當(dāng)前一個(gè)任務(wù)占據(jù)了絕大部分內(nèi)存,那么新來(lái)的task會(huì)一直往硬盤spill數(shù)據(jù),這樣就會(huì)導(dǎo)致比較嚴(yán)重的I/O問(wèn)題;而我們做了一定程度的限制,會(huì)進(jìn)行一定程度的阻塞等待,對(duì)于頻繁的小數(shù)據(jù)集的I/O會(huì)有一定的減緩?
例子:某executor先啟動(dòng)一個(gè)task A,并在task B啟動(dòng)前快速占用了所有可用的內(nèi)存;在B啟用之后N變成了2,task B會(huì)阻塞直到task A spill,自己可以獲得1/2N=1/4的execution內(nèi)存的時(shí)候;而一大task B獲取到了1/4的內(nèi)存,A和B就都有可能spill了
預(yù)留內(nèi)存
Spark之所以會(huì)有一個(gè)SafetyFraction這樣的參數(shù),是為了避免潛在的OOM。例如,進(jìn)行計(jì)算時(shí),有一個(gè)提前未預(yù)料到的比較大的數(shù)據(jù),會(huì)導(dǎo)致計(jì)算時(shí)間延長(zhǎng)甚至OOM,safetyFraction為storage和execution都提供了額外的buffer以防止此類的數(shù)據(jù)傾斜;這部分內(nèi)存叫作預(yù)留內(nèi)存
Storage內(nèi)存
可用的Storage內(nèi)存
該部分內(nèi)存用作對(duì)RDD的緩存(如調(diào)用cache、persist等方法),節(jié)點(diǎn)間傳輸?shù)膹V播變量
StaticMemoryManager.scala中的getMaxStorageMemory方法發(fā)現(xiàn):?
最后為每個(gè)executor分配到的storage的內(nèi)存:?
StorageMemory = systemMaxMemory * memoryFraction * safetyFraction?
默認(rèn)情況下為:systemMaxMemory * 0.6 * 0.9 = 0.54 * systemMaxMemory?
即默認(rèn)分配executor最大可用內(nèi)存的0.54
預(yù)留內(nèi)存
同2.1.2中Execution內(nèi)存中的預(yù)留部分
Unroll
Unroll是storage中比較特殊的一部分,它默認(rèn)占據(jù)storage總內(nèi)存的20%?
BlockManager是spark自己實(shí)現(xiàn)的內(nèi)部分布式文件系統(tǒng),BlockManager接受數(shù)據(jù)(可能從本地或者其他節(jié)點(diǎn))的時(shí)候是以iterator的形式,并且這些數(shù)據(jù)是有序列化和非序列化的,因此需要注意以下兩點(diǎn):
Iterator在物理內(nèi)存上是不連續(xù)的,如果后續(xù)spark要把數(shù)據(jù)裝載進(jìn)內(nèi)存的話,就需要把這些數(shù)據(jù)放進(jìn)一個(gè)array(物理上連續(xù))
另外,序列化數(shù)據(jù)需要進(jìn)行展開,如果直接展開序列化的數(shù)據(jù),會(huì)造成OOM,所以BlockManager會(huì)逐漸的展開這個(gè)iterator,并逐漸檢查內(nèi)存里是否還有足夠的空間用來(lái)展開數(shù)據(jù)放進(jìn)array里
StaticMemoryManager.scala中的maxUnrollMemory方法:?
Unroll的優(yōu)先級(jí)別還是比較高的,它使用的內(nèi)存空間是可以從storage中借用的,如果在storage中沒(méi)有現(xiàn)存的數(shù)據(jù)block,它甚至可以占據(jù)整個(gè)storage空間;如果storage中有數(shù)據(jù)block,它可以最大drop掉內(nèi)存的數(shù)據(jù)是通過(guò)spark.storage.unrollFraction來(lái)控制的,通過(guò)源碼可知這部分的默認(rèn)值為0.2?
注意:這個(gè)20%的空間并不是靜態(tài)保留的,而是通過(guò)drop掉內(nèi)存中的數(shù)據(jù)block來(lái)分配的(動(dòng)態(tài)的分配過(guò)程);如果unroll失敗了,spark會(huì)把這部分?jǐn)?shù)據(jù)evict到硬盤中去
eviction策略
在spark技術(shù)文檔中,eviction一詞經(jīng)常出現(xiàn),eviction并不是單純字面上驅(qū)逐的意思。說(shuō)句題外話,spark通常被我們叫做內(nèi)存計(jì)算框架,但是從嚴(yán)格意義上說(shuō),spark并不是內(nèi)存計(jì)算的新技術(shù);無(wú)論是cache還是persist這類算子,spark在內(nèi)存安排上,絕大多數(shù)用的都是LRU策略(LRU可以說(shuō)是一種算法,也可以算是一種原則,用來(lái)判斷如何從Cache中清除對(duì)象,而LRU就是“近期最少使用”原則,當(dāng)Cache溢出時(shí),最近最少使用的對(duì)象將被從Cache中清除)。即當(dāng)內(nèi)存不夠的時(shí)候,會(huì)evict掉最遠(yuǎn)使用過(guò)的內(nèi)存數(shù)據(jù)block;當(dāng)evict的時(shí)候,spark會(huì)將該數(shù)據(jù)塊evict到硬盤,而不是單純的拋棄掉?
無(wú)論是storage還是execution的內(nèi)存空間,當(dāng)內(nèi)存區(qū)域的空間不夠用的時(shí)候,spark都會(huì)evict數(shù)據(jù)到硬盤
Other部分
這部分的內(nèi)存用于程序本身運(yùn)行所需要的內(nèi)存,以及用戶定義的數(shù)據(jù)結(jié)構(gòu)和創(chuàng)建的對(duì)象,此內(nèi)存由上面兩部分:storage、execution決定的,默認(rèn)為0.2
堆外內(nèi)存
Spark1.6開始引入了Off-heap memory(詳見SPARK-11389)?
堆外的空間分配較為簡(jiǎn)單,只有存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存,如圖所示:?
?
可用的執(zhí)行內(nèi)存和存儲(chǔ)內(nèi)存占用的空間大小直接由參數(shù) spark.memory.storageFraction 決定(默認(rèn)為0.5),由于堆外內(nèi)存占用的空間可以被精確計(jì)算,所以無(wú)需再設(shè)定保險(xiǎn)區(qū)域
局限性
在Spark的設(shè)計(jì)文檔中,指出了靜態(tài)內(nèi)存管理的局限性:
沒(méi)有適用于所有應(yīng)用的默認(rèn)配置,通常需要開發(fā)人員針對(duì)不同的應(yīng)用進(jìn)行不同的參數(shù)進(jìn)行配置:比如根據(jù)任務(wù)的執(zhí)行邏輯,調(diào)整shuffle和storage的內(nèi)存占比來(lái)適應(yīng)任務(wù)的需求
這樣需要開發(fā)人員具備較高的spark原理知識(shí)
那些不cache數(shù)據(jù)的應(yīng)用在運(yùn)行的時(shí)候只會(huì)占用一小部分可用內(nèi)存,而默認(rèn)的內(nèi)存配置中storage就用去了60%,造成了浪費(fèi)
總結(jié)
以上是生活随笔為你收集整理的Spark内存管理(1)—— 静态内存管理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 利用JavaCSV API来读写csv文
- 下一篇: Spark内存管理(2)—— 统一内存管