Task执行流程
1、源碼走讀
(1)當(dāng)Driver中的SchedulerBackend(Standalone模式為CoarseGrainedSchedulerBackend)給ExecutorBackend(Standalone模式為CoarseGrainedExecutorBackend)發(fā)送LaunchTask之后,CoarseGrainedExecutorBackend在收到LaunchTask消息后,Executor會(huì)通過TaskRunner在ThreadPool來運(yùn)行具體的Task,TaskRunner內(nèi)部會(huì)做一些準(zhǔn)備工作:例如反序列化Task,然后通過網(wǎng)絡(luò)來獲取需要的文件,jar等。
CoarseGrainedExecutorBackend在收到LaunchTask消息后首先看下Executor存不存在,不存在則系統(tǒng)直接退出,然后反序列化我們的任務(wù),反序列的是TaskDescription,反序列化之后就launchTask。
注意:在執(zhí)行具體Task的業(yè)務(wù)邏輯前會(huì)進(jìn)行4次反序列化:①TaskDescription的反序列化②Task的反序列化③RDD的反序列化④反序列化task的依賴
(2)進(jìn)入到executor的launchTask方法中,在這內(nèi)部有個(gè)TaskRunner
TaskRunner本身是個(gè)Runnable接口,在這個(gè)內(nèi)部要運(yùn)行核心肯定是run方法,在run方法里面定義一些屬性:taskMemoryManager內(nèi)存管理,deserializeStartTime反序列開始時(shí)間等。因?yàn)橐虞d具體的類所以要ClassLoaderThread.currentThread.?setContextClassLoader(replClassLoader)。調(diào)用execBackend.statusUpdate(taskId, ?TaskState.RUNNING,??EMPTY_BYTE_BUFFER),他是ExecutorBackend的方法,ExecutorBackend通過statusUpdate方法其實(shí)是給driver發(fā)信息匯報(bào)自己狀態(tài)的。有了(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)這些狀態(tài),告訴driver說現(xiàn)在任務(wù)開始運(yùn)行了。在然后這個(gè)反序列化得到的tasktuple包括任務(wù)運(yùn)行的文件,jars,Bytes等等都會(huì)被反序列過來
進(jìn)入到updateDependencies方法中。這里下載我們task運(yùn)行需要的所有依賴文件,獲得hadoop的Configuration。這里下載文件用了synchronized關(guān)鍵字,因?yàn)槲覀兠恳粋€(gè)taskrunner運(yùn)行在線程中,這個(gè)方法會(huì)被多個(gè)線程調(diào)用,方法在全局中,所以需要加鎖。下載文件,所有的依賴都有之后就反序列化任務(wù)即task本身。
回到run方法中,接下來判斷任務(wù)是否被killed
判斷任務(wù)是否被killed
再然后,調(diào)用反序列化后的task.run獲得執(zhí)行結(jié)果
task.run調(diào)用的時(shí)候轉(zhuǎn)過來會(huì)調(diào)用runTask方法,runTask是task里面的抽象方法,task主要有兩種類型:ShuffleMapTask和ResultTask。
①下面的ShuffleMapTask的runTask,實(shí)際內(nèi)部執(zhí)行的時(shí)候會(huì)通過調(diào)用rdd的iterator針對(duì)我們Partition進(jìn)行計(jì)算。我們的ShuffleMapTask在計(jì)算具體的Partition之后會(huì)通過ShuffleManager獲得ShuffleWrite把當(dāng)前task計(jì)算的結(jié)果根據(jù)具體的ShuffleManager的實(shí)現(xiàn)來寫入到具體的文件,操作完成后會(huì)把MapStatus發(fā)送給DAGScheduler。把MapStatus向DAGScheduler里面的MapOutputTracker匯報(bào)。
在rdd的iterator方法中,ShuffleMapTask先看一下cache memory中有沒有曾經(jīng)的數(shù)據(jù)
最終計(jì)算的時(shí)候就是調(diào)用這個(gè)rdd的compute,這里有個(gè)TaskContext類型的參數(shù)這里面維持了很多上下文信息
看個(gè)具體的RDD的compute實(shí)現(xiàn),任何的RDD的compute返回的都是Iterator
這個(gè)f是個(gè)函數(shù)一般是自己寫的,對(duì)他分配處理的業(yè)務(wù)邏輯,因?yàn)橛行㏑DD是系統(tǒng)自動(dòng)生成的所以可能是系統(tǒng)調(diào)用的邏輯。這個(gè)就是自己寫的業(yè)務(wù)邏輯了,只不過一個(gè)Stage從后往前推他會(huì)把所有的RDD合并最后變成一個(gè),函數(shù)鏈條也會(huì)展開成一個(gè)很大的函數(shù)
回到runtask中,這個(gè)writer要看不同的Shuffle
②ResultTask是根據(jù)前面Stage的執(zhí)行結(jié)果進(jìn)行Shuffle產(chǎn)生整個(gè)job最后的結(jié)果。這個(gè)是ResultTask,在反序列化RDD的時(shí)候直接去調(diào)func。
(3)回到run方法中,這個(gè)序列化的value是前面task.run獲得的執(zhí)行結(jié)果,之所以記錄這么多時(shí)間是為了在web控制臺(tái)可以看到這些信息
(4)在Executor的run方法的task執(zhí)行完之后會(huì)調(diào)用CoarseGrainedExecutorBackend的statusUpdate。其實(shí)就是給我們的driver發(fā)一個(gè)信息。
(5)CoarseGrainedSchedulerBackend收到statusUpdate消息后,它會(huì)調(diào)用Scheduler.statusUpdate,會(huì)釋放相關(guān)的資源,如果沒有什么問題的話空閑資源中就加上曾經(jīng)想消耗的東西,再次進(jìn)行資源調(diào)度。
(6)回到launchtask方法中,把他交給runningTasks這樣一個(gè)數(shù)據(jù)結(jié)構(gòu)中,放入taskid以及業(yè)務(wù)邏輯,然后交給ThreadPool。
2、總結(jié)
(1)CoarseGrainedExecutorBackend在收到 CoarseGrainedSchedulerBackend發(fā)送的LaunchTask消息后反序列化TaskDescription
(2)通過executor的launchTask方法中執(zhí)行Task,在launchTask內(nèi)部會(huì)創(chuàng)建TaskRunner,在TaskRunner內(nèi)部會(huì)做一些準(zhǔn)備工作,如反序列化task,task依賴,獲取jar等
(3)TaskRunner在ThreadPool具體運(yùn)行Task;
(4)TaskRunner中會(huì)調(diào)用反序列化的Task.run方法執(zhí)行并獲得執(zhí)行結(jié)果。在調(diào)用run方法的時(shí)候會(huì)調(diào)用Task的抽象方法runTask。在runTask內(nèi)部會(huì)調(diào)用RDD的iterator()方法,在處理的內(nèi)部會(huì)迭代Partition的元素并給我們自定義函數(shù)進(jìn)行處理。對(duì)于ShuffleMapTask,首先要對(duì)RDD以及其依賴關(guān)系進(jìn)行反序列化,最終會(huì)調(diào)用RDD的compute方法
(5)把執(zhí)行結(jié)果序列化,并根據(jù)大小判斷不同的姐夫哦傳回給Driver的方式
(6)CoarseGrainedExecutorBackend給DriverEndPoint發(fā)送StatusUpdate傳輸執(zhí)行結(jié)果,DriverEndPoint把執(zhí)行結(jié)果傳給TaskSchedulerImpl處理然后交給TaskResultGetter內(nèi)部通過線程去分別處理Task執(zhí)行成功和失敗的不同情況,然后返回DAGScheduler任務(wù)處理結(jié)束的狀況。
?
總結(jié)
- 上一篇: 静态路由实验
- 下一篇: 机器学习算法--ALS