
Spark Storage Mechanism Explained

Published 2024/4/14. Collected and edited by 生活随笔.

We know that Spark can keep the RDDs it has computed in memory and reuse them when they are needed again. How does Spark accomplish this? This article walks through the source code to explain how a persisted RDD is reused.

The previous article explained Spark's execution model: DAGScheduler is responsible for decomposing an action into stages. In DAGScheduler.getMissingParentStages, Spark makes its first attempt to reuse previously computed RDDs, and the function it relies on is DAGScheduler.getCacheLocs.

private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
  if (!cacheLocs.contains(rdd.id)) {
    val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
    val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
    cacheLocs(rdd.id) = blockIds.map { id =>
      locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
    }
  }
  cacheLocs(rdd.id)
}

DAGScheduler stores partition location information in cacheLocs. Consider the miss path, when an RDD is not yet cached: it first builds the BlockIds that represent the partitions, then calls BlockManager.blockIdsToBlockManagers to turn those block IDs into Seq[BlockManagerId]; a BlockManagerId carries the partition's location information (each cached partition is stored as one block, and blocks can also hold broadcast variables and other data).
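The caching pattern in getCacheLocs — compute on the first miss, then serve every later call from the map — can be isolated in a few lines. Below is a minimal, self-contained Scala sketch of that pattern; the string-based BlockId/TaskLocation aliases, the registry map, and the hit counter are illustrative stand-ins, not Spark's API:

```scala
import scala.collection.mutable

// Illustrative stand-ins for Spark's types.
type BlockId = String
type TaskLocation = String

// Pretend "master" registry: which host holds which block.
val registry: Map[BlockId, Seq[TaskLocation]] =
  Map("rdd_0_0" -> Seq("host1"), "rdd_0_1" -> Seq("host2"))

// Memoized per-RDD location table, keyed by RDD id like DAGScheduler.cacheLocs.
val cacheLocs = mutable.HashMap[Int, Array[Seq[TaskLocation]]]()
var registryHits = 0 // counts round-trips to the "master"

def getCacheLocs(rddId: Int, numPartitions: Int): Array[Seq[TaskLocation]] = {
  if (!cacheLocs.contains(rddId)) {
    // One BlockId per partition, resolved in a single query on a miss.
    val blockIds = (0 until numPartitions).map(i => s"rdd_${rddId}_$i")
    registryHits += 1
    cacheLocs(rddId) = blockIds.map(id => registry.getOrElse(id, Nil)).toArray
  }
  cacheLocs(rddId)
}
```

The first call for an RDD id hits the registry once; repeated calls are answered purely from cacheLocs, which is why DAGScheduler can consult locations cheaply during stage planning.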

According to the comments, every node (master and workers alike) runs a BlockManager that manages all storage on that node (RDDs, broadcast variables, and so on). The master and workers communicate through the Akka actor system (see my other article for an introduction), namely via BlockManagerMasterActor and BlockManagerSlaveActor. Let's continue with BlockManager.blockIdsToBlockManagers.

def blockIdsToBlockManagers(
    blockIds: Array[BlockId],
    env: SparkEnv,
    blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[BlockManagerId]] = {

  // blockManagerMaster != null is used in tests
  assert(env != null || blockManagerMaster != null)
  val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
    env.blockManager.getLocationBlockIds(blockIds)
  } else {
    blockManagerMaster.getLocations(blockIds)
  }

  val blockManagers = new HashMap[BlockId, Seq[BlockManagerId]]
  for (i <- 0 until blockIds.length) {
    blockManagers(blockIds(i)) = blockLocations(i)
  }
  blockManagers.toMap
}

The BlockManager is created inside SparkEnv. SparkEnv likewise runs on every node and, at creation time, is specialized into a driver env and executor envs (the same class with different contents, just like BlockManager). When SparkEnv is created, the driver's BlockManager is given a BlockManagerMasterActor, while each executor's BlockManager receives a reference to that actor. The code above uses sparkEnv.blockManager.blockManagerMaster.getLocations to resolve the BlockManagerIds for each block ID and returns them organized as a Map. Next comes BlockManagerMaster.getLocations.
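The last step of blockIdsToBlockManagers — pairing each queried block ID with the location list that came back in the same order — is just a zip into a Map. A minimal sketch of that step, with string aliases standing in for Spark's types:

```scala
// Illustrative stand-ins for Spark's types.
type BlockId = String
type BlockManagerId = String

// Locations come back as one Seq per block, in the same order as the query,
// so pairing them positionally is safe.
def toLocationMap(
    blockIds: Array[BlockId],
    blockLocations: Seq[Seq[BlockManagerId]]): Map[BlockId, Seq[BlockManagerId]] = {
  require(blockIds.length == blockLocations.length)
  blockIds.zip(blockLocations).toMap
}
```

A block with no known location simply maps to an empty Seq, which is what getCacheLocs later treats as "not cached anywhere".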

def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId))
}

private def askDriverWithReply[T](message: Any): T = {
  AkkaUtils.askWithReply(message, driverActor, AKKA_RETRY_ATTEMPTS, AKKA_RETRY_INTERVAL_MS,
    timeout)
}

This code simply sends a GetLocations message to BlockManagerMasterActor and waits for a reply. BlockManagerMasterActor holds all of the storage metadata: blockManagerInfo tracks the storage state of every executor, blockManagerIdByExecutor maps each executor to the BlockManagerId running on it, and blockLocations records every storage location of every block (including the locations of all partitions). Below is the part of BlockManagerMasterActor that answers location queries:

override def receiveWithLogging = {
  case GetLocations(blockId) =>
    sender ! getLocations(blockId)
  case ... =>
}

private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}

  由于BlockManagerMasterActor保存了所有Block的位置, 所以只是簡單的給出答案. 到現(xiàn)在可以看出所有Block的位置信息都是保存在Master節(jié)點上的. 以上是關于spark 查找persist RDD所需要的完整步驟, 可是沒能覆蓋整個spark存儲機制, 接下來要分析一些其他的代碼.

  由于Block所有信息都存放在master上, 所有單單徐聞Block位置達不到和executor交互的目的, 我們分析一下RDD.unpersistRDD, 他調(diào)用sparkContext.unpersistRDD, 再繼續(xù)掉用env.blockManager.master.removeRdd:

def removeBlock(blockId: BlockId) {
  askDriverWithReply(RemoveBlock(blockId))
}

private def askDriverWithReply[T](message: Any): T = {
  AkkaUtils.askWithReply(message, driverActor, AKKA_RETRY_ATTEMPTS, AKKA_RETRY_INTERVAL_MS,
    timeout)
}

Just as in the previous example, this sends a RemoveBlock message to BlockManagerMasterActor.

override def receiveWithLogging = {
  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    sender ! true
  case ... =>
}

private def removeBlockFromWorkers(blockId: BlockId) {
  val locations = blockLocations.get(blockId)
  if (locations != null) {
    locations.foreach { blockManagerId: BlockManagerId =>
      val blockManager = blockManagerInfo.get(blockManagerId)
      if (blockManager.isDefined) {
        // Remove the block from the slave's BlockManager.
        // Doesn't actually wait for a confirmation and the message might get lost.
        // If message loss becomes frequent, we should add retry logic here.
        blockManager.get.slaveActor.ask(RemoveBlock(blockId))(akkaTimeout)
      }
    }
  }
}

This code first looks up, via blockLocations, the BlockManagerIds of every location holding the block, then uses blockManagerInfo to obtain each executor's BlockManagerSlaveActor and sends it a RemoveBlock message.
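The master-side fan-out — look up every holder of a block, then fire a remove at each one without awaiting confirmation — can be sketched without actors. In the sketch below the metadata map and the per-executor "inboxes" are simulated stand-ins for blockLocations and the BlockManagerSlaveActor mailboxes; Spark uses real Akka actors instead:

```scala
import scala.collection.mutable

type BlockId = String
type BlockManagerId = String

// Master-side metadata: block -> all BlockManagers holding it.
val blockLocations = mutable.HashMap(
  "rdd_1_0" -> Seq("executor-a", "executor-b"))

// Simulated per-executor inboxes standing in for BlockManagerSlaveActor.
val inboxes = mutable.HashMap[BlockManagerId, mutable.Buffer[String]](
  "executor-a" -> mutable.Buffer(),
  "executor-b" -> mutable.Buffer())

def removeBlockFromWorkers(blockId: BlockId): Unit = {
  // Fire-and-forget fan-out: unknown blocks and unknown executors are
  // silently skipped, as in the real code, and no reply is awaited.
  for {
    locations <- blockLocations.get(blockId).toSeq
    bmId      <- locations
    inbox     <- inboxes.get(bmId)
  } inbox += s"RemoveBlock($blockId)"
}
```

After one call, each executor that holds the block has exactly one RemoveBlock message queued; a call for an unknown block is a no-op.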

override def receiveWithLogging = {
  case RemoveBlock(blockId) =>
    doAsync[Boolean]("removing block " + blockId, sender) {
      blockManager.removeBlock(blockId)
      true
    }
  case ... =>
}

On receiving the message, BlockManagerSlaveActor calls blockManager.removeBlock.

def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
  logInfo(s"Removing block $blockId")
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized {
      // Removals are idempotent in disk store and memory store. At worst, we get a warning.
      val removedFromMemory = memoryStore.remove(blockId)
      val removedFromDisk = diskStore.remove(blockId)
      val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
      if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
        logWarning(s"Block $blockId could not be removed as it was not found in either " +
          "the disk, memory, or tachyon store")
      }
      blockInfo.remove(blockId)
      if (tellMaster && info.tellMaster) {
        val status = getCurrentBlockStatus(blockId, info)
        reportBlockStatus(blockId, info, status)
      }
    }
  } else {
    // The block has already been removed; do nothing.
    logWarning(s"Asked to remove block $blockId, which does not exist")
  }
}

This code delegates to the remove methods of the three stores (memory, disk, and Tachyon) and, when requested, reports the block's new status back to the master. That concludes our tour of Spark's storage management mechanism.
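The executor-side logic — try each store in turn and warn only when no store actually held the block — reduces to a few boolean checks. A sketch under the assumption that the stores are plain mutable sets (memoryStore and diskStore in Spark are full-fledged classes; Tachyon is omitted here):

```scala
import scala.collection.mutable

type BlockId = String

// Toy stores: a block may live in memory, on disk, or both.
val memoryStore = mutable.Set[BlockId]("rdd_1_0")
val diskStore   = mutable.Set[BlockId]("rdd_1_0", "rdd_1_1")

// Returns true if any store held the block, mirroring the
// "warn only when no store had it" logic in BlockManager.removeBlock.
def removeBlock(blockId: BlockId): Boolean = {
  val removedFromMemory = memoryStore.remove(blockId)
  val removedFromDisk   = diskStore.remove(blockId)
  removedFromMemory || removedFromDisk
}
```

Note that both removals are attempted unconditionally, so a block cached at a storage level spanning memory and disk disappears from both in one call.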


Reposted from: https://www.cnblogs.com/OddLearner/p/4181300.html
