日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark-1.6.0之Application运行信息记录器JobProgressListener

發(fā)布時間:2025/3/20 编程问答 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark-1.6.0之Application运行信息记录器JobProgressListener 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

  JobProgressListener類是Spark的ListenerBus中一個很重要的監(jiān)聽器,可以用于記錄Spark任務(wù)的Job和Stage等信息,比如在Spark UI頁面上Job和Stage運行狀況以及運行進(jìn)度的顯示等數(shù)據(jù),就是從JobProgressListener中獲得的。另外,SparkStatusTracker也會從JobProgressListener中獲取Spark運行信息。外部應(yīng)用也可以通過Spark提供的status相關(guān)API比如AllJobResource, AllStagesResource, OneJobResource, OneStageResource獲取到Spark程序的運行信息。
  JobProgressListener類的繼承關(guān)系,以及該類中重要的屬性和方法,見下圖
  
  
  在Spark-1.6.0中,JobProgressListener對象生成后,會被add到一個LiveListenerBus類型的ListenerBus中。LiveListenerBus類的基礎(chǔ)關(guān)系,以及該類中重要方法和屬性見下圖
  
  文章接下來分析在一個Spark Application中JobProgressListener的生命周期,以及其數(shù)據(jù)接收和傳遞的過程。

一、JobProgressListener生成和數(shù)據(jù)寫入

1、JobProgressListener生成

  在源代碼中,JobProgressListener在SparkContext對象創(chuàng)建時就生成了,

private[spark] val listenerBus = new LiveListenerBus //listenerBus private var _jobProgressListener: JobProgressListener = _ //定義 ... _jobProgressListener = new JobProgressListener(_conf) //生成 private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener //使用 listenerBus.addListener(jobProgressListener) //使用

  從上面的代碼中看到,jobProgressListener在生成后,spark將其存入了LiveListenerBus對象中,其他任何接收到listenerBus的地方都能從中獲取到這個jobProgressListener對象。另外在創(chuàng)建SparkUI對象時,使用到了_jobProgressListener對象,使得Spark UI頁面能夠從該對象中獲取Spark應(yīng)用程序的運行時數(shù)據(jù)。或者也可以像SparkStatusTracker對象那樣,直接從SparkContext對象中獲取jobProgressListener。
  最后,在SparkContext中調(diào)用setupAndStartListenerBus()方法,啟動和初始化listenerBus。我們可以看到,在該方法中最后調(diào)用了listenerBus.start(this)方法真正啟動listenerBus。
  

2、JobProgressListener接收事件

(1)事件進(jìn)入LiveListenerBus
  LiveListenerBus繼承自AsynchronousListenerBus,可以看到這里是多線程的方式。里面維持了一個大小為10000的eventQueue,LinkedBlockingDeque類型。這個可以和DAGScheduler中提到的EventLoop類中的eventQueue對比分析。
  eventQueue接收事件調(diào)用的是post方法,這里調(diào)用的是LinkedBlockingDeque.offer方法,而EventLoop中調(diào)用的是LinkedBlockingDeque.put,可以比較這兩者的區(qū)別。

def post(event: E) {if (stopped.get) {// Drop further events to make `listenerThread` exit ASAPlogError(s"$name has already stopped! Dropping event $event")return}val eventAdded = eventQueue.offer(event) // 向eventQueue提交eventif (eventAdded) { eventLock.release() // 如果提交成功則釋放鎖} else {onDropEvent(event) // 否則丟棄該事件}}

  所以說,各類事件都是調(diào)用AsynchronousListenerBus.post方法傳入eventQueue中的。比如,在DAGScheduler類中,可以看到總共有14個調(diào)用的地方,下面列舉出其中12個不同的。

DAGScheduler方法SparkListenerEvent事件描述
executorHeartbeatReceivedSparkListenerExecutorMetricsUpdateexecutor向master發(fā)送心跳表示BlockManager仍然存活
handleBeginEventSparkListenerTaskStarttask開始執(zhí)行事件
cleanUpAfterSchedulerStopSparkListenerJobEndJob結(jié)束事件
handleGetTaskResultSparkListenerTaskGettingResulttask獲取結(jié)果事件
handleJobSubmittedSparkListenerJobStartJob開始事件
handleMapStageSubmittedSparkListenerJobStartJob開始事件
submitMissingTasksSparkListenerStageSubmittedStage提交事件
handleTaskCompletionSparkListenerTaskEndTask結(jié)束事件
handleTaskCompletionSparkListenerJobEndJob結(jié)束事件
markStageAsFinishedSparkListenerStageCompletedStage結(jié)束事件
failJobAndIndependentStagesSparkListenerJobEndJob結(jié)束事件
markMapStageJobAsFinishedSparkListenerJobEndJob結(jié)束事件

  分析到這里,各種SparkListenerEvent事件傳遞到了eventQueue中,那么如何進(jìn)一步傳遞到JobProgessListener中呢?接下來JobProgressListener作為消費者,從eventQueue中消費這些SparkListenerEvent。
  
(2)事件進(jìn)入到JobProgressListener

  從SparkContext中啟動LiveListenerBus線程開始,LiveListenerBus繼承自AsynchronousListenerBus的run方法便一直在多線程運行。在run中有一段主要邏輯,

val event = eventQueue.poll if (event == null) {// Get out of the while loop and shutdown the daemon threadif (!stopped.get) {throw new IllegalStateException("Polling `null` from eventQueue means" +" the listener bus has been stopped. So `stopped` must be true")}return } postToAll(event)

  從eventQueue取出事件后,調(diào)用LiveListenerBus的postToAll方法,將事件分發(fā)到各Listener中。
  具體看一下LiveListenerBus的postToAll方法,這個方法從ListenerBus繼承。

private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {// 維持一個Array來存儲add到該bus中的所有l(wèi)istenerprivate[spark] val listeners = new CopyOnWriteArrayList[L]/*** 調(diào)用addListener方法會把傳入的listener對象存入listeners中*/final def addListener(listener: L) {listeners.add(listener)}/*** spark通過調(diào)用這個方法,spark的各種事件都會觸發(fā)listenerBus中所有l(wèi)istener對該事件作出響應(yīng)*/final def postToAll(event: E): Unit = {val iter = listeners.iteratorwhile (iter.hasNext) {val listener = iter.next()try {/*** onPostEvent方法在SparkListenerBus類中具體實現(xiàn),針對不同的事件采取不同的方法* 比如stageSubmitted, stageCompleted, jobStart, jobEnd, taskStart, * applicationStart, blockManagerAdded,executorAdded等事件* 分別調(diào)用SparkListener中不同方法進(jìn)行處理*/onPostEvent(listener, event) } catch {case NonFatal(e) =>logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)}}} }

2、JobProgressListener對各種事件的響應(yīng)

  那么接下來,從JobProgressListener對各種事件的響應(yīng)方法出發(fā),對其狀態(tài)變更邏輯作一個簡要梳理,很多方法從其命名上就能看出其主要功能,有需要的可以進(jìn)入具體方法中做進(jìn)一步的研究。JobProgressListener能做出響應(yīng)的所有SparkListenerEvent事件,基本上都列在前面的表格中了。各類事件基本上都是從DAGScheduler中傳入的,可以參考Spark Scheduler模塊源碼分析之DAGScheduler
(1)Job級別信息
  這里主要涉及到Job開始和結(jié)束的兩個方法

  • onJobStart(SparkListenerJobStart)
      在Job開始時,獲取job的一些基本信息,比如參數(shù)spark.jobGroup.id 確定的JobGroup。然后生成一個JobUIData對象,用于在Spark UI頁面上顯示Job的ID,提交時間,運行狀態(tài),這個Job包含的Stage個數(shù),完成、跳過、失敗的Stage個數(shù)。以及總的Task個數(shù),以及完成、失敗、跳過、正在運行的Task個數(shù)。該Job中包含的所有Stage都存入pendingStages中。
  • onJobEnd(SparkListenerJobEnd)
      在Job完成時,根據(jù)該Job的最終狀態(tài)是成功還是失敗,分別把該job的相關(guān)信息存入completedJob對象和failedJobs對象中,同時把成功或者失敗的job數(shù)加一。然后循環(huán)處理該Job的每一個Stage,將該Stage對應(yīng)的當(dāng)前Job移除,如果移除后發(fā)現(xiàn)該Stage再沒有其他Job使用了,就把該Stage從activeStage列表中移除。接下來,如果這個Stage的提交時間為空,則表示該Stage被跳過執(zhí)行,更新一下skipped的Stage個數(shù),以及skipped的Task個數(shù)。(成功和失敗的Stage的邏輯在下面一小節(jié)中)

(2)Stage級別信息
  有關(guān)Stage的狀態(tài)變更處理邏輯,這里也有Stage的submit和complete方法

  • onStageSubmitted(SparkListenerStageSubmitted)
      在Stage提交后,將該Stage存入activeStages中,并且從pendingStages中移除該Stage。首先獲得當(dāng)前的調(diào)度池名稱,如果是FIFO模式,則是default(實際上不起任何作用),然后根據(jù)該調(diào)度池,將這個Stage放入調(diào)度池中。然后把所屬job的numActiveStages加一,
  • onStageCompleted(SparkListenerStageCompleted)
      在Stage完成后,從調(diào)度池中將該Stage移除,同時也從activeStages中移除。根據(jù)該Stage是成功還是失敗,繼續(xù)更新completedStages或failedStages,并更新這類Stage的統(tǒng)計數(shù)。然后把對應(yīng)Job中activeStages值減一,如果這個Stage是成功的(判斷依據(jù)是failureReason為空),則把對應(yīng)job的成功Stage數(shù)加一,否則把對應(yīng)Job的失敗Stage數(shù)加一。
      
    (3)Task級別信息
      有關(guān)Task的方法有task開始,結(jié)束兩個方法

  • onTaskStart(SparkListenerTaskStart)
      當(dāng)一個Task開始運行時,會把對應(yīng)Stage中active狀態(tài)的Task計數(shù)加一,并且把這個Task相關(guān)的信息記入對應(yīng)Stage中,同時更新該Task所屬Job中Active狀態(tài)Task的個數(shù)。

  • onTaskEnd(SparkListenerTaskEnd)
      當(dāng)一個Task運行完成時,獲取該Task對應(yīng)Stage的executorSummary信息,這個executorSummary中記錄了每個Executor對應(yīng)的ExecutorSummary信息,其中包括task開始時間,失敗task個數(shù),成功task個數(shù),輸入輸出字節(jié)數(shù),shuffle read/write字節(jié)數(shù)等。然后根據(jù)這個Task所屬的executorId,找到當(dāng)前Task的運行統(tǒng)計信息execSummary。如果這個Task運行成功,就將成功task個數(shù)加一,否則就將失敗task個數(shù)加一。然后根據(jù)Task運行狀態(tài),更新對應(yīng)Stage中失敗或成功Task個數(shù)。進(jìn)一步,更新對應(yīng)Job中失敗或成功的Task個數(shù)。

二、SparkUI頁面從JobProgressListener讀取數(shù)據(jù)

  JobProgressListener主要用在向Spark UI頁面?zhèn)鬟f數(shù)據(jù)上。

轉(zhuǎn)載于:https://www.cnblogs.com/wuyida/p/6300238.html

與50位技術(shù)專家面對面20年技術(shù)見證,附贈技術(shù)全景圖

總結(jié)

以上是生活随笔為你收集整理的Spark-1.6.0之Application运行信息记录器JobProgressListener的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。