spark内核揭秘-09-RDD的count操作 触发Job全生命周期-01
RDD源碼的count方法:
從上面代碼可以看出來,count方法觸發SparkContext的runJob方法的調用:
進入?runJob(rdd, func, 0 until rdd.partitions.size, false)方法:
進一步跟蹤runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)方法:
繼續跟蹤進入runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)方法:
代碼分析:
1、getCallSite :
2、clean(func):
3、dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,?resultHandler, localProperties.get):
代碼分析:
3.1、進入submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties):
上面代碼分析:
3.1.1、 進入new JobWaiter(this, jobId, partitions.size, resultHandler)方法
3.1.2、進入eventProcessActor ! JobSubmitted(?jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)方法
我們可以看出來,是給自己發消息的
3.1.3、進入? dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,listener, properties)方法
首先構建finalStage,然后又一個getMissingParentsStages方法,可以發現運行有本地運行和集群運行兩種模式,本地運行主要用于本地實驗和調試:
3.1.3.1、進入??finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)方法:
3.1.3.2、進入?runLocally(job)方法:
3.1.3.2.1、?runLocallyWithinThread(job)方法:
3.1.3.3、進入 submitStage(finalStage)方法:
上面代碼分析:submitStage第一次傳入的參數是Job的最后一個Stage,然后判斷一下是否缺失父Stage,如果沒有依賴的parent Stage的話就可以submitMissingTasks運行,如果有parent Stage的話就要再一次submitStage做遞歸操作,最終會導致submitMissingTasks的調用:
3.1.3.3.1、進入??activeJobForStage(stage)?方法:
3.1.3.3.2、進入??getMissingParentStages(stage).sortBy(_.id)?方法:
跟進getShuffleMapState方法:
進入registerShuffleDependencies方法:
3.1.3.3.3、進入submitMissingTasks(stage, jobId.get)?方法:
PS:分析代碼太多,下篇繼續分析源碼
版權聲明:本文為博主原創文章,未經博主允許不得轉載。
轉載于:https://www.cnblogs.com/stark-summer/p/4829813.html
總結
以上是生活随笔為你收集整理的spark内核揭秘-09-RDD的count操作 触发Job全生命周期-01的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: holer实现外网访问内网数据库
- 下一篇: weblogic jprofile配置