日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark Worker源码

發布時間:2024/7/5 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark Worker源码 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

?

1、概述

2、LaunchDriver

3、LaunchDriver

4、總結


1、概述

worker肯定是實現RPC通信的,否則別人沒法給你發消息。他繼承的是ThreadSafeRpcEndpoint,ThreadSafeRpcEndpoint是線程安全的,意味著處理一條消息完成后再處理下一個消息。換句話說,在處理下一條消息時,可以看到對ThreadSafeRpcEndpoint的內部字段的更改,并且ThreadSafeRpcEndpoint中的字段不需要是volatile或等效的。但是,不能保證同一個線程將為不同的消息執行相同的ThreadSafeRpcEndpoint。即順序處理消息,不能同時并發處理。

Worker本身在實際運行的時候是作為一個進程,他會接收master的指令,有幾個非常重要的指令,如LaunchExecutor,LaunchDriver等。這兩個指令是Schedule進行資源調度(Master的schedule方法中)的時候發送的。

2、LaunchDriver

Worker在收到Driver發送的LaunchDriver類型的信息后。

(1)首先首先打印一個日志,master傳進來的時候肯定會告訴你driverId的,然后new 一個DriverRunner,然后把這個實例通過driverId交給一個HashMap數據結構val drivers = new HashMap[String, DriverRunner]。key就是driverId,value就是DriverRunner。這個數據結構非常重要,因為我們在worker上可能啟動很多Executor,需要根據ExecutorId管理具體的DriverRunner,DriverRunner內部通過線程的方式啟動了另外一個進程,所以可以簡單理解DriverRunner是Driver所在進程中本身的process,這個就是一個代理模式。

(2)管理Driver的執行,包括在Driver失敗的時候自動重啟,要是在Standalone的模式下。失敗是否重試是看DriverDescription中的supervise是否為true,如果指定了這個參數為true,driver在失敗的時候worker會負責啟動這個Driver

(3)構建好DriverRunner實例,并且已經將其加入到drivers中后,調用DriverRunner的start方法。在start方法中通過一個線程啟動Driver,并管理Driver,線程運行的時候run方法會被執行。

在run方法中有個prepareAndRunDriver用于準備Driver需要的jar并運行Driver

(4)在prepareAndRunDriver方法中,會創建工作目錄,下載jar包到本地,并封裝好Driver的啟動Command,通過buildProcessBuilder來啟動Driver。

driverDesc.command這個指定他啟動的時候應該運行什么類,就是類的入口。driverDesc是Master遠程發送過來的,為CoarseGrainedExecutorBackend

進入到runDriver方法中,有個initialize方法,里面重定向輸出和error,將stout和stderr重定向到baseDir下,這樣就可以通過log看一下曾經執行的情況。

然后執行runCommandWithRetry,在參數中會構造ProcessBuilderLike。ProcessBuilderLike在apply的時候就new ProcessBuilderLike,在這里面processBuilder.start()

在runCommandWithRetry方法中,會一直循環,孵化出一個進程,有個方法這個是阻塞的,言外之意就是當他回復過來的時候估計就有問題了,那就判斷一下

(5)在prepareAndRunDriver方法中啟動Driver之后,如果運行出狀況了,出狀況后會給自己發一個消息

(6)在這里不同的情況打印log日志,最關鍵的是sendToMaster(driverStateChanged)發送給master。發送的類型是DriverStateChanged

(7)到Master方法中。master收到這個消息后就把他remove掉,是從自己的內存數據結構中remove,同時把這個driver曾經占用的數據,還有持久化的都remove,然后再次發消息給worker確認下,因為發生了資源的變動再次進行schedule

(8)回到Worker中,start之后記錄耗了多少內存CPU

3、LaunchDriver

Worker在收到Driver發送的LaunchExecutor類型的信息后首先new 一個ExecutorRunner,然后start。過程與啟動Driver類似,就不細說。

在ExecutorRunner的start方法中會通過一個線程啟動Executor,并管理Executor,線程運行的時候run方法會被執行。

fetchAndRunExecutor方法中類似driver中創建該executor工作目錄,下載運行的jar,開啟執行application的進程executor。并向worker發送ExecutorStateChanged的事件通知,

worker先向自己發送ExecutorStateChanged的消息

在start方法之后,記錄耗了多少內存CPU,然后向master發送接收到的ExecutorStateChanged的事件通知

?

4、總結

driver進程就是executor進程;ExecutorBackend是進程名稱,standalone模式下是CoarseGrainedExecutorBackend。在CoarseGrainedExecutorBackend中有我們的Executor對象本身,Executor和ExecutorBackend是一對一的關系,就是一個ExecutorBackend進程里面有一個Executor,在Executor內部是通過線程池并發處理的方式來處理Spark提交過來的Task。

注意:Executor啟動之后要向我們的driver注冊。

總結

以上是生活随笔為你收集整理的Spark Worker源码的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。