Spark Master的注册机制与状态管理
目錄
?
1、Master接收注冊的主要對象
2、Master接收Worker的注冊
3、Master接收Driver的注冊
4、Master處理Driver狀態變化
5、Master接收Application的注冊
6、Master處理Executor狀態變化
1、Master接收注冊的主要對象
Master主要接受注冊的對象是:Application,Driver,Worker。
注意:Executor不是注冊給master而是注冊給Driver中的SchedulerBackend
2、Master接收Worker的注冊
Worker啟動后主動向Master注冊,所以在生產環境下不需要重啟集群就能夠使用新的Worker。
(1)worker是一個消息循環體,因為繼承了ThreadSafeRPCEndpoint。他啟動的時候有一大堆我們找到onStart方法。
(2)這里面調用了registerWithMaster,這里面用了tryRegisterAllMaster方法向所有的master提交。
(3)在具體注冊向所有的master提交的時候,是用線程池的中一個線程來提交。然后就獲得了masterEndpoint,獲得了masterEndpoint之后,將其作為參數傳入registerWithMaster方法。masterRpcAddresses是所有的master的地址,這里的map是進行對master進行逐個注冊。這里發送給所有的master而不時發送給active是因為:如果不這樣做的話,worker啟動的時候就要搞明白誰是active的master,這樣他的負擔就加重了,就不符合強內聚,弱耦合的架構設計。
(4)在registerWithMaster的時候傳進去的消息體就是RegisterWorker,它是個case class
這個worker是自己的引用,master可以通過這個ref來通信
(5)worker發出的消息經過通信之后,master會接收到注冊請求,worker發送的是RegisterWorker,master的消息接收器,接收到了這個消息,如下
這個代碼邏輯是:收到了worker啟動注冊的信息之后,首先判斷一下自己的狀態,如果是standby那肯定沒戲。然后判斷idToWorker這個HashMap中有沒有這個worker,他的key就代表了worker本身字符串級別的描述,WorkerInfo和注冊的信息基本是一致的。所以WorkerInfo就包含了對worker所又了解的內容。也就是說他構建了一個內存數據結構,包含了所有已經注冊的信息,如果已經注冊過的話就不會注冊。
如果master不是standby,worker也沒注冊過的話我們會構建workerInfo,這里是接收通過模式匹配匹配到的具體的worker發過來的信息并報存。在這基礎之上,把workerInfo作為參數傳入到registerWorker,執行具體的注冊的過程。
注冊其實就是登記一下,保留信息。這里會過濾掉workState為dead的work,也就是說如果worker已經dead掉了,將來某段時間注冊的話,是不會接受的(因為曾經已經dead掉了,現在突然活了,認為是不可以思議的事情)。
也就是首先會判斷worker的狀態是否是DEAD的狀態則直接過濾掉,對于UNKOWN狀態的內容會調用removeWorker清理(包括清理該worker下的executors和drivers),因為在具體的機器上,這個節點向executor和driver被當前的worker,也就是要remove的worker生成和管理的,worker掛了沒人管理了。如果都沒問題就加入到workers,idToWorker,addressWorker這3個內存數據結構中
(6)回到receiveAndReply中注冊完畢之后,persistenceEngine持久化引擎要把注冊的worker持久化起來(HA)。然后就reply回復。再然后Schedule()調度
3、Master接收Driver的注冊
Driver注冊給Master,Master會將Driver的信息放入內存緩存中,然后默認加入等待調度的隊列,然后再次用持久化引擎將Driver持久化,然后使用schedule進行調度
(1)在Client啟動的時候會創建一個ClientEndpoint。
一個RpcEndpoint經歷的過程依次是:構建->onStart→receive→onStop。其中onStart在接收任務消息前調用,receive和receiveAndReply分別用來接收另一個RpcEndpoint(也可以是本身)send和ask過來的消息。在他的onStart方法中調用ayncSendToMasterAndForwardReply方法向每個master發送RequestSubmitDriver這個case class他的參數是driverDescription也是個case class包含了driver的信息
(2)master在接收到client發送過來的RequestSubmitDriver消息后,創建一個DriverInfo,然后persistenceEngine持久化引擎要把driver持久化起來,并且加入到waitingDrivers這個等待隊列中
4、Master處理Driver狀態變化
Driver的狀態有SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED, ERROR。當進行改變時會發送DriverStateChanged的消息給Master。如果狀態是ERROR、FINISHED、KILLED、FAILED這幾種情況,則調用removeDriver。
removeDriver里面首先根據driverId來find看下有沒有這個driver,你說你的狀態發生變化我要看你曾經有沒有登記,如果沒有登記的話就打印日志如果有的話就從drivers中把當前這個driver去掉,同時看下completedDrivers的個數是不是>=RETAED_DRIVERS的個數
5、Master接收Application的注冊
Application本身就是通過spark-submit的方式提交application的時候是通過schedulerBackend注冊,然后將Application的信息放入內存緩存中,然后application加入等待調度的application隊列,然后再次用持久化引擎將drievr持久化,然后使用schedule進行調度。
注意:注冊的時候是先注冊Driver然后再注冊Application
在schedulerBackend的實現類StandaloneSchedulerBackend中的start方法中會創建一個StandaloneAppClient。
在StandaloneAppClient發送RegisterApplication的消息向master注冊
Master收到RegisterApplication消息后Application的信息放入內存緩存中,然后application加入等待調度的application隊列,然后再次用持久化引擎將drievr持久化,然后使用schedule進行調度。
?
6、Master處理Executor狀態變化
當Executor狀態發送變化時也同時會更新在Master端的注冊信息,收到的消息是ExecutorStateChanged
首先查一下已注冊的executors中有沒有這個executor。任何獲得executorState信息 如果是RUNNING的狀態resetRetryCount重置一下次數。就是Executor掛掉的時候會嘗試一定次數的重啟(最多重試10次)。這里只是記錄次數。
這個時候Executor的狀態發生變化,他會告訴driver,send給他,就是driver要知道這個內容。如果是狀態是Finish的話就把他remove掉,executor的lost大多是有Shuffle的輸出導致的
總結
以上是生活随笔為你收集整理的Spark Master的注册机制与状态管理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Anaconda3使用过程中遇到的问题
- 下一篇: libusb android pc,li