Spark Worker源码
目錄
?
1、概述
2、LaunchDriver
3、LaunchDriver
4、總結
1、概述
worker肯定是實現RPC通信的,否則別人沒法給你發消息。他繼承的是ThreadSafeRpcEndpoint,ThreadSafeRpcEndpoint是線程安全的,意味著處理一條消息完成后再處理下一個消息。換句話說,在處理下一條消息時,可以看到對ThreadSafeRpcEndpoint的內部字段的更改,并且ThreadSafeRpcEndpoint中的字段不需要是volatile或等效的。但是,不能保證同一個線程將為不同的消息執行相同的ThreadSafeRpcEndpoint。即順序處理消息,不能同時并發處理。
Worker本身在實際運行的時候是作為一個進程,他會接收master的指令,有幾個非常重要的指令,如LaunchExecutor,LaunchDriver等。這兩個指令是Schedule進行資源調度(Master的schedule方法中)的時候發送的。
2、LaunchDriver
Worker在收到Driver發送的LaunchDriver類型的信息后。
(1)首先首先打印一個日志,master傳進來的時候肯定會告訴你driverId的,然后new 一個DriverRunner,然后把這個實例通過driverId交給一個HashMap數據結構val drivers = new HashMap[String, DriverRunner]。key就是driverId,value就是DriverRunner。這個數據結構非常重要,因為我們在worker上可能啟動很多Executor,需要根據ExecutorId管理具體的DriverRunner,DriverRunner內部通過線程的方式啟動了另外一個進程,所以可以簡單理解DriverRunner是Driver所在進程中本身的process,這個就是一個代理模式。
(2)管理Driver的執行,包括在Driver失敗的時候自動重啟,要是在Standalone的模式下。失敗是否重試是看DriverDescription中的supervise是否為true,如果指定了這個參數為true,driver在失敗的時候worker會負責啟動這個Driver
(3)構建好DriverRunner實例,并且已經將其加入到drivers中后,調用DriverRunner的start方法。在start方法中通過一個線程啟動Driver,并管理Driver,線程運行的時候run方法會被執行。
在run方法中有個prepareAndRunDriver用于準備Driver需要的jar并運行Driver
(4)在prepareAndRunDriver方法中,會創建工作目錄,下載jar包到本地,并封裝好Driver的啟動Command,通過buildProcessBuilder來啟動Driver。
driverDesc.command這個指定他啟動的時候應該運行什么類,就是類的入口。driverDesc是Master遠程發送過來的,為CoarseGrainedExecutorBackend
進入到runDriver方法中,有個initialize方法,里面重定向輸出和error,將stout和stderr重定向到baseDir下,這樣就可以通過log看一下曾經執行的情況。
然后執行runCommandWithRetry,在參數中會構造ProcessBuilderLike。ProcessBuilderLike在apply的時候就new ProcessBuilderLike,在這里面processBuilder.start()
在runCommandWithRetry方法中,會一直循環,孵化出一個進程,有個方法這個是阻塞的,言外之意就是當他回復過來的時候估計就有問題了,那就判斷一下
(5)在prepareAndRunDriver方法中啟動Driver之后,如果運行出狀況了,出狀況后會給自己發一個消息
(6)在這里不同的情況打印log日志,最關鍵的是sendToMaster(driverStateChanged)發送給master。發送的類型是DriverStateChanged
(7)到Master方法中。master收到這個消息后就把他remove掉,是從自己的內存數據結構中remove,同時把這個driver曾經占用的數據,還有持久化的都remove,然后再次發消息給worker確認下,因為發生了資源的變動再次進行schedule
(8)回到Worker中,start之后記錄耗了多少內存CPU
3、LaunchDriver
Worker在收到Driver發送的LaunchExecutor類型的信息后首先new 一個ExecutorRunner,然后start。過程與啟動Driver類似,就不細說。
在ExecutorRunner的start方法中會通過一個線程啟動Executor,并管理Executor,線程運行的時候run方法會被執行。
fetchAndRunExecutor方法中類似driver中創建該executor工作目錄,下載運行的jar,開啟執行application的進程executor。并向worker發送ExecutorStateChanged的事件通知,
worker先向自己發送ExecutorStateChanged的消息
在start方法之后,記錄耗了多少內存CPU,然后向master發送接收到的ExecutorStateChanged的事件通知
?
4、總結
driver進程就是executor進程;ExecutorBackend是進程名稱,standalone模式下是CoarseGrainedExecutorBackend。在CoarseGrainedExecutorBackend中有我們的Executor對象本身,Executor和ExecutorBackend是一對一的關系,就是一個ExecutorBackend進程里面有一個Executor,在Executor內部是通過線程池并發處理的方式來處理Spark提交過來的Task。
注意:Executor啟動之后要向我們的driver注冊。
總結
以上是生活随笔為你收集整理的Spark Worker源码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java的常用引用类、数组、String
- 下一篇: Stage划分和Task最佳位置