日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

内存参数 计算_Spark统一内存管理的实现

發布時間:2023/12/10 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 内存参数 计算_Spark统一内存管理的实现 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文從源碼角度分析spark統一內存管理的實現原理。

統一內存管理對象的創建

統一內存管理對象在SparkEnv中進行創建和管理,這樣內存管理就在Driver和Executor端中都可以使用。在SparkEnv的create函數中,創建內存管理對象的實現代碼如下:

? ?val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false) ?val memoryManager: MemoryManager = ? ? ?if (useLegacyMemoryManager) { ? ? ? ?new StaticMemoryManager(conf, numUsableCores) ? ? } else { // spark2默認使用統一內存管理模式,所以執行這里 ? ? ? ?UnifiedMemoryManager(conf, numUsableCores) ? ? }

從以上代碼片段可知,使用靜態內存管理還是統一內存管理,是由參數:spark.memory.useLegacyMode決定的。從spark-2.0開始默認都是使用統一內存管理,一般不會修改該參數。

所以,一般情況下,默認會創建統一內存管理:UnifiedMemoryManager對象。這幾個對象之間的關系,如圖1所示:

? ? ? ? ? ? ? ? ? ? ? ? ?圖1 內存管理對象和SparkContext

統一內存管理初始化

在創建統一內存管理對象時,會進行初始化操作。為了便于管理和分配內存,在初始化初始化時會把內存分成幾個部分:預留內存,用戶內存,執行和存儲內存。

統一內存管理對象初始化時的主要步驟如下:

(1)計算JVM可用的最大內存,保存在變量:systemMemory中,默認從參數spark.testing.memory獲取值但一般不設置,所以會獲取:Runtime.getRuntime.maxMemory的值。

(2)計算需要預留的內存數:reservedMemory,先取參數:spark.testing.reservedMemory的值,但一般不設置,此時使用默認值:300M。

(3)計算系統使用內存的最小值,它是預留內存的1.5倍,也就是:minSystemMemory=reservedMemory * 1.5,若系統使用內存比這個值小:systemMemory < minSystemMemory,則報錯:請增加spark.driver.memory的值。

(4)獲取executor的內存值:val executorMemory = conf.getSizeAsBytes("spark.executor.memory"),若executorMemory < minSystemMemory,則報錯:請增加spark.executor.memory的值。

(5)計算系統可用內存的總量,系統內存-預留內存,得到spark可以使用的總內存:usableMemory = systemMemory - reservedMemory

(6)計算任務執行和存儲可用內存總量。計算公式是:usableMemory * memoryFraction。其中memoryFraction是一個小數,是配置項spark.memory.fraction的值,默認值是0.6。

(7)最大可用內存已經計算出來了,此時可以創建UnifiedMemoryManager對象了,代碼如下:

? ? new UnifiedMemoryManager( ? ? ?conf, ? ? ?maxHeapMemory = maxMemory, ? ? ?onHeapStorageRegionSize = ? ? ? (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong, ? ? ?numCores = numCores)

從創建統一內存管理對象的代碼中可以看出,默認情況下任務的執行內存和存儲內存是個占50%。可以通過參數spark.memory.storageFraction來調整執行內存和存儲內存的占比。

完成統一內存初始化后,內存的劃分情況如圖2所示:

? ? ? ? ? ? ? ? ? ? ? ? ? ? 圖2 統一內存初始化內存分布

統一內存管理的實現

前面已經說明,統一內存管理是在UnifiedMemoryManager類中實現的。下面我們來分析統一內存管理的實現邏輯。

該類的聲明如下:

?private[spark] class UnifiedMemoryManager private[memory] ( ? ?conf: SparkConf, ? ?val maxHeapMemory: Long, ? ?onHeapStorageRegionSize: Long, ? ?numCores: Int)

統一內存管理為spark提供了靈活使用內存的機制。它把一塊大的可使用的內存分成執行內存和存儲內存。執行內存主要被Executor在執行任務時使用,而存儲內存主要用來存儲數據塊。

該類的成員變量說明如下

  • onHeapStorageRegionSize堆內內存區的大小,以字節為單位。該內存區不是靜態保留的; 執行器可以在必要時進行借用。僅當實際存儲內存使用量超過此區域時,才能清除緩存塊。

  • maxHeapMemory最大可用堆內存。該成員變量是通過函數getMaxMemory計算而來的,具體的計算方法見下面的分析。

  • numCores核數。

獲取執行內存

在執行當前任務內存不足時會需要申請執行內存。申請內存的過程可能會向存儲內存池(StorageMemoryPool)借用一部分內存,并把這部分內存添加到執行內存池(ExecutionMemoryPool)中。能夠向存儲內存池借用內存必須滿足以下條件之一:

(1)存儲池的空閑內存大于0;

(2)存儲是否已經借用了執行池的內存。通過:存儲內存池目前的大小減去初始化設置的存儲內存池的大小是否大于0來進行判斷,也就是計算storagePool.poolSize - storageRegionSize是否大于0。若大于0(已借用)表示可以分配。

在借用存儲內存時,可能會把存儲池中的內存釋放一部分,若這部分內存的rdd設置了useDisk級別,還會把這些內存的數據寫入磁盤,否則,這些內存中的存儲數據就丟失了。

內存塊的釋放是在MemoryStore對象中完成(后面的文章會詳細分析這實現),官方文檔中提到過,釋放老的內存塊的算法是LRU(最近最少使用),這是由于在MemoryStore中內存塊是以LinkedHashMap的結構組織的,在鏈表的頭部就是“最近最少使用”的內存塊。這部分內容在分析MemoryStore的實現時再繼續講解。

下面分析獲取執行內存操作的實現邏輯。

acquireExecutionMemory函數

在統一內存管理中實現獲取執行內存的函數是:acquireExecutionMemory。該函數的原型如下:

? ?override private[memory] def acquireExecutionMemory( ? ? ?numBytes: Long, ? ? ?taskAttemptId: Long, ? ? ?memoryMode: MemoryMode): Long = synchronized {...}

該函數嘗試為目前的執行任務獲取numBytes執行內存。對于該函數需要注意以下幾點:

(1)它嘗試獲取numBytes字節大小的內存,返回能夠獲取的字節數,若返回0,則表示無法分配內存;

(2)它是同步函數,所以當有多個任務調用該函數時可能會阻塞,直到有足夠的內存,這樣做是為了在把數據進行持久化之前,讓每個任務都有機會獲取到1/2N的內存(其中N是運行的任務數)。

(3)當老的任務占用很多內存,而新任務數又不斷增加時,阻塞就可能會發生。

實現邏輯

獲取執行內存操作的實現邏輯如下:

(1)根據參數memoryMode的值來選擇操作:若是堆內模式(ON_HEAP),獲取堆內的執行和存儲池總量和堆內可用存儲內存總量,以及總的堆內內存大小。若是堆外模式(OFF_HEAP),獲取堆外的執行和存儲池總量和堆外可用存儲內存總量,以及總的堆外內存大小。這一步的代碼實現如下:

? ? ?val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match { ? ? ?// 堆內模式 ? ? ?case MemoryMode.ON_HEAP => ( ? ? ? ?onHeapExecutionMemoryPool, ? ? ? ?onHeapStorageMemoryPool, ? ? ? ?onHeapStorageRegionSize, ? ? ? ?maxHeapMemory) ? ? ?// 堆外模式 ? ? ?case MemoryMode.OFF_HEAP => ( ? ? ? ?offHeapExecutionMemoryPool, ? ? ? ?offHeapStorageMemoryPool, ? ? ? ?offHeapStorageMemory, ? ? ? ?maxOffHeapMemory) ? }

(2)判斷是否需要增加執行內存池(ExecutionPool)。當執行內存池中空閑內存量小于需要申請的內存量時,則會嘗試增加執行池。嘗試增加執行池的過程,本質上就是向存儲池StorageMemoryPool借用內存的過程。能夠成功借用存儲池的內存,需要滿足以下兩個條件之一:1)存儲池有空閑內存;2)存儲池的量大于初始化的量。(也就是說,已經向執行內存池借用了一些內存,存儲池大小增加了)

另外,這個過程可能執行多次,每次嘗試都必須能夠獲取到一些內存,可能會清除掉一些內存中的數據塊,以防其他任務在緩存大的數據塊和清除數據之間進行反復。那么,為什么每次只能清除一些內存呢?這是因為在MemoryStore中,內存是以MemoryEntry對象來組織和管理的,清理時也是以這個為單位進行的,而每個這樣的對象的大小是不同的。

嘗試增加執行內存池大小的實現代碼如下:

?def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = { ? ? ?if (extraMemoryNeeded > 0) { ? ? ? ?// 可以分配內存的條件:1.存儲池有空閑內存 或 2.存儲池已經借用了執行池的內存 ? ? ? ?val memoryReclaimableFromStorage = math.max( ? ? ? ? ?storagePool.memoryFree, ? ? ? ? ?storagePool.poolSize - storageRegionSize) ? ? ? ? ? ? ? ?if (memoryReclaimableFromStorage > 0) { ? ? ? ? ?// 通過下面的函數來釋放存儲內存池的內存,減少存儲內存池的大小。 ? ? ? ? ?val spaceToReclaim = storagePool.freeSpaceToShrinkPool( ? ? ? ? ? ?math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) ? ? ? ? ?// 到這里,說明存儲內存池的空間已經釋放,這一步只需要減少存儲內存池的大小即可 ? ? ? ? ?storagePool.decrementPoolSize(spaceToReclaim) ? ? ? ? ?// 增加執行內存池大小的量 ? ? ? ? ?executionPool.incrementPoolSize(spaceToReclaim) ? ? ? } ? ? } ? }

要注意的是,執行內存池將借用的內存均勻地分配給活動任務,以限制每個任務的執行內存分配。保持這個大于執行池大小是很重要的,這不考慮可以通過清除存儲而釋放的潛在內存。另外,這個數量應該保持在“maxMemory”以下,以便在任務中執行內存分配的公平性,否則,任務可能占用超過其平均份額的執行內存。

(3)然后調用executionPool.acquireMemory來獲取內存,該函數的聲明如下:

?private[memory] def acquireMemory( ? ? ?numBytes: Long, // 想要獲取的內存數 ? ? ?taskAttemptId: Long, // 想要獲取內存的任務數 ? ? ?maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit, // 一個回調函數,用來增長內存池的大小 ? ? ?computeMaxPoolSize: () => Long = () => poolSize // 回調函數,用來獲取某個時刻允許獲取內存的最大值 ? ? ): Long = lock.synchronized

該函數嘗試為給定任務獲取numBytes大小的內存,并返回獲取到的內存大小。該函數可能會阻塞,直到有足夠的內存再返回。該函數的執行邏輯大致如下:

  • 添加任務到taskMemory這個map中,該map保存了任務id和申請的內存大小的對應關系。

  • 調用maybeGrowExecutionPool回調函數來向storeage申請內存,若內存不夠該函數會釋放掉一些存儲內存。一次釋放的內存可能不夠,所以該函數可能會嘗試多次。

  • maybeGrowExecutionPool會調用memoryStore.evictBlocksToFreeSpace函數,在該函數中會根據rdd和內存模式等參數來清除一些內存塊,釋放對應大小的內存,具體的實現過程在后面分析。

獲取存儲內存

獲取存儲內存的過程比獲取執行內存的過程要相對簡單。因為,獲取存儲內存時不會強制釋放正在使用的執行內存,而只能從執行池的空閑內存中申請。

所以,申請存儲內存的步驟主要是以下幾步:

(1)判斷需要申請的內存數量,是否大于存儲池的空閑內存量。若大于(存儲池的內存量不夠),則向執行池的空閑內存申請一部分內存。要注意:可能執行池的空閑內存也不夠,或根本就沒有空閑內存。

(2)調用存儲池的內存獲取函數獲取內存,若空閑內存不夠,則需要從存儲池中按LRU算法釋放一部分內存。

獲取存儲內存是在函數acquireStorageMemory中實現,下面我們來分析一下該函數的具體實現。

acquireStorageMemory函數

該函數的原型如下:

? override def acquireStorageMemory( ? ? ?blockId: BlockId, ? ? ?numBytes: Long, ? ? ?memoryMode: MemoryMode): Boolean = synchronized {...}

該函數的參數:

  • memoryMode: MemoryMode:該參數是內存的模式,主要有兩種:ON_HEAP或OFF_HEAP。

  • numBytes:需要申請的內存大小,單位是bytes

  • blockId:數據塊的ID,也是可能會被釋放的數據塊。若該id為空,則會通過LRU算法尋找需要釋放塊對應的內存。

該函數是一個同步函數,若是多個線程同時調用該函數,可能會阻塞。

實現分析

該函數的主要實現邏輯如下:

(1)根據參數memoryMode來獲取此種模式下的最大可以用存儲內存,保存在變量maxMemory中。

(2)判斷內存申請量(即參數numBytes)是否大于maxMemory,若申請內存大于最大可用內存,會失敗。報錯:該blockid的數據塊需要的內存超過最大使用內存。

(3)若申請的內存大小:numBytes大于存儲池的空閑內存大小,則需要從執行池中“借用”一些空閑內存。借用的意思是,從執行池的空閑內存中獲取一部分內存,但要注意:最多從執行池中借用空閑內存量,不會釋放任務正在使用的執行內存。實現代碼如下:

?if (numBytes > storagePool.memoryFree) { // 所需內存量大于可用存儲空閑內存量,需要從執行池中申請一部分內存 ? ? ?val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, ? ? ? ?numBytes - storagePool.memoryFree) //最多獲取執行池中空閑的內存量大小 ? ? ?executionPool.decrementPoolSize(memoryBorrowedFromExecution) ?// 執行內存池減少內存數 ? ? ?storagePool.incrementPoolSize(memoryBorrowedFromExecution) // 存儲內存池增加對應內存 ? }

注意:這一步是體現統一內存思想的重要的一步。

(4)若能夠從執行內存池中借用成功,這一步就直接在存儲內存池中申請內存了。代碼很簡單,就是調用存儲內存池的內存申請函數:

? ? ?storagePool.acquireMemory(blockId, numBytes)
storagePool#acquireMemory函數

該函數來完成存儲池的內存申請工作。要注意,此時的存儲池可能有空閑的內存,也可能沒有空閑內存。當存儲池沒有空閑內存時,需要把已有的某些數據塊從存儲池中清除,以滿足當前數據塊的存儲需要。

該函數的實現邏輯如下:

(1) 計算需要釋放的內存量

需要申請的內存量減去空閑的內存量,就是需要釋放的內存量。也就是說,需要從已經使用的存儲內存塊中釋放一部分內存。

? ? val numBytesToFree = math.max(0, numBytes - memoryFree)

(2) 第(1)步已經計算出來需要釋放的內存量了。下面調用StorageMemoryPool.acquireMemory函數來申請內存,釋放一定的數據塊。該函數會調用MemoryStore.evictBlocksToFreeSpace來清除數據塊。會被清除的數據塊的判斷如下:

?def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = { //存儲模式相同,且blockId沒有被RDD占用 或則不是要替換相同RDD的不同數據塊 ? ?entry.memoryMode == memoryMode && ? (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))}

若有可以釋放的數據塊,還需要獲取一把寫鎖,加鎖的目的是防止目前還有其他的線程在讀該數據塊。當鎖獲取成功后,就可以開始刪除數據塊了,具體的刪除過程是通過blockInfoManager.removeBlock來進行的。該函數會把需要清除的元數據和數據塊從blockManager中刪除。

釋放內存塊:MemoryStore#evictBlocksToFreeSpace函數

這是MemoryStore類的成員函數,該函數完成內存塊的釋放,若存儲級別包含useDisk還會把內存中的數據保存到磁盤中。該函數的原型如下:

? ?private[spark] def evictBlocksToFreeSpace( ? ? ?blockId: Option[BlockId], ? ? ?space: Long, ? ? ?memoryMode: MemoryMode): Long = {

其中的blockId是數據塊的id,每個id都對應一個內存塊。釋放內存塊的邏輯如下:

(1)遍歷內存塊的隊列。這是一個LinkedHashMap,最后一次被訪問的內存塊節點會放到鏈表的后面,這樣最近沒有被訪問的內存塊就在隊列的頭部。

(2)檢查內存塊是否可以被釋放。釋放內存塊需要滿足以下條件:

1)內存塊的模式必須和參數中memoryMode的值相等;

2)該blockId對應的內存塊沒有被其他RDD占用,或則不是要替換相同RDD的不同數據塊。

(3)若滿足以上兩個條件,就會釋放該內存塊。釋放內存塊的過程如下:

1)確認內存塊的寫鎖已經鎖上了;

2)通過blockId的信息檢查存儲級別是否包含useDisk,若包含則把內存的數據寫入到磁盤上。寫入磁盤 的過程是通過DiskStore對象來完成的。

(4)由于實際的內存是通過MemoryStore來管理的,所以,最后一步就是從memoryStore中刪除并釋放blockId對應的內存塊,并減少MemoryStore的內存數量。到此,就完成了內存釋放的整個過程。至于MemoryStore是如何釋放內存的,會在分析MemoryStore時進行分析。

計算可用堆內存儲內存:maxOnHeapStorageMemory函數

該函數用來計算堆內可用內存,邏輯很簡單:就是使用總的堆內存儲內存-為執行器可分配的堆內內存:

? override def maxOnHeapStorageMemory: Long = synchronized { // 計算可用堆內內存 ? maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed }
計算對外存儲內存:maxOffHeapStorageMemory函數

該函數用來計算可用堆外內存:使用總堆外內存-為執行器分配的堆外內存:

? override def maxOffHeapStorageMemory: Long = synchronized { // 計算可用堆外內存 ? maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed }

小結

本文講述了spark統一內存管理的實現原理。從實現層面來看,Spark的統一內存管理都是在UnifiedMemoryManager類中實現。不管是執行還是存儲內存不足時,都可以向對方借用內存。但內存不足時,可以根據LRU來釋放存儲正在使用的內存,但不能釋放執行時正在使用的內存。

另外,最終的內存塊釋放和數據塊的持久化是通過MemoryStore,DiskStore以及BlockManager這幾個系統來完成的,這些組件的原理會在后面的文章中繼續分析。

總結

以上是生活随笔為你收集整理的内存参数 计算_Spark统一内存管理的实现的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。