日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark详解(六):Spark集群资源调度算法原理

發布時間:2025/4/16 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark详解(六):Spark集群资源调度算法原理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1. 應用程序之間

在Standalone模式下,Master提供里資源管理調度功能。在調度過程中,Master先啟動等待列表中應用程序的Driver,這個Driver盡可能分散在集群的Worker節點上,然后根據集群的內存和CPU使用情況,對等待運行的應用程序進行資源分配。默認分配規則是有條件的FIFO,先分配的應用程序會盡可能多的獲取滿足條件的資源,后分配的應用程序只能在剩余資源中再次篩選。如果沒有合適資源的應用程序只能等待。Master.scheduler方法如下:

private def schedule(): Unit = {if (state != RecoveryState.ALIVE) {return}// 隨機打亂Worker節點val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))val numWorkersAlive = shuffledAliveWorkers.sizevar curPos = 0// 這是只對Standalone下的Cluster模式才生效,client模式Driver是在客戶端for (driver <- waitingDrivers.toList) { var launched = falsevar numWorkersVisited = 0while (numWorkersVisited < numWorkersAlive && !launched) {val worker = shuffledAliveWorkers(curPos)numWorkersVisited += 1if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {launchDriver(worker, driver)waitingDrivers -= driverlaunched = true}curPos = (curPos + 1) % numWorkersAlive}}// 對等待應用程序按照順序分配運行資源startExecutorsOnWorkers()}

默認情況下,在Standalone模式下,每個應用程序可以分配到的CPU核數可以由spark.deploy.defaultCores進行設置,但是該配置默認為Int.max,也就是不限制,從而應用程序會盡可能獲取CPU資源。為了限制每個應用程序使用CPU資源,用戶一方面可以設置spark.core.max配置項,約束每個應用程序所能申請的最大CPU核數;另一方面可以設置spark.executor.cores配置項,用于設置在每個Executor上啟動的CPU核數。

/*** Schedule and launch executors on workers*/private def startExecutorsOnWorkers(): Unit = {// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app// in the queue, then the second app, etc.// 使用FIFO算法運行應用,即先注冊的應用先運行for (app <- waitingApps if app.coresLeft > 0) {val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor// Filter out workers that don't have enough resources to launch an executorval usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE).filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&worker.coresFree >= coresPerExecutor.getOrElse(1)).sortBy(_.coresFree).reverse// 一種是spreadOutApps,就是把應用運行在盡量多的Worker上,另一種是非spreadOutAppsval assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)// Now that we've decided how many cores to allocate on each worker, let's allocate them// 給每個worker分配完application要求的cpu core之后,遍歷worker啟動executorfor (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))}}}

對于Worker的分配策略有兩種:一種是盡量把應用程序運行可能多的Worker上,這種分配算法不僅能充分利用集群資源,還有利于數據本地性;另一種就是應用程序運行在盡量少的Worker上,這種適用于CPU密集型而內存使用較少的場景。配置項為spark.deploy.spreadOut。主要代碼為:Master.scheduleExecutorsOnWorkers方法實現。

private def scheduleExecutorsOnWorkers(app: ApplicationInfo,usableWorkers: Array[WorkerInfo],spreadOutApps: Boolean): Array[Int] = {// 應用程序中每個Executor所需要的CPU核數val coresPerExecutor = app.desc.coresPerExecutor// 每個Executor所需的最少核數,如果設置了coresPerExecutor則為該值,否則為1val minCoresPerExecutor = coresPerExecutor.getOrElse(1)// 如果沒有設置coresPerExecutor,那么每個Worker上只有一個Executor,并盡可能分配資源val oneExecutorPerWorker = coresPerExecutor.isEmpty// 每個Executor需要分配多少內存val memoryPerExecutor = app.desc.memoryPerExecutorMB// 集群中可用的Worker節點的數量val numUsable = usableWorkers.length// Worker節點所能提供的CPU核數數組val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker// Worker分配Executor個數數組val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker// 需要分配的CPU核數,為應用程序所需CPU核數和可用CPU核數最小值var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)/** Return whether the specified worker can launch an executor for this app. *//*** 返回指定的Worker節點是否能夠啟動Executor,滿足條件:* 1. 應用程序需要分配CPU核數>=每個Executor所需的最少CPU核數* 2. 是否有足夠的CPU核數,判斷條件為該Worker節點可用CPU核數-該Worker節點已分配的CPU核數>=每個Executor所需最少CPU核數* 如果在該Worker節點上允許啟動新的Executor,需要追加以下兩個條件:* 1. 判斷內存是否足夠啟動Executor,其方法是:當前Worker節點可用內存-該Worker已分配的內存>=每個Executor分配的內存大小* 2. 已經分配給該應用程序的Executor數量+已經運行該應用程序的Executor數量<該應用程序Executor設置的最大值*/def canLaunchExecutor(pos: Int): Boolean = {val keepScheduling = coresToAssign >= minCoresPerExecutorval enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor// If we allow multiple executors per worker, then we can always launch new executors.// Otherwise, if there is already an executor on this worker, just give it more cores.// 啟動新Executor條件是:該Worker節點允許啟動多個Executor或者在該Worker節點上沒有為該應用程序分配Executorval launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0// 如果在該Worker節點上允許啟動多個Executor,那么該Executor節點滿足啟動條件就可以啟動新Executor,// 否則只能啟動一個Executor并盡可能的多分配CPU核數if (launchingNewExecutor) {val assignedMemory = assignedExecutors(pos) * memoryPerExecutorval enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutorval underLimit = assignedExecutors.sum + app.executors.size < app.executorLimitkeepScheduling && enoughCores && enoughMemory && underLimit} else {// We're adding cores to an existing executor, so no need// to check memory and executor limitskeepScheduling && enoughCores}}// Keep launching executors until no more workers can accommodate any// more executors, or if we have reached this application's limitsvar freeWorkers = (0 until numUsable).filter(canLaunchExecutor)// 在可用的Worker節點中啟動Executor,在Worker節點每次分配資源時,分配給Executor所需的最少CPU核數,該過程是通過多次輪詢進行,// 直到沒有Worker節點滿足啟動Executor條件活著已經達到應用程序限制。在分配過程中Worker節點可能多次分配,// 如果該Worker節點可以啟動多個Executor,則每次分配的時候啟動新的Executor并賦予資源;// 如果該Worker節點只能啟動一個Executor,則每次分配的時候把資源追加到該Executorwhile (freeWorkers.nonEmpty) {freeWorkers.foreach { pos =>var keepScheduling = true// 滿足 keepScheduling標志為真(第一次分配或者集中運行)和該Worker節點滿足// 啟動Executor條件時,進行資源分配while (keepScheduling && canLaunchExecutor(pos)) {// 每次分配CPU核數為Executor所需的最少CPU核數coresToAssign -= minCoresPerExecutorassignedCores(pos) += minCoresPerExecutor// If we are launching one executor per worker, then every iteration assigns 1 core// to the executor. Otherwise, every iteration assigns cores to a new executor.// 如果設置每個Executor啟動CPU核數,則該Worker只能為該應用程序啟動1個Executor,// 否則在每次分配中啟動1個新的Executorif (oneExecutorPerWorker) {assignedExecutors(pos) = 1} else {assignedExecutors(pos) += 1}// Spreading out an application means spreading out its executors across as// many workers as possible. If we are not spreading out, then we should keep// scheduling executors on this worker until we use all of its resources.// Otherwise, just move on to the next worker.// 如果是分散運行,則在某一Worker節點上做完資源分配立即移到下一個Worker節點,// 如果是集中運行,則持續在某一Worker節點上做資源分配,知道使用完該Worker節點所有資源。// 傳入的Worker節點列表是按照CPU核數倒序排列,在集中運行時,會盡可能少的使用Worker節點if (spreadOutApps) {keepScheduling = false}}}// 繼續從上一次分配完的可用Worker節點列表獲取滿足啟動Executor的Worker節點列表freeWorkers = freeWorkers.filter(canLaunchExecutor)}// 返回每個Worker節點分配的CPU核數assignedCores}

tips:
關于branch-2.0,這個算法是在Spark 1.4.2的版本中優化的。在以前,Worker節點中,只能為某應用程序啟動一個Executor。輪詢分配資源時,Worker節點每次分配1個CPU核數,這樣有可能會造成某個Worker節點最終分配CPU核數小于每個Executor所需CPU核數,那么該節點將不啟動該Executor。例如:
在集群中有4個Worker節點,每個節點擁有16個CPU核數,其中設置了spark.cores.max=48和spark.executor.cores=16,由于每個Worker只啟動一個Executor,按照每次分配一個CPU核數,則每個Worker節點的Executor將分配到12個CPU核數,每個由于12<16, 所以沒有Executor能啟動。現在改進的算法是,如果設置了spark.executor.cores,那么每次分配的時候就分配這個指定的CPU核數

2. 作業以及調度階段之間

Spark應用程序提交執行時,會根據RDD依賴關系形成有向無環圖(DAG),然后交給DAGScheduler進行劃分作業和調度階段。這些作業之間沒有任何依賴關系,對于多個作業之間的調度,Spark目前提供了兩種不同的調度策略,一種是FIFO模式,這也是目前默認的模式;還有一種是FAIR模式,該模式的調度可以通過兩個參數來決定Job執行的優先模式,兩個參數分別是minShare(最小任務數)和weight(任務的權重)。

2.1 創建調度池

在TaskSchedulerImpl.initialize方法中先創建根調度池rootPool對象,然后根據系統配置的調度模式創建調度創建器,針對兩種調度策略進行具體實例化FIFOSchedulableBuilder或者FairSchedulableBuilder,最終使用調度器創建buildPools方法根調度池rootPool下創建調度池。

def initialize(backend: SchedulerBackend) {this.backend = backend// temporarily set rootPool name to emptyrootPool = new Pool("", schedulingMode, 0, 0)schedulableBuilder = {schedulingMode match {case SchedulingMode.FIFO =>new FIFOSchedulableBuilder(rootPool)case SchedulingMode.FAIR =>new FairSchedulableBuilder(rootPool, conf)case _ =>throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")}}schedulableBuilder.buildPools()}

2. 調度池中加入調度內容

在TaskSchedulerImpl.submitTask方法中,先把調度階段拆分為任務集,然后把這些任務集交給管理器TaskSetManager進行管理,最后把該任務集的管理器加入到調度池中,等待分配執行。在FIFO中,由于創建的buildPools方法為空,所以在根調度器rootPool中并沒有下級調度池,而是直接包含了一組TaskSetManager;而在Fair調度器中,根調度池rootPool中包含了下級調度池Pool,在這些下級調度池Pool包含一組TaskSetManager。

override def submitTasks(taskSet: TaskSet) {val tasks = taskSet.taskslogInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")this.synchronized {// 為每一個taskSet創建一個taskSetManager// taskSetManager在后面負責,TaskSet的任務執行狀況的監視和管理val manager = createTaskSetManager(taskSet, maxTaskFailures)val stage = taskSet.stageIdval stageTaskSets =taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])// 把manager加入內存緩存中stageTaskSets(taskSet.stageAttemptId) = managerval conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>ts.taskSet != taskSet && !ts.isZombie}if (conflictingTaskSet) {throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")}// 將該任務集的管理器加入到系統調度池中,由系統統一調配,該調度器屬于應用級別// 支持FIFO和FAIR兩種,默認FIFOschedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

3. 提供已排序的任務集管理器

在TaskSchedulerImpl.resourceOffers方法中進行資源分配時,會從根調度池rootPool獲取以及排序的任務管理器,排序算法由兩種調度策略提供。

// 獲取按照調度策略排序好的TaskSetManager// 從rootPool中取出排序了的TaskSetManager// 在創建完TaskScheduler StandaloneSchedulerBackend之后,會執行initialize()方法,其實會創建一個調度池// 這里就是所有提交的TaskSetManager,首先會放入這個調度池中,然后再執行task分配算法的時候,會從這個調度池中,取出排好隊的TaskSetManagerval sortedTaskSets = rootPool.getSortedTaskSetQueue

在FIFO調度策略中,由于根調度池rootPool直接包含了多個作業的任務管理器,在比較時,首先需要比較作業的優先級(根據作業編號判斷,作業編號越小優先級越高),如果是同一個作業,則會比較調度階段優先級(根據調度階段編號判斷,調度階段編號越小優先級越高)

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {//獲取作業優先級,實際上是作業編號val priority1 = s1.priorityval priority2 = s2.priorityvar res = math.signum(priority1 - priority2)//如果是同一個作業,再比較調度階段優先級if (res == 0) {val stageId1 = s1.stageIdval stageId2 = s2.stageIdres = math.signum(stageId1 - stageId2)}res < 0} }

在FAIR調度策略中包含了兩層調度,第一層調度池rootPool中包含了下級調度池Pool,第二層為下級調度池Pool包含多個TaskSetManager。具體配置參見$SPARK_HOME/conf/fairscheduler.xml文件,在該文件中包含多個下級調度池Pool配置項,其中minShare(最小任務數)和weight(任務的權重)用來設置第一級調度算法,而SchedulingMode參數是用來設置第二層調度算法。

在FAIR算法中,先獲取兩個調度器的饑餓程度,饑餓程度為正在運行的任務是否小于最小任務,如果是,則表示該調度處于饑餓程度。獲取饑餓程度后進行如下比較:

  • 如果某個調度處于解狀態,另一個處于非饑餓狀態,則先滿足處于饑餓狀態的調度
  • 如果兩個調度都處于饑餓狀態,則比較資源比,先們在這資源比較少的調度。
  • 如果兩個調度處于非饑餓狀態,則比較權重比,先滿足權重比少的調度。
  • 以上情況均相同的情況,根據調度的名稱進行排序。
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {//最小任務數val minShare1 = s1.minShareval minShare2 = s2.minShare//正在運行的任務數val runningTasks1 = s1.runningTasksval runningTasks2 = s2.runningTasks//饑餓程序,判斷標準為正在運行的任務數量是否小于最小任務數量val s1Needy = runningTasks1 < minShare1val s2Needy = runningTasks2 < minShare2//資源比,正在運行的任務數量/最小任務數量val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)//權重比,正在運行的任務數/任務的權重val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDoubleval taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble//判斷執行var compare = 0if (s1Needy && !s2Needy) {return true} else if (!s1Needy && s2Needy) {return false} else if (s1Needy && s2Needy) {compare = minShareRatio1.compareTo(minShareRatio2)} else {compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)}if (compare < 0) {true} else if (compare > 0) {false} else {s1.name < s2.name}} }

3. 任務之間

我們先了解數據本地型和延遲兩個概念。

3.1 數據本地型

數據的計算盡可能在數據所在的節點上進行,這樣可以減少數據在網絡上的傳輸,畢竟移動計算比移動數據代價來得小些。進一步看,數據如果在運行的節點的內存中,就能夠進一步減少磁盤I/O的傳輸。在Spark中數據本地型優先級從高到低為PROCESS_LOCAL>NODE_LOCAL>NO_PREF>PACK_LOCAL>ANY,即最好是任務運行的節點內存中存在數據,次好是同一個Node(同一個機器)上,再次是同機架,最后是同一個為。其中任務數據本地型通過以下情況來確定:

  • 如果任務處于作業的開始的調度階段,這些任務對于的RDD分區都有首選運行位置,該位置也是任務運行的首選位置,數據本地性為NODE_LOCAL.
  • 如果任務處于非作業開頭的調度階段,可以根據父調度階段運行的位置得到任務的首選位置,這種情況下,如果Executor處于活躍狀態,則數據本地性為PROCESS_LOCAL;如果Executor不處于活動狀態,但存在父調度階段運行結果,則數據本地性為NODE_LOCAL
  • 如果沒有首選位置,則數據本地型為NO_PREF

3.2 延遲執行

在任務分配運行節點時先判斷最佳運行節點是否空閑,如果該節點沒有足夠的資源運行該任務,在這種情況下任務會等待一定的時間;如果在等待時間內該節點釋放足夠的資源,則任務在該節點運行,如果還不足會找出次佳的節點運行。通過這樣的方式進行能夠讓任務運行在更高級別的數據本地性節點,從而減少磁盤I/O和網絡傳輸。一般來說PROCESS_LOCAL和NODE_LOCAL兩個數據本地性級別進行等待,系統默認延遲時間為3s。

Spakr任務分配的原則就是讓任務運行在數據本地性優先級別更高的節點上,甚至可以為此等待一定的時間。該任務分配有TaskSchedulerImpl.resourceOffers方法實現,在該方法中先對應于程序獲取的資源(Worker節點)進行混洗,以使任務能夠更加均衡的分散在集群中運行。然后對任務集對應的TaskManager根據設置地調度算法進行排序,最后對TaskSetManager中的任務按照數據本地性分配任務運行節點,在分配時先根據任務集的本地性從優先級高到低進行分配任務,在分配的過程中動態地判斷集群中節點運行的請,通過延遲執行等待數據本地性更高的節點運行。

《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀

總結

以上是生活随笔為你收集整理的Spark详解(六):Spark集群资源调度算法原理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。