日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Apache Spark源码走读之3 -- Task运行期之函数调用关系分析

發布時間:2024/4/14 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache Spark源码走读之3 -- Task运行期之函数调用关系分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

概要

本篇主要闡述在TaskRunner中執行的task其業務邏輯是如何被調用到的,另外試圖講清楚運行著的task其輸入的數據從哪獲取,處理的結果返回到哪里,如何返回。

準備

  • spark已經安裝完畢
  • spark運行在local mode或local-cluster mode
  • local-cluster mode

    local-cluster模式也稱為偽分布式,可以使用如下指令運行

    MASTER=local[1,2,1024] bin/spark-shell

    [1,2,1024]?分別表示,executor number, core number和內存大小,其中內存大小不應小于默認的512M

    Driver Programme的初始化過程分析

    初始化過程的涉及的主要源文件

  • SparkContext.scala?????? 整個初始化過程的入口
  • SparkEnv.scala   ?????? 創建BlockManager, MapOutputTrackerMaster, ConnectionManager, CacheManager
  • DAGScheduler.scala?????? 任務提交的入口,即將Job劃分成各個stage的關鍵
  • TaskSchedulerImpl.scala 決定每個stage可以運行幾個task,每個task分別在哪個executor上運行
  • SchedulerBackend
  • 最簡單的單機運行模式的話,看LocalBackend.scala
  • 如果是集群模式,看源文件SparkDeploySchedulerBackend
  • 初始化過程步驟詳解

    步驟1: 根據初始化入參生成SparkConf,再根據SparkConf來創建SparkEnv, SparkEnv中主要包含以下關鍵性組件 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManager

    private[spark] val env = SparkEnv.create(conf,"",conf.get("spark.driver.host"),conf.get("spark.driver.port").toInt, isDriver = true, isLocal = isLocal) SparkEnv.set(env)

    步驟2:創建TaskScheduler,根據Spark的運行模式來選擇相應的SchedulerBackend,同時啟動taskscheduler,這一步至為關鍵

    private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)taskScheduler.start()

    TaskScheduler.start目的是啟動相應的SchedulerBackend,并啟動定時器進行檢測

    override def start() {backend.start()if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") import sc.env.actorSystem.dispatcher sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds, SPECULATION_INTERVAL milliseconds) { checkSpeculatableTasks() } } }

    步驟3:以上一步中創建的TaskScheduler實例為入參創建DAGScheduler并啟動運行

    @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)dagScheduler.start()

    步驟4:啟動WEB UI

    ui.start()

    RDD的轉換過程

    還是以最簡單的wordcount為例說明rdd的轉換過程

    sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

    上述一行簡短的代碼其實發生了很復雜的RDD轉換,下面仔細解釋每一步的轉換過程和轉換結果

    步驟1:val rawFile = sc.textFile("README.md")

    textFile先是生成hadoopRDD,然后再通過map操作生成MappedRDD,如果在spark-shell中執行上述語句,得到的結果可以證明所做的分析

    scala> sc.textFile("README.md") 14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes 14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0, maxMem=311387750 14/04/23 13:11:48 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB) 14/04/23 13:11:48 DEBUG BlockManager: Put block broadcast_0 locally took 277 ms 14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took 281 ms res0: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :13

    步驟2: val splittedText = rawFile.flatMap(line => line.split(" "))

    flatMap將原來的MappedRDD轉換成為FlatMappedRDD

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = new FlatMappedRDD(this, sc.clean(f))

    步驟3:val wordCount = splittedText.map(word => (word, 1))

    利用word生成相應的鍵值對,上一步的FlatMappedRDD被轉換成為MappedRDD

    步驟4:val reduceJob = wordCount.reduceByKey(_ + _),這一步最復雜

    步驟2,3中使用到的operation全部定義在RDD.scala中,而這里使用到的reduceByKey卻在RDD.scala中見不到蹤跡。reduceByKey的定義出現在源文件PairRDDFunctions.scala

    細心的你一定會問reduceByKey不是MappedRDD的屬性和方法啊,怎么能被MappedRDD調用呢?其實這背后發生了一個隱式的轉換,該轉換將MappedRDD轉換成為PairRDDFunctions

    implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =new PairRDDFunctions(rdd)

    這種隱式的轉換是scala的一個語法特征,如果想知道的更多,請用關鍵字"scala implicit method"進行查詢,會有不少的文章對此進行詳盡的介紹。

    接下來再看一看reduceByKey的定義

    def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {reduceByKey(defaultPartitioner(self), func)}def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {combineByKey[V]((v: V) => v, func, func, partitioner)}def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializerClass: String = null): RDD[(K, C)] = { if (getKeyClass().isArray) { if (mapSideCombine) { throw new SparkException("Cannot use map-side combining with array keys.") } if (partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("Default partitioner cannot partition array keys.") } } val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) if (self.partitioner == Some(partitioner)) { self.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else if (mapSideCombine) { val combined = self.mapPartitionsWithContext((context, iter) => { aggregator.combineValuesByKey(iter, context) }, preservesPartitioning = true) val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner) .setSerializer(serializerClass) partitioned.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context)) }, preservesPartitioning = true) } else { // Don't apply map-side combiner. val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass) values.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } }

    reduceByKey最終會調用combineByKey, 在這個函數中PairedRDDFunctions會被轉換成為ShuffleRDD,當調用mapPartitionsWithContext之后,shuffleRDD被轉換成為MapPartitionsRDD

    Log輸出能證明我們的分析

    res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at :13

    RDD轉換小結

    小結一下整個RDD轉換過程

    HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->PairRDDFunctions->ShuffleRDD->MapPartitionsRDD

    整個轉換過程好長啊,這一切的轉換都發生在任務提交之前。

    運行過程分析

    數據集操作分類

    在對任務運行過程中的函數調用關系進行分析之前,我們也來探討一個偏理論的東西,作用于RDD之上的Transformantion為什么會是這個樣子?

    對這個問題的解答和數學搭上關系了,從理論抽象的角度來說,任務處理都可歸結為“input->processing->output"。input和output對應于數據集dataset.

    在此基礎上作一下簡單的分類

  • one-one 一個dataset在轉換之后還是一個dataset,而且dataset的size不變,如map
  • one-one 一個dataset在轉換之后還是一個dataset,但size發生更改,這種更改有兩種可能:擴大或縮小,如flatMap是size增大的操作,而subtract是size變小的操作
  • many-one 多個dataset合并為一個dataset,如combine, join
  • one-many 一個dataset分裂為多個dataset, 如groupBy
  • Task運行期的函數調用

    task的提交過程參考本系列中的第二篇文章。本節主要講解當task在運行期間是如何一步步調用到作用于RDD上的各個operation

    • TaskRunner.run
      • Task.run
        • Task.runTask (Task是一個基類,有兩個子類,分別為ShuffleMapTask和ResultTask)
          • RDD.iterator
            • RDD.computeOrReadCheckpoint
              • RDD.compute 

    或許當看到RDD.compute函數定義時,還是覺著f沒有被調用,以MappedRDD的compute定義為例

    override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator(split, context).map(f)

    注意,這里最容易產生錯覺的地方就是map函數,這里的map不是RDD中的map,而是scala中定義的iterator的成員函數map, 請自行參考http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator

    堆棧輸出

    80 at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111)81 at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:154) 82 at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149) 83 at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64) 84 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 85 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 86 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 87 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 88 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 89 at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) 90 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 91 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 92 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 93 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 94 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 95 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) 96 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 97 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 98 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) 99 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) 100 at org.apache.spark.scheduler.Task.run(Task.scala:53) 101 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)

    ResultTask

    compute的計算過程對于ShuffleMapTask比較復雜,繞的圈圈比較多,對于ResultTask就直接許多。

    override def runTask(context: TaskContext): U = {metrics = Some(context.taskMetrics)try {func(context, rdd.iterator(split, context))} finally {context.executeOnCompleteCallbacks()}}

    ?計算結果的傳遞

    上面的分析知道,wordcount這個job在最終提交之后,被DAGScheduler分為兩個stage,第一個Stage是shuffleMapTask,第二個Stage是ResultTask.

    那么ShuffleMapTask的計算結果是如何被ResultTask取得的呢?這個過程簡述如下

  • ShffuleMapTask將計算的狀態(注意不是具體的數據)包裝為MapStatus返回給DAGScheduler
  • DAGScheduler將MapStatus保存到MapOutputTrackerMaster中
  • ResultTask在執行到ShuffleRDD時會調用BlockStoreShuffleFetcher的fetch方法去獲取數據
  • 第一件事就是咨詢MapOutputTrackerMaster所要取的數據的location
  • 根據返回的結果調用BlockManager.getMultiple獲取真正的數據
  • BlockStoreShuffleFetcher的fetch函數偽碼

    val blockManager = SparkEnv.get.blockManagerval startTime = System.currentTimeMillisval statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(shuffleId, reduceId, System.currentTimeMillis - startTime))val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer) val itr = blockFetcherItr.flatMap(unpackBlock)

    注意上述代碼中的getServerStatusesgetMultiple,一個是詢問數據的位置,一個是去獲取真正的數據。

    有關Shuffle的詳細解釋,請參考”詳細探究Spark的shuffle實現一文"http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/

    轉載于:https://www.cnblogs.com/captain_ccc/articles/4129311.html

    總結

    以上是生活随笔為你收集整理的Apache Spark源码走读之3 -- Task运行期之函数调用关系分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。