Spark 任务调度机制详解
Spark 任務(wù)調(diào)度機制
在工廠環(huán)境下,Spark 集群的部署方式一般為 YARN-Cluster 模式,之后的內(nèi)核分析內(nèi)容中我們默認(rèn)集群的部署方式為 YARN-Cluster 模式。
4.1 Spark 任務(wù)提交流程
在上一章中我們講解了 Spark YARN-Cluster 模式下的任務(wù)提交流程,如下圖所示:
下面的時序圖清晰地說明了一個 Spark 應(yīng)用程序從提交到運行的完整流程:
提交一個 Spark 應(yīng)用程序,首先通過 Client 向 ResourceManager 請求啟動一個Application,同時檢查是否有足夠的資源滿足 Application 的需求,如果資源條件滿足,則準(zhǔn)備 ApplicationMaster 的啟動上下文,交給 ResourceManager,并循環(huán)監(jiān)控Application 狀態(tài)。
當(dāng)提交的資源隊列中有資源時,ResourceManager 會在某個 NodeManager 上啟動 ApplicationMaster 進程,ApplicationMaster 會單獨啟動 Driver 后臺線程,當(dāng) Driver啟動后,ApplicationMaster 會通過本地的 RPC 連接 Driver,并開始向 ResourceManager申請 Container 資源運行 Executor 進程(一個 Executor 對應(yīng)與一個 Container),當(dāng)ResourceManager 返回Container 資源,ApplicationMaster 則在對應(yīng)的 Container 上啟動 Executor。
Driver 線程主要是初始化 SparkContext 對象,準(zhǔn)備運行所需的上下文,然后一方面保持與 ApplicationMaster 的 RPC 連接,通過 ApplicationMaster 申請資源,另一方面根據(jù)用戶業(yè)務(wù)邏輯開始調(diào)度任務(wù),將任務(wù)下發(fā)到已有的空閑 Executor 上。
當(dāng) ResourceManager 向 ApplicationMaster 返 回 Container 資源時,ApplicationMaster 就嘗試在對應(yīng)的 Container 上啟動 Executor 進程,Executor 進程起來后,會向 Driver 反向注冊,注冊成功后保持與 Driver 的心跳,同時等待 Driver分發(fā)任務(wù),當(dāng)分發(fā)的任務(wù)執(zhí)行完畢后,將任務(wù)狀態(tài)上報給 Driver。
從上述時序圖可知,Client 只負(fù)責(zé)提交 Application 并監(jiān)控 Application 的狀態(tài)。對于 Spark 的任務(wù)調(diào)度主要是集中在兩個方面: 資源申請和任務(wù)分發(fā),其主要是通過 ApplicationMaster、Driver 以及 Executor 之間來完成。
4.2 Spark 任務(wù)調(diào)度概述
當(dāng) Driver 起來后,Driver 則會根據(jù)用戶程序邏輯準(zhǔn)備任務(wù),并根據(jù) Executor 資源情況逐步分發(fā)任務(wù)。在詳細(xì)闡述任務(wù)調(diào)度前,首先說明下 Spark 里的幾個概念。
一個 Spark 應(yīng)用程序包括 Job、Stage 以及 Task 三個概念:
-
Job 是以 Action 方法為界,遇到一個 Action 方法則觸發(fā)一個 Job;
-
Stage 是 Job 的子集,以 RDD 寬依賴(即 Shuffle)為界,遇到 Shuffle 做一次劃分;
-
Task 是 Stage 的子集,以并行度(分區(qū)數(shù))來衡量,分區(qū)數(shù)是多少,則有多少個 task。
Spark 的任務(wù)調(diào)度總體來說分兩路進行,一路是 Stage 級的調(diào)度,一路是 Task級的調(diào)度,總體調(diào)度流程如下圖所示:
Spark RDD 通過其 Transactions 操作,形成了 RDD 血緣關(guān)系圖,即 DAG,最后通過 Action 的調(diào)用,觸發(fā) Job 并調(diào)度執(zhí)行。DAGScheduler 負(fù)責(zé) Stage 級的調(diào)度,主要是將 DAG 切分成若干 Stages,并將每個 Stage 打包成 TaskSet 交給 TaskScheduler調(diào)度。TaskScheduler 負(fù)責(zé) Task 級的調(diào)度,將 DAGScheduler 給過來的 TaskSet 按照指定的調(diào)度策略分發(fā)到 Executor 上執(zhí)行,調(diào)度過程中 SchedulerBackend 負(fù)責(zé)提供可用資源,其中 SchedulerBackend 有多種實現(xiàn),分別對接不同的資源管理系統(tǒng)。有了上述感性的認(rèn)識后,下面這張圖描述了 Spark-On-Yarn 模式下在任務(wù)調(diào)度期間,ApplicationMaster、Driver 以及 Executor 內(nèi)部模塊的交互過程:
Driver 初始化 SparkContext 過 程 中 , 會 分 別 初 始 化 DAGScheduler 、TaskScheduler、SchedulerBackend 以及 HeartbeatReceiver,并啟動 SchedulerBackend以及HeartbeatReceiver。SchedulerBackend 通過 ApplicationMaster 申請資源,并不斷從 TaskScheduler 中拿到合適的 Task 分發(fā)到 Executor 執(zhí)行。HeartbeatReceiver 負(fù)責(zé)接收 Executor 的心跳信息,監(jiān)控 Executor 的存活狀況,并通知到 TaskScheduler。
4.3 Spark Stage 級調(diào)度
Spark 的任務(wù)調(diào)度是從 DAG 切割開始,主要是由 DAGScheduler 來完成。當(dāng)遇到一個 Action 操作后就會觸發(fā)一個 Job 的計算,并交給 DAGScheduler 來提交,下圖是涉及到 Job 提交的相關(guān)方法調(diào)用流程圖。
Job 由 最 終 的 RDD 和 Action 方 法 封 裝 而 成 , SparkContext 將 Job 交 給DAGScheduler 提交,它會根據(jù) RDD 的血緣關(guān)系構(gòu)成的 DAG 進行切分,將一個 Job劃分為若干 Stages,具體劃分策略是,由最終的 RDD 不斷通過依賴回溯判斷父依賴是否是寬依賴,即以 Shuffle 為界,劃分 Stage,窄依賴的 RDD 之間被劃分到同一個Stage 中,可以進行 pipeline 式的計算,如上圖紫色流程部分。劃分的 Stages 分兩類,一類叫做 ResultStage,為 DAG 最下游的 Stage,由 Action 方法決定,另一類叫做ShuffleMapStage,為下游 Stage 準(zhǔn)備數(shù)據(jù),下面看一個簡單的例子 WordCount。
Job 由 saveAsTextFile 觸發(fā),該 Job 由 RDD-3 和 saveAsTextFile 方法組成,根據(jù)RDD 之間的依賴關(guān)系從 RDD-3 開始回溯搜索,直到?jīng)]有依賴的 RDD-0,在回溯搜索過程中,RDD-3 依賴 RDD-2,并且是寬依賴,所以在 RDD-2 和 RDD-3 之間劃分Stage,RDD-3 被劃到最后一個 Stage,即 ResultStage 中,RDD-2 依賴 RDD-1,RDD-1依賴 RDD-0,這些依賴都是窄依賴,所以將 RDD-0、RDD-1 和 RDD-2 劃分到同一個 Stage,即 ShuffleMapStage 中,實際執(zhí)行的時候,數(shù)據(jù)記錄會一氣呵成地執(zhí)行RDD-0 到 RDD-2 的轉(zhuǎn)化。不難看出,其本質(zhì)上是一個深度優(yōu)先搜索算法。
一個 Stage 是否被提交,需要判斷它的父 Stage 是否執(zhí)行,只有在父 Stage 執(zhí)行完畢才能提交當(dāng)前 Stage,如果一個 Stage 沒有父 Stage,那么從該 Stage 開始提交。Stage 提交時會將 Task 信息(分區(qū)信息以及方法等)序列化并被打包成 TaskSet 交給TaskScheduler,一個 Partition 對應(yīng)一個 Task,另一方面 TaskScheduler 會監(jiān)控 Stage的運行狀態(tài),只有 Executor 丟失或者 Task 由于 Fetch 失敗才需要重新提交失敗的Stage 以調(diào)度運行失敗的任務(wù),其他類型的 Task 失敗會在 TaskScheduler 的調(diào)度過程中重試。
相對來說 DAGScheduler 做的事情較為簡單,僅僅是在 Stage 層面上劃分 DAG,提交 Stage 并監(jiān)控相關(guān)狀態(tài)信息。TaskScheduler 則相對較為復(fù)雜,下面詳細(xì)闡述其細(xì)節(jié)。
4.4 Spark Task 級調(diào)度
Spark Task 的調(diào)度是由 TaskScheduler 來完成,由前文可知,DAGScheduler 將Stage 打 包 到 TaskSet 交 給 TaskScheduler, TaskScheduler 會 將 TaskSet 封裝為TaskSetManager 加入到調(diào)度隊列中,TaskSetManager 結(jié)構(gòu)如下圖所示。
TaskSetManager 負(fù)責(zé)監(jiān)控管理同一個 Stage 中的 Tasks,TaskScheduler 就是以TaskSetManager 為單元來調(diào)度任務(wù)。前面也提到,TaskScheduler 初始化后會啟動 SchedulerBackend,它負(fù)責(zé)跟外界打交道,接收 Executor 的注冊信息,并維護 Executor 的狀態(tài),所以說 SchedulerBackend是管“糧食”的,同時它在啟動后會定期地去“詢問”TaskScheduler 有沒有任務(wù)要運行,也就是說, 它會定期地 “ 問 ”TaskScheduler“ 我有這么余量,你 要不要啊 ” ,TaskScheduler 在 SchedulerBackend“問”它的時候,會從調(diào)度隊列中按照指定的調(diào)度策略選擇 TaskSetManager 去調(diào)度運行,大致方法調(diào)用流程如下圖所示:
圖 3-7 中,將 TaskSetManager 加入 rootPool 調(diào)度池中之后,調(diào)用 SchedulerBackend的 riviveOffers 方法給 driverEndpoint 發(fā)送 ReviveOffer 消息;driverEndpoint 收到ReviveOffer 消息后調(diào)用 makeOffers 方法,過濾出活躍狀態(tài)的 Executor(這些 Executor都是任務(wù)啟動時反向注冊到 Driver 的 Executor),然后將 Executor 封裝成 WorkerOffer對 象 ; 準(zhǔn) 備 好 計 算 資 源 (WorkerOffer ) 后 , taskScheduler 基 于 這 些 資 源 調(diào) 用resourceOffer 在 Executor 上分配 task。
4.4.1 調(diào)度策略
前 面 講 到 , TaskScheduler 會 先 把 DAGScheduler 給 過 來 的 TaskSet 封裝成TaskSetManager 扔到任務(wù)隊列里,然后再從任務(wù)隊列里按照一定的規(guī)則把它們?nèi)〕鰜碓?SchedulerBackend 給過來的 Executor 上運行。這個調(diào)度過程實際上還是比較粗粒度的,是面向 TaskSetManager 的。
TaskScheduler 是以樹的方式來管理任務(wù)隊列,樹中的節(jié)點類型為 Schdulable,葉子節(jié)點為 TaskSetManager,非葉子節(jié)點為 Pool,下圖是它們之間的繼承關(guān)系。
TaskScheduler 支持兩種調(diào)度策略,一種是 FIFO,也是默認(rèn)的調(diào)度策略,另一種是 FAIR。在 TaskScheduler 初始化過程中會實例化 rootPool,表示樹的根節(jié)點,是Pool 類型。
1. FIFO 調(diào)度策略
如果是采用 FIFO 調(diào)度策略,則直接簡單地將 TaskSetManager 按照先來先到的方式入隊,出隊時直接拿出最先進隊的 TaskSetManager,其樹結(jié)構(gòu)如下圖所示,TaskSetManager 保存在一個 FIFO 隊列中。
2. FAIR 調(diào)度策略
FAIR 調(diào)度策略的樹結(jié)構(gòu)如下圖所示:
FAIR 模式中有一個 rootPool 和多個子 Pool,各個子 Pool 中存儲著所有待分配的 TaskSetMagager。
在 FAIR 模 式 中 , 需 要 先 對 子 Pool 進 行 排 序 , 再 對 子 Pool 里 面 的TaskSetMagager 進行排序,因為 Pool 和 TaskSetMagager 都繼承了 Schedulable 特質(zhì),因此使用相同的排序算法。
排序過程的比較是基于 Fair-share 來比較的,每個要排序的對象包含三個屬性:runningTasks 值(正在運行的 Task 數(shù))、minShare 值、weight 值,比較時會綜合考量 runningTasks 值,minShare 值以及 weight 值。
注意,minShare、weight 的值均在公平調(diào)度配置文件 fairscheduler.xml 中被指定,調(diào)度池在構(gòu)建階段會讀取此文件的相關(guān)配置。
1) 如果 A 對象的 runningTasks 大于它的 minShare,B 對象的 runningTasks 小于它的 minShare,那么 B 排在 A 前面;(runningTasks 比 minShare 小的先執(zhí)行)
2) 如果 A、B 對象的 runningTasks 都小于它們的 minShare,那么就比較runningTasks 與 minShare 的比值(minShare 使用率),誰小誰排前面;(minShare使用率低的先執(zhí)行)
3) 如果 A、B 對象的 runningTasks 都大于它們的 minShare,那么就比較runningTasks 與 weight 的比值(權(quán)重使用率),誰小誰排前面。(權(quán)重使用率低的先執(zhí)行)
4) 如果上述比較均相等,則比較名字。整體上來說就是通過 minShare 和 weight 這兩個參數(shù)控制比較過程,可以做到讓 minShare 使用率和權(quán)重使用率少(實際運行 task 比例較少)的先運行。
FAIR 模式排序完成后,所有的 TaskSetManager 被放入一個 ArrayBuffer 里,之后依次被取出并發(fā)送給 Executor 執(zhí)行。
從調(diào)度隊列中拿到 TaskSetManager 后,由于 TaskSetManager 封裝了一個 Stage的所有 Task,并負(fù)責(zé)管理調(diào)度這些 Task,那么接下來的工作就是 TaskSetManager按照一定的規(guī)則一個個取出 Task 給 TaskScheduler , TaskScheduler 再交給SchedulerBackend 去發(fā)到 Executor 上執(zhí)行。
4.4.2 本地化調(diào)度
DAGScheduler 切割 Job,劃分 Stage, 通過調(diào)用 submitStage 來提交一個 Stage對應(yīng)的 tasks,submitStage 會調(diào)用 submitMissingTasks,submitMissingTasks 確定每個需要計算的 task 的 preferredLocations,通過調(diào)用 getPreferrdeLocations()得到partition 的優(yōu)先位置,由于一個 partition 對應(yīng)一個 task,此 partition 的優(yōu)先位置就是 task 的優(yōu)先位置,對于要提交到TaskScheduler 的 TaskSet 中的每一個 task,該 task優(yōu)先位置與其對應(yīng)的 partition 對應(yīng)的優(yōu)先位置一致。
從調(diào)度隊列中拿到 TaskSetManager 后,那么接下來的工作就是 TaskSetManager 按照一定的規(guī)則一個個取出 task 給 TaskScheduler,TaskScheduler 再交給 SchedulerBackend 去發(fā)到Executor 上執(zhí)行。前面也提到,TaskSetManager 封裝了一個 Stage 的所有 task,并負(fù)責(zé)管理調(diào)度這些 task。
根據(jù)每個 task 的優(yōu)先位置,確定 task 的 Locality 級別,Locality 一共有五種,優(yōu)先級由高到低順序:
在調(diào)度執(zhí)行時,Spark 調(diào)度總是會盡量讓每個 task 以最高的本地性級別來啟動,當(dāng)一個 task 以 X 本地性級別啟動,但是該本地性級別對應(yīng)的所有節(jié)點都沒有空閑資源而啟動失敗,此時并不會馬上降低本地性級別啟動而是在某個時間長度內(nèi)再次以X 本地性級別來啟動該 task,若超過限時時間則降級啟動,去嘗試下一個本地性級別,依次類推。
可以通過調(diào)大每個類別的最大容忍延遲時間,在等待階段對應(yīng)的 Executor 可能就會有相應(yīng)的資源去執(zhí)行此 task,這就在在一定程度上提到了運行性能。
4.4.3 失敗重試與黑名單機制
除了選擇合適的 Task 調(diào)度運行外,還需要監(jiān)控 Task 的執(zhí)行狀態(tài),前面也提到,與外部打交道的是 SchedulerBackend,Task 被提交到 Executor 啟動執(zhí)行后,Executor會將執(zhí)行狀態(tài)上報給 SchedulerBackend,SchedulerBackend 則告訴 TaskScheduler,TaskScheduler 找到該 Task 對應(yīng)的 TaskSetManager,并通知到該 TaskSetManager,這樣 TaskSetManager 就知道 Task 的失敗與成功狀態(tài),對于失敗的 Task,會記錄它失敗的次數(shù),如果失敗次數(shù)還沒有超過最大重試次數(shù),那么就把它放回待調(diào)度的 Task池子中,否則整個 Application 失敗。
在記錄 Task 失敗次數(shù)過程中,會記錄它上一次失敗所在的 Executor Id 和 Host,這樣下次再調(diào)度這個 Task 時,會使用黑名單機制,避免它被調(diào)度到上一次失敗的節(jié)點上,起到一定的容錯作用。黑名單記錄 Task 上一次失敗所在的 Executor Id 和 Host,以及其對應(yīng)的“拉黑”時間,“拉黑”時間是指這段時間內(nèi)不要再往這個節(jié)點上調(diào)度這個 Task 了。
參考鏈接:https://www.cnblogs.com/LXL616/p/11165826.html
總結(jié)
以上是生活随笔為你收集整理的Spark 任务调度机制详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: VMware vCenter Conve
- 下一篇: 【收藏】mydockfinder下载地址