Spark-1.6.0之Application运行信息记录器JobProgressListener
JobProgressListener類是Spark的ListenerBus中一個很重要的監(jiān)聽器,可以用于記錄Spark任務(wù)的Job和Stage等信息,比如在Spark UI頁面上Job和Stage運行狀況以及運行進(jìn)度的顯示等數(shù)據(jù),就是從JobProgressListener中獲得的。另外,SparkStatusTracker也會從JobProgressListener中獲取Spark運行信息。外部應(yīng)用也可以通過Spark提供的status相關(guān)API比如AllJobResource, AllStagesResource, OneJobResource, OneStageResource獲取到Spark程序的運行信息。
JobProgressListener類的繼承關(guān)系,以及該類中重要的屬性和方法,見下圖
在Spark-1.6.0中,JobProgressListener對象生成后,會被add到一個LiveListenerBus類型的ListenerBus中。LiveListenerBus類的基礎(chǔ)關(guān)系,以及該類中重要方法和屬性見下圖
文章接下來分析在一個Spark Application中JobProgressListener的生命周期,以及其數(shù)據(jù)接收和傳遞的過程。
一、JobProgressListener生成和數(shù)據(jù)寫入
1、JobProgressListener生成
在源代碼中,JobProgressListener在SparkContext對象創(chuàng)建時就生成了,
private[spark] val listenerBus = new LiveListenerBus //listenerBus private var _jobProgressListener: JobProgressListener = _ //定義 ... _jobProgressListener = new JobProgressListener(_conf) //生成 private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener //使用 listenerBus.addListener(jobProgressListener) //使用 從上面的代碼中看到,jobProgressListener在生成后,spark將其存入了LiveListenerBus對象中,其他任何接收到listenerBus的地方都能從中獲取到這個jobProgressListener對象。另外在創(chuàng)建SparkUI對象時,使用到了_jobProgressListener對象,使得Spark UI頁面能夠從該對象中獲取Spark應(yīng)用程序的運行時數(shù)據(jù)。或者也可以像SparkStatusTracker對象那樣,直接從SparkContext對象中獲取jobProgressListener。
最后,在SparkContext中調(diào)用setupAndStartListenerBus()方法,啟動和初始化listenerBus。我們可以看到,在該方法中最后調(diào)用了listenerBus.start(this)方法真正啟動listenerBus。
2、JobProgressListener接收事件
(1)事件進(jìn)入LiveListenerBus
LiveListenerBus繼承自AsynchronousListenerBus,可以看到這里是多線程的方式。里面維持了一個大小為10000的eventQueue,LinkedBlockingDeque類型。這個可以和DAGScheduler中提到的EventLoop類中的eventQueue對比分析。
eventQueue接收事件調(diào)用的是post方法,這里調(diào)用的是LinkedBlockingDeque.offer方法,而EventLoop中調(diào)用的是LinkedBlockingDeque.put,可以比較這兩者的區(qū)別。
所以說,各類事件都是調(diào)用AsynchronousListenerBus.post方法傳入eventQueue中的。比如,在DAGScheduler類中,可以看到總共有14個調(diào)用的地方,下面列舉出其中12個不同的。
| executorHeartbeatReceived | SparkListenerExecutorMetricsUpdate | executor向master發(fā)送心跳表示BlockManager仍然存活 |
| handleBeginEvent | SparkListenerTaskStart | task開始執(zhí)行事件 |
| cleanUpAfterSchedulerStop | SparkListenerJobEnd | Job結(jié)束事件 |
| handleGetTaskResult | SparkListenerTaskGettingResult | task獲取結(jié)果事件 |
| handleJobSubmitted | SparkListenerJobStart | Job開始事件 |
| handleMapStageSubmitted | SparkListenerJobStart | Job開始事件 |
| submitMissingTasks | SparkListenerStageSubmitted | Stage提交事件 |
| handleTaskCompletion | SparkListenerTaskEnd | Task結(jié)束事件 |
| handleTaskCompletion | SparkListenerJobEnd | Job結(jié)束事件 |
| markStageAsFinished | SparkListenerStageCompleted | Stage結(jié)束事件 |
| failJobAndIndependentStages | SparkListenerJobEnd | Job結(jié)束事件 |
| markMapStageJobAsFinished | SparkListenerJobEnd | Job結(jié)束事件 |
分析到這里,各種SparkListenerEvent事件傳遞到了eventQueue中,那么如何進(jìn)一步傳遞到JobProgessListener中呢?接下來JobProgressListener作為消費者,從eventQueue中消費這些SparkListenerEvent。
(2)事件進(jìn)入到JobProgressListener
從SparkContext中啟動LiveListenerBus線程開始,LiveListenerBus繼承自AsynchronousListenerBus的run方法便一直在多線程運行。在run中有一段主要邏輯,
val event = eventQueue.poll if (event == null) {// Get out of the while loop and shutdown the daemon threadif (!stopped.get) {throw new IllegalStateException("Polling `null` from eventQueue means" +" the listener bus has been stopped. So `stopped` must be true")}return } postToAll(event) 從eventQueue取出事件后,調(diào)用LiveListenerBus的postToAll方法,將事件分發(fā)到各Listener中。
具體看一下LiveListenerBus的postToAll方法,這個方法從ListenerBus繼承。
2、JobProgressListener對各種事件的響應(yīng)
那么接下來,從JobProgressListener對各種事件的響應(yīng)方法出發(fā),對其狀態(tài)變更邏輯作一個簡要梳理,很多方法從其命名上就能看出其主要功能,有需要的可以進(jìn)入具體方法中做進(jìn)一步的研究。JobProgressListener能做出響應(yīng)的所有SparkListenerEvent事件,基本上都列在前面的表格中了。各類事件基本上都是從DAGScheduler中傳入的,可以參考Spark Scheduler模塊源碼分析之DAGScheduler
(1)Job級別信息
這里主要涉及到Job開始和結(jié)束的兩個方法
- onJobStart(SparkListenerJobStart)
在Job開始時,獲取job的一些基本信息,比如參數(shù)spark.jobGroup.id 確定的JobGroup。然后生成一個JobUIData對象,用于在Spark UI頁面上顯示Job的ID,提交時間,運行狀態(tài),這個Job包含的Stage個數(shù),完成、跳過、失敗的Stage個數(shù)。以及總的Task個數(shù),以及完成、失敗、跳過、正在運行的Task個數(shù)。該Job中包含的所有Stage都存入pendingStages中。 - onJobEnd(SparkListenerJobEnd)
在Job完成時,根據(jù)該Job的最終狀態(tài)是成功還是失敗,分別把該job的相關(guān)信息存入completedJob對象和failedJobs對象中,同時把成功或者失敗的job數(shù)加一。然后循環(huán)處理該Job的每一個Stage,將該Stage對應(yīng)的當(dāng)前Job移除,如果移除后發(fā)現(xiàn)該Stage再沒有其他Job使用了,就把該Stage從activeStage列表中移除。接下來,如果這個Stage的提交時間為空,則表示該Stage被跳過執(zhí)行,更新一下skipped的Stage個數(shù),以及skipped的Task個數(shù)。(成功和失敗的Stage的邏輯在下面一小節(jié)中)
(2)Stage級別信息
有關(guān)Stage的狀態(tài)變更處理邏輯,這里也有Stage的submit和complete方法
- onStageSubmitted(SparkListenerStageSubmitted)
在Stage提交后,將該Stage存入activeStages中,并且從pendingStages中移除該Stage。首先獲得當(dāng)前的調(diào)度池名稱,如果是FIFO模式,則是default(實際上不起任何作用),然后根據(jù)該調(diào)度池,將這個Stage放入調(diào)度池中。然后把所屬job的numActiveStages加一, onStageCompleted(SparkListenerStageCompleted)
在Stage完成后,從調(diào)度池中將該Stage移除,同時也從activeStages中移除。根據(jù)該Stage是成功還是失敗,繼續(xù)更新completedStages或failedStages,并更新這類Stage的統(tǒng)計數(shù)。然后把對應(yīng)Job中activeStages值減一,如果這個Stage是成功的(判斷依據(jù)是failureReason為空),則把對應(yīng)job的成功Stage數(shù)加一,否則把對應(yīng)Job的失敗Stage數(shù)加一。
(3)Task級別信息
有關(guān)Task的方法有task開始,結(jié)束兩個方法onTaskStart(SparkListenerTaskStart)
當(dāng)一個Task開始運行時,會把對應(yīng)Stage中active狀態(tài)的Task計數(shù)加一,并且把這個Task相關(guān)的信息記入對應(yīng)Stage中,同時更新該Task所屬Job中Active狀態(tài)Task的個數(shù)。- onTaskEnd(SparkListenerTaskEnd)
當(dāng)一個Task運行完成時,獲取該Task對應(yīng)Stage的executorSummary信息,這個executorSummary中記錄了每個Executor對應(yīng)的ExecutorSummary信息,其中包括task開始時間,失敗task個數(shù),成功task個數(shù),輸入輸出字節(jié)數(shù),shuffle read/write字節(jié)數(shù)等。然后根據(jù)這個Task所屬的executorId,找到當(dāng)前Task的運行統(tǒng)計信息execSummary。如果這個Task運行成功,就將成功task個數(shù)加一,否則就將失敗task個數(shù)加一。然后根據(jù)Task運行狀態(tài),更新對應(yīng)Stage中失敗或成功Task個數(shù)。進(jìn)一步,更新對應(yīng)Job中失敗或成功的Task個數(shù)。
二、SparkUI頁面從JobProgressListener讀取數(shù)據(jù)
JobProgressListener主要用在向Spark UI頁面?zhèn)鬟f數(shù)據(jù)上。
轉(zhuǎn)載于:https://www.cnblogs.com/wuyida/p/6300238.html
與50位技術(shù)專家面對面20年技術(shù)見證,附贈技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的Spark-1.6.0之Application运行信息记录器JobProgressListener的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JMS学习(五)--ActiveMQ中的
- 下一篇: ebs 初始化登陆