Spark源码和调优简介 Spark Core
作者:calvinrzluo,騰訊 IEG 后臺開發工程師
本文基于 Spark 2.4.4 版本的源碼,試圖分析其 Core 模塊的部分實現原理,其中如有錯誤,請指正。為了簡化論述,將部分細節放到了源碼中作為注釋,因此正文中是主要內容。
Spark Core
RDD
RDD(Resilient Distributed Dataset),即彈性數據集是 Spark 中的基礎結構。RDD 是 distributive 的、immutable 的,可以被 persist 到磁盤或者內存中。
對 RDD 具有轉換操作和行動操作兩種截然不同的操作。轉換(Transform)操作從一個 RDD 生成另一個 RDD,但行動(Action)操作會去掉 RDD 的 Context。例如take是行動操作,返回的是一個數組而不是 RDD 了,如下所示
scala>?var?rdd1?=?sc.makeRDD(Seq(10,?4,?2,?12,?3)) rdd1:?org.apache.spark.rdd.RDD[Int]?=?ParallelCollectionRDD[40]?at?makeRDD?at?:21scala>?rdd1.take(1) res0:?Array[Int]?=?Array(10)scala>?rdd1.take(2) res1:?Array[Int]?=?Array(10,?4)轉換操作是 Lazy 的,直到遇到一個 Eager 的 Action 操作,Spark 才會生成關于整條鏈的執行計劃并執行。這些 Action 操作將一個 Spark Application 分為了多個 Job。
常見的Action 操作包括:reduce、collect、count、take(n)、first、takeSample(withReplacement, num, [seed])、takeOrdered(n, [ordering])、saveAsTextFile(path)、saveAsSequenceFile(path)、saveAsObjectFile(path)、countByKey()、foreach(func)。
常見 RDD
RDD 是一個抽象類abstract class RDD[T] extends Serializable with Logging,在 Spark 中有諸如ShuffledRDD、HadoopRDD等實現。每個 RDD 都有對應的compute方法,用來描述這個 RDD 的計算方法。需要注意的是,這些 RDD 可能被作為某些 RDD 計算的中間結果,例如CoGroupedRDD,對應的,例如MapPartitionsRDD也可能是經過多個 RDD 變換得到的,其決定權在于所使用的算子。
我們來具體查看一些 RDD。
ParallelCollectionRDD
這個 RDD 由parallelize得到
HadoopRDD
class?HadoopRDD[K,?V]?extends?RDD[(K,?V)]?with?LoggingFileScanRDD
這個 RDD 一般從spark.read.text(...)語句中產生,所以實現在sql 模塊中。
MapPartitionsRDD
class?MapPartitionsRDD[U,?T]?extends?RDD[U]這個 RDD 是map、mapPartitions、mapPartitionsWithIndex操作的結果。
注意,在較早期的版本中,map會得到一個MappedRDD,filter會得到一個FilteredRDD、flatMap會得到一個FlatMappedRDD,不過目前已經找不到了,統一變成MapPartitionsRDD
scala>?val?a3?=?arr.map(i?=>?(i+1,?i)) a3:?org.apache.spark.rdd.RDD[(Int,?Int)]?=?MapPartitionsRDD[2]?at?map?at?<console>:25 scala>?val?a3?=?arr.filter(i?=>?i?>?3) a3:?org.apache.spark.rdd.RDD[Int]?=?MapPartitionsRDD[4]?at?filter?at?<console>:25 scala>?val?a3?=?arr.flatMap(i?=>?Array(i)) a3:?org.apache.spark.rdd.RDD[Int]?=?MapPartitionsRDD[5]?at?flatMap?at?<console>:25join操作的結果也是MapPartitionsRDD,這是因為其執行過程的最后一步flatMapValues會創建一個MapPartitionsRDD
scala>?val?rdd1?=?sc.parallelize(Array((1,1),(1,2),(1,3),(2,1),(2,2),(2,3))) rdd1:?org.apache.spark.rdd.RDD[(Int,?Int)]?=?ParallelCollectionRDD[8]?at?parallelize?at?<console>:24scala>?val?rdd2?=?sc.parallelize(Array((1,1),(1,2),(1,3),(2,1),(2,2),(2,3))) rdd2:?org.apache.spark.rdd.RDD[(Int,?Int)]?=?ParallelCollectionRDD[9]?at?parallelize?at?<console>:24scala>?val?rddj?=?rdd1.join(rdd2) rddj:?org.apache.spark.rdd.RDD[(Int,?(Int,?Int))]?=?MapPartitionsRDD[12]?at?join?at?<console>:27ShuffledRDD
ShuffledRDD用來存儲所有 Shuffle 操作的結果,其中K、V很好理解,C是 Combiner Class。
以groupByKey為例
scala>?val?a2?=?arr.map(i?=>?(i+1,?i)) a2:?org.apache.spark.rdd.RDD[(Int,?Int)]?=?MapPartitionsRDD[2]?at?map?at?<console>:25scala>?a2.groupByKey res1:?org.apache.spark.rdd.RDD[(Int,?Iterable[Int])]?=?ShuffledRDD[3]?at?groupByKey?at?<console>:26注意,groupByKey需要 K 是 Hashable 的,否則會報錯。
scala>?val?a2?=?arr.map(i?=>?(Array.fill(10)(i),?i)) a2:?org.apache.spark.rdd.RDD[(Array[Int],?Int)]?=?MapPartitionsRDD[2]?at?map?at?<console>:25scala>?a2.groupByKey org.apache.spark.SparkException:?HashPartitioner?cannot?partition?array?keys.at?org.apache.spark.rdd.PairRDDFunctions不能識別此Latex公式: anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:84)at?org.apache.spark.rdd.PairRDDFunctionsanonfun<span?class="katex-html"?aria-hidden="true" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;"><span?class="strut"?style="height:0.8888799999999999em;vertical-align:-0.19444em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;"><span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">c<span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">o<span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">m<span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">b<span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">i<span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">n<span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">e<span?class="mord?mathit"?style="margin-right:0.05017em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">B<span?class="mord?mathit"?style="margin-right:0.03588em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">y<span?class="mord?mathit"?style="margin-right:0.07153em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">K<span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">e<span?class="mord?mathit"?style="margin-right:0.03588em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">y<span?class="mord?mathit"?style="margin-right:0.13889em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">W<span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">i<span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">t<span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">h<span?class="mord?mathit"?style="margin-right:0.07153em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">C<span?class="mord?mathit"?style="margin-right:0.01968em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">l<span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">a<span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">s<span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">s<span?class="mord?mathit"?style="margin-right:0.13889em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">T<span?class="mord?mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">a<span?class="mord?mathit"?style="margin-right:0.03588em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">g1.apply(PairRDDFunctions.scala:77) </span?class="mord?mathit"?style="margin-right:0.03588em;"></span?class="mord?mathit"></span?class="mord?mathit"?style="margin-right:0.13889em;"></span?class="mord?mathit"></span?class="mord?mathit"></span?class="mord?mathit"></span?class="mord?mathit"?style="margin-right:0.01968em;"></span?class="mord?mathit"?style="margin-right:0.07153em;"></span?class="mord?mathit"></span?class="mord?mathit"></span?class="mord?mathit"></span?class="mord?mathit"?style="margin-right:0.13889em;"></span?class="mord?mathit"?style="margin-right:0.03588em;"></span?class="mord?mathit"></span?class="mord?mathit"?style="margin-right:0.07153em;"></span?class="mord?mathit"?style="margin-right:0.03588em;"></span?class="mord?mathit"?style="margin-right:0.05017em;"></span?class="mord?mathit"></span?class="mord?mathit"></span?class="mord?mathit"></span?class="mord?mathit"></span?class="mord?mathit"></span?class="mord?mathit"></span?class="mord?mathit"></span?class="strut"?style="height:0.8888799999999999em;vertical-align:-0.19444em;"></span?class="katex-html"?aria-hidden="true">CoGroupedRDD
class?CoGroupedRDD[K]?extends?RDD[(K,?Array[Iterable[_]])]首先,我們需要了解一下什么是cogroup操作,這個方法有多個重載版本。如下所示的版本,對this或other1或other2的所有的 key,生成一個RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2])),表示對于這個 key,這三個 RDD 中所有值的集合。容易看到,這個算子能夠被用來實現 Join 和 Union(不過后者有點大材小用了)
def?cogroup[W1,?W2](other1:?RDD[(K,?W1?"W1,?W2")],?other2:?RDD[(K,?W2)],?partitioner:?Partitioner):?RDD[(K,?(Iterable[V],?Iterable[W1],?Iterable[W2]))]這里的Partitioner是一個abstract class,具有numPartitions: Int和getPartition(key: Any): Int兩個方法。通過繼承Partitioner可自定義分區的實現方式,目前官方提供有RangePartitioner和HashPartitioner等。
UnionRDD
class?UnionRDD[T]?extends?RDD[T]UnionRDD一般通過union算子得到
scala>?val?a5?=?arr.union(arr2) a5:?org.apache.spark.rdd.RDD[Int]?=?UnionRDD[7]?at?union?at?<console>:27CoalescedRDD
常見 RDD 外部函數
Spark 在 RDD 之外提供了一些外部函數,它們可以通過隱式轉換的方式變成 RDD。
PairRDDFunctions
這個 RDD 被用來處理 KV 對,相比RDD,它提供了groupByKey、join等方法。以combineByKey為例,他有三個模板參數,從 RDD 過來的K和V以及自己的C。相比 reduce 和 fold 系列的(V, V)=> V,這多出來的C使combineByKey更靈活,通過combineByKey能夠將V變換為C。
OrderedRDDFunctions
這個用來提供sortByKey、filterByRange等方法。
Spark 的架構概覽
Spark 在設計上的一個特點是它和下層的集群管理是分開的,一個 Spark Application 可以看做是由集群上的若干進程組成的。因此,我們需要區分 Spark 中的概念和下層集群中的概念,例如我們常見的 Master 和 Worker 是集群中的概念,表示節點;而 Driver 和 Executor 是 Spark 中的概念,表示進程。根據爆棧網,Driver 可能位于某個 Worker 節點中,或者位于 Master 節點上,這取決于部署的方式
在官網上給了這樣一幅圖,詳細闡明了 Spark 集群下的基礎架構。SparkContext是整個 Application 的管理核心,由 Driver 來負責管理。SparkContext負責管理所有的 Executor,并且和下層的集群管理進行交互,以請求資源。
在 Stage 層次及以上接受DAGScheduler的調度,而TaskScheduler則調度一個 Taskset。在 Spark on Yarn 模式下,CoarseGrainedExecutorBackend 和 Executor 一一對應,它是一個獨立于 Worker 主進程之外的一個進程,我們可以 jps 查看到。而 Task 是作為一個 Executor 啟動的一個線程來跑的,一個 Executor 中可以跑多個 Task。
在實現上,CoarseGrainedExecutorBackend繼承了ExecutorBackend這個 trait,作為一個IsolatedRpcEndpoint,維護Executor對象實例,并通過創建的DriverEndpoint實例的與 Driver 進行交互。
在進程啟動時,CoarseGrainedExecutorBackend調用onStart()方法向 Driver 注冊自己,并產生一條"Connecting to driver的 INFO。CoarseGrainedExecutorBackend通過DriverEndpoint.receive方法來處理來自 Driver 的命令,包括LaunchTask、KillTask等。這里注意一下,在 scheduler 中有一個CoarseGrainedSchedulerBackend,里面實現相似,在看代碼時要注意區分開。
有關 Executor 和 Driver 的關系,下面這張圖更加直觀,需要說明的是,一個 Worker 上面也可能跑有多個 Executor,每個 Task 也可以在多個 CPU 核心上面運行
Spark 上下文
在代碼里我們操作一個 Spark 任務有兩種方式,通過 SparkContext,或者通過 SparkSession
SparkContext 方式
SparkContext 是 Spark 自創建來一直存在的類。我們通過 SparkConf 直接創建 SparkContext
SparkSession 方式
SparkSession 是在 Spark2.0 之后提供的 API,相比 SparkContext,他提供了對 SparkSQL 的支持(持有SQLContext),例如createDataFrame等方法就可以通過 SparkSession 來訪問。
在builder.getOrCreate()的過程中,雖然最終得到的是一個 SparkSession,但實際上內部已經創建了一個 SparkContext,并由這個 SparkSession 持有。
???val?spark:?SparkSession?=?SparkSession.builder()?//?得到一個Builder.master("local").appName("AppName").config("spark.some.config.option",?"some-value") .getOrCreate()?//?得到一個SparkSession//?SparkSession.scalaval?sparkContext?=?userSuppliedContext.getOrElse?{val?sparkConf?=?new?SparkConf()options.foreach?{?case?(k,?v)?=>?sparkConf.set(k,?v)?}//?set?a?random?app?name?if?not?given.if?(!sparkConf.contains("spark.app.name"))?{sparkConf.setAppName(java.util.UUID.randomUUID().toString)}SparkContext.getOrCreate(sparkConf)//?Do?not?update?`SparkConf`?for?existing?`SparkContext`,?as?it's?shared?by?all?sessions. }applyExtensions(sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),extensions)session?=?new?SparkSession(sparkContext,?None,?None,?extensions)SparkEnv
SparkEnv持有一個 Spark 實例在運行時所需要的所有對象,包括 Serializer、RpcEndpoint(在早期用的是 Akka actor)、BlockManager、MemoryManager、BroadcastManager、SecurityManager、MapOutputTrackerMaster/Worker 等等。
SparkEnv 由 SparkContext 創建,并在之后通過伴生對象SparkEnv的get方法來訪問。
在創建時,Driver 端的 SparkEnv 是 SparkContext 創建的時候調用SparkEnv.createDriverEnv創建的。Executor 端的是其守護進程CoarseGrainedExecutorBackend創建的時候調用SparkEnv.createExecutorEnv方法創建的。這兩個方法最后都會調用create方法
//?Driver端 private[spark]?def?createSparkEnv(conf:?SparkConf,isLocal:?Boolean,listenerBus:?LiveListenerBus):?SparkEnv?=?{SparkEnv.createDriverEnv(conf,?isLocal,?listenerBus,?SparkContext.numDriverCores(master,?conf)) } _env?=?createSparkEnv(_conf,?isLocal,?listenerBus) SparkEnv.set(_env)//?Executor端 //?CoarseGrainedExecutorBackend.scala val?env?=?SparkEnv.createExecutorEnv(driverConf,?arguments.executorId,?arguments.bindAddress,arguments.hostname,?arguments.cores,?cfg.ioEncryptionKey,?isLocal?=?false)env.rpcEnv.setupEndpoint("Executor",?backendCreateFn(env.rpcEnv,?arguments,?env)) arguments.workerUrl.foreach?{?url?=>env.rpcEnv.setupEndpoint("WorkerWatcher",?new?WorkerWatcher(env.rpcEnv,?url)) } env.rpcEnv.awaitTermination()//?SparkEnv.scala //?create函數 val?blockManager?=?new?BlockManager(...)Spark 的任務調度
Spark 的操作可以分為兩種,Transform 操作是 Lazy 的,而 Action 操作是 Eager 的。每一個 Action 會產生一個 Job。
Spark 的 Transform 操作可以分為寬依賴(ShuffleDependency)和窄依賴(NarrowDependency)操作兩種,其中窄依賴還有兩個子類OneToOneDependency和RangeDependency。窄依賴操作表示父 RDD 的每個分區只被子 RDD 的一個分區所使用,例如union、map、filter等的操作;而寬依賴恰恰相反。寬依賴需要 shuffle 操作,因為需要將父 RDD 的結果需要復制給不同節點用來生成子 RDD,有關ShuffleDependency將在下面的 Shuffle 源碼分析中詳細說明。當 DAG 的執行中出現寬依賴操作時,Spark 會將其前后劃分為不同的 Stage,在下一章節中將具體分析相關代碼。
在 Stage 之下,就是若干個 Task 了。這些 Task 也就是 Spark 的并行單元,通常來說,按照當前 Stage 的最后一個 RDD 的分區數來計算,每一個分區都會啟動一個 Task 來進行計算。我們可以通過rdd.partitions.size來獲取一個 RDD 有多少個分區。
Task 具有兩種類型,ShuffleMapTask和ResultTask。其中ResultTask是ResultStage的 Task,也就是最后一個 Stage 的 Task。
Spark 的存儲管理
為了實現與底層細節的解耦,Spark 的存儲基于 BlockManager 給計算部分提供服務。類似于 Driver 和 Executor,BlockManager 機制也分為 BlockManagerMaster 和 BlockManager。Driver 上的 BlockManagerMaster 對于存在與 Executor 上的 BlockManager 統一管理。BlockManager 只是負責管理所在 Executor 上的 Block。
BlockManagerMaster 和 BlockManager 都是在 SparkEnv 中創建的,
Driver 節點和 Executor 節點的 BlockManager 之間的交互可以使用下圖來描述,在此就不詳細說明。
BlockId 和 BlockInfo
抽象類BlockId被用來唯一標識一個 Block,具有全局唯一的名字,通常和一個文件相對應。BlockId有著確定的命名規則,并且和它實際的類型有關。
如果它是用來 Shuffle 的ShuffleBlockId,那么他的命名就是
抑或它是用來 Broadcast 的BroadcastBlockId,他的命名就是
"broadcast_"?+?broadcastId?+?(if?(field?==?"")?""?else?"_"?+?field)或者它是一個 RDD,它的命名就是
"rdd_"?+?rddId?+?"_"?+?splitIndex通過在 Spark.log 里面跟蹤這些 block 名字,我們可以了解到當前 Spark 任務的執行和存儲情況。
BlockInfo中的level項表示這個 block 的存儲級別。
//?BlockInfoManager.scala private[storage]?class?BlockInfo(val?level:?StorageLevel,val?classTag:?ClassTag[_],val?tellMaster:?Boolean)?{持久化
Spark 提供了如下的持久化級別,其中選項為useDisk、useMemory、useOffHeap、deserialized、replication,分別表示是否采用磁盤、內存、堆外內存、反序列化以及持久化維護的副本數。其中反序列化為 false 時(好繞啊),會對對象進行序列化存儲,能夠節省一定空間,但同時會消耗計算資源。需要注意的是,cache操作是persist的一個特例,等于MEMORY_ONLY的 persist。所有的廣播對象都是MEMORY_AND_DISK的存儲級別
object?StorageLevel?extends?scala.AnyRef?with?scala.Serializable?{val?NONE?=?new?StorageLevel(false,?false,?false,?false)val?DISK_ONLY?=?new?StorageLevel(true,?false,?false,?false)val?DISK_ONLY_2?=?new?StorageLevel(true,?false,?false,?false,?2)val?MEMORY_ONLY?=?new?StorageLevel(false,?true,?false,?true)?//?默認存儲類別val?MEMORY_ONLY_2?=?new?StorageLevel(false,?true,?false,?true,?2)val?MEMORY_ONLY_SER?=?new?StorageLevel(false,?true,?false,?false)val?MEMORY_ONLY_SER_2?=?new?StorageLevel(false,?true,?false,?false,?2)val?MEMORY_AND_DISK?=?new?StorageLevel(true,?true,?false,?true)val?MEMORY_AND_DISK_2?=?new?StorageLevel(true,?true,?false,?true,?2)val?MEMORY_AND_DISK_SER?=?new?StorageLevel(true,?true,?false,?false)val?MEMORY_AND_DISK_SER_2?=?new?StorageLevel(true,?true,?false,?false,?2)val?OFF_HEAP?=?new?StorageLevel(true,?true,?true,?false,?1) }想在 Spark 任務完成之后檢查每一個 RDD 的緩存狀況是比較困難的,雖然在 Spark EventLog 中,我們也能看到在每一個 RDD 的 RDD Info 中有一個 StorageLevel 的條目。RDDInfo的源碼建議我們可以通過(Use Disk||Use Memory)&&NumberofCachedPartitions這樣的條件來判斷一個 RDD 到底有沒有被 cache。但實際上,似乎 EventLog 里面的NumberofCachedPartitions、Memory Size、Disk Size永遠是 0,這可能是只能在執行過程中才能看到這些字段的值,畢竟 WebUI 的 Storage 標簽就只在執行時能看到。不過(Use Disk||Use Memory)在 cache 調用的 RDD 上是 true 的,所以可以以這個 RDD 為根做一個 BFS,將所有不需要計算的 RDD 找出來。
BlockInfoManager
BlockInfoManager用來管理 Block 的元信息,例如它維護了所有 BlockId 的 BlockInfo 信息infos: mutable.HashMap[BlockId, BlockInfo]。不過它最主要的功能還是為讀寫 Block 提供鎖服務
本地讀 Block
本地讀方法位于 BlockManager.scala 中,從前叫getBlockData,現在叫getLocalBlockData,名字更易懂了。getLocalBlockData的主要內容就對 Block 的性質進行討論,如果是 Shuffle 的,那么就借助于ShuffleBlockResolver。
ShuffleBlockResolver是一個 trait,它有兩個子類IndexShuffleBlockResolver和ExternalShuffleBlockResolver,它們定義如何從一個 logical shuffle block identifier(例如 map、reduce 或 shuffle)中取回 Block。這個類維護 Block 和文件的映射關系,維護 index 文件,向BlockStore提供抽象。
//?BlockManager.scala override?def?getLocalBlockData(blockId:?BlockId):?ManagedBuffer?=?{if?(blockId.isShuffle)?{//?如果這個BlockId是Shuffle的,那么就通過shuffleManager的shuffleBlockResolver來獲取BlockDatashuffleManager.shuffleBlockResolver.getBlockData(blockId)}?else?{//?否則使用getLocalBytesgetLocalBytes(blockId)?match?{case?Some(blockData)?=>new?BlockManagerManagedBuffer(blockInfoManager,?blockId,?blockData,?true)case?None?=>//?If?this?block?manager?receives?a?request?for?a?block?that?it?doesn't?have?then?it's//?likely?that?the?master?has?outdated?block?statuses?for?this?block.?Therefore,?we?send//?an?RPC?so?that?this?block?is?marked?as?being?unavailable?from?this?block?manager.reportBlockStatus(blockId,?BlockStatus.empty)throw?new?BlockNotFoundException(blockId.toString)}} }我們看getLocalBytes函數,它帶鎖地調用doGetLocalBytes
def?getLocalBytes(blockId:?BlockId):?Option[BlockData]?=?{logDebug(s"Getting?local?block?$blockId?as?bytes")assert(!blockId.isShuffle,?s"Unexpected?ShuffleBlockId?$blockId")blockInfoManager.lockForReading(blockId).map?{?info?=>?doGetLocalBytes(blockId,?info)?} }上面的這一段代碼會在 spark.log 中產生類似下面的 Log,我們由此可以對 Block 的用途,存儲級別等進行分析。
19/11/26?17:24:52?DEBUG?BlockManager:?Getting?local?block?broadcast_3_piece0?as?bytes 19/11/26?17:24:52?TRACE?BlockInfoManager:?Task?-1024?trying?to?acquire?read?lock?for?broadcast_3_piece0 19/11/26?17:24:52?TRACE?BlockInfoManager:?Task?-1024?acquired?read?lock?for?broadcast_3_piece0 19/11/26?17:24:52?DEBUG?BlockManager:?Level?for?block?broadcast_3_piece0?is?StorageLevel(disk,?memory,?1?replicas)doGetLocalBytes負責根據 Block 的存儲級別,以最小的代價取到序列化后的數據。從下面的代碼中可以看到,Spark 認為序列化一個對象的開銷是高于從磁盤中讀取一個已經序列化之后的對象的開銷的,因為它寧可從磁盤里面取也不愿意直接從內存序列化。
private?def?doGetLocalBytes(blockId:?BlockId,?info:?BlockInfo):?BlockData?=?{val?level?=?info.levellogDebug(s"Level?for?block?$blockId?is?$level")//?如果內容是序列化的,先嘗試讀序列化的到內存和磁盤。//?如果內容是非序列化的,嘗試序列化內存中的對象,最后拋出異常表示不存在if?(level.deserialized)?{//?因為內存中是非序列化的,嘗試能不能先從磁盤中讀到非序列化的。if?(level.useDisk?&&?diskStore.contains(blockId))?{//?Note:?Spark在這里故意不將block放到內存里面,因為這個if分支是處理非序列化塊的,//?這個塊可能被按照非序列化對象的形式存在內存里面,因此沒必要在在內存里面存一份序列化了的。diskStore.getBytes(blockId)}?else?if?(level.useMemory?&&?memoryStore.contains(blockId))?{//?不在硬盤上,就序列化內存中的對象new?ByteBufferBlockData(serializerManager.dataSerializeWithExplicitClassTag(blockId,?memoryStore.getValues(blockId).get,?info.classTag),?true)}?else?{handleLocalReadFailure(blockId)}}?else?{//?如果存在已經序列化的對象if?(level.useMemory?&&?memoryStore.contains(blockId))?{//?先找內存new?ByteBufferBlockData(memoryStore.getBytes(blockId).get,?false)}?else?if?(level.useDisk?&&?diskStore.contains(blockId))?{//?再找磁盤val?diskData?=?diskStore.getBytes(blockId)maybeCacheDiskBytesInMemory(info,?blockId,?level,?diskData).map(new?ByteBufferBlockData(_,?false)).getOrElse(diskData)}?else?{handleLocalReadFailure(blockId)}} }Spark 的內存管理
在 Spark 1.6 之后,內存管理模式發生了大變化,從前版本的內存管理需要通過指定spark.memory.useLegacyMode來手動啟用,因此在這里只對之后的進行論述。
Spark 內存布局
如下圖所示,Spark 的堆內存空間可以分為 Spark 托管區、用戶區和保留區三塊。
其中保留區占 300MB,是固定的。托管區的大小由spark.memory.fraction節制,而1 - spark.memory.fraction的部分用戶區。這個值越小,就越容易 Spill 或者 Cache evict。這個設置的用途是將 internal metadata、user data structures 區分開來。從而減少對稀疏的或者不常出現的大對象的大小的不準確估計造成的影響(限定詞有點多,是翻譯的注釋、、、)。默認spark.memory.fraction是 0.6。
//?package.scala private[spark]?val?MEMORY_FRACTION?=?ConfigBuilder("spark.memory.fraction").doc("...").doubleConf.createWithDefault(0.6)Spark 的托管區又分為 Execution 和 Storage 兩個部分。其中 Storage 主要用來緩存 RDD、Broadcast 之類的對象,Execution 被用來存 Mapside 的 Shuffle 數據。
Storage 和 Execution 共享的內存,spark.storage.storageFraction(現在應該已經改成了spark.memory.storageFraction)表示對 eviction 免疫的 Storage 部分的大小,它的值越大,Execution 內存就越小,Task 就越容易 Spill。反之,Cache 就越容易被 evict。默認spark.memory.storageFraction是 0.5。
//?package.scala private[spark]?val?MEMORY_STORAGE_FRACTION?=?ConfigBuilder("spark.memory.storageFraction").doc("...").doubleConf.checkValue(v?=>?v?>=?0.0?&&?v?<?1.0,?"Storage?fraction?must?be?in?[0,1)").createWithDefault(0.5)Storage 可以借用任意多的 Execution 內存,直到 Execution 重新要回。此時被 Cache 的塊會被從內存中 evict 掉(具體如何 evict,根據每個 Block 的存儲級別)。Execution 也可以借用任意多的 Storage 的,但是 Execution 的借用不能被 Storage 驅逐,原因是因為實現起來很復雜。我們在稍后將看到,Spark 沒有一個統一的資源分配的入口。
除了堆內內存,Spark 還可以使用堆外內存。
MemoryManager
Spark 中負責文件管理的類是MemoryManager,它是一個抽象類,被SparkEnv持有。在 1.6 版本后引入的UnifiedMemoryManager是它的一個實現。
//?SparkEnv.scala val?memoryManager:?MemoryManager?=?UnifiedMemoryManager(conf,?numUsableCores)UnifiedMemoryManager實現了諸如acquireExecutionMemory等方法來分配內存。通過在acquireExecutionMemory時傳入一個MemoryMode可以告知是從堆內請求還是從堆外請求。需要注意的是,這類的函數并不像malloc一樣直接去請求一段內存,并返回內存的地址,而是全局去維護每個 Task 所使用的內存大小。每一個 Task 在申請內存(new 對象)之前都會去檢查一下自己有沒有超標,否則就去 Spill。也就是說MemoryManager實際上是一個外掛式的內存管理系統,它不實際上托管內存,整個內存還是由 JVM 管理的。
對 Task 的 Execution 內存使用進行跟蹤的這個機制被實現ExecutionMemoryPool中,如下面的代碼所示。
當然,有ExecutionMemoryPool就也有StorageMemoryPool,他們都不出所料繼承了MemoryPool。而以上這些 Pool 最后都被MemoryManager所持有。
//?MemoryManager.scala @GuardedBy("this") protected?val?onHeapStorageMemoryPool?=?new?StorageMemoryPool(this,?MemoryMode.ON_HEAP) @GuardedBy("this") protected?val?offHeapStorageMemoryPool?=?new?StorageMemoryPool(this,?MemoryMode.OFF_HEAP) @GuardedBy("this") protected?val?onHeapExecutionMemoryPool?=?new?ExecutionMemoryPool(this,?MemoryMode.ON_HEAP) @GuardedBy("this") protected?val?offHeapExecutionMemoryPool?=?new?ExecutionMemoryPool(this,?MemoryMode.OFF_HEAP)請求內存的流程
我們知道,在 Shuffle 操作中有兩個內存使用大戶ExecutorSorter和ExternalAppendOnlyMap,都繼承了Spillable,從而實現了在內存不足時進行 Spill。我們查看對應的maybeSpill方法,它調用了自己父類MemoryConsumer中的acquireExecutionMemory方法。
由于從代碼注釋上看似乎MemoryConsumer包括它引用到的TaskMemoryManager類都與 Tungsten 有關,所以我們將在稍后進行研究。目前只是列明調用過程,因為如果其中涉及要向 Spark 托管內存請求分配,最終調用的還是UnifiedMemoryManager中的對應方法。
//?Spillable.scala //?在maybeSpill方法中 val?granted?=?acquireMemory(amountToRequest)//?MemoryConsumer.scala public?long?acquireMemory(long?size)?{long?granted?=?taskMemoryManager.acquireExecutionMemory(size,?this);used?+=?granted;return?granted; }//?TaskMemoryManager.java public?long?acquireExecutionMemory(long?required,?MemoryConsumer?consumer)?{assert(required?>=?0);assert(consumer?!=?null);MemoryMode?mode?=?consumer.getMode();synchronized?(this)?{long?got?=?memoryManager.acquireExecutionMemory(required,?taskAttemptId,?mode);...//?Executor.scala //?TaskMemoryManager中的memoryManager,其實就是一個UnifiedMemoryManager val?taskMemoryManager?=?new?TaskMemoryManager(env.memoryManager,?taskId)下面,我們來看acquireExecutionMemory的詳細實現。它前面會首先根據memoryMode選擇使用的MemoryPool,是堆內的,還是堆外的。然后它會有個函數maybeGrowExecutionPool,用來處理在需要的情況下從 Storage 部分擠占一些內存回來。我們可以在稍后詳看這個方法。現在,我們發現acquireExecutionMemory會往對應的MemoryPool發一個調用acquireMemory。
//?UnifiedMemoryManager.scala override?private[memory]?def?acquireExecutionMemory(...//?實際上是一個ExecutionMemoryPoolexecutionPool.acquireMemory(numBytes,?taskAttemptId,?maybeGrowExecutionPool,?()?=>?computeMaxExecutionPoolSize) }//?MemoryManager.scala @GuardedBy("this") protected?val?onHeapExecutionMemoryPool?=?new?ExecutionMemoryPool(this,?MemoryMode.ON_HEAP)由于我們討論的場景就是請求堆內的執行內存,所以就進入ExecutionMemoryPool.scala查看相關代碼。在 Spark 中,會嘗試保證每個 Task 能夠得到合理份額的內存,而不是讓某些 Task 的內存持續增大到一定的數量,然后導致其他人持續地 Spill 到 Disk。
如果有 N 個任務,那么保證每個 Task 在 Spill 前可以獲得至少1 / 2N的內存,并且最多只能獲得1 / N。因為N是持續變化的,所以我們需要跟蹤活躍 Task 集合,并且持續在等待 Task 集合中更新1 / 2N和1 / N的值。這個是借助于同步機制實現的,在 1.6 之前,是由ShuffleMemoryManager來仲裁的。
Tungsten 內存管理機制
Tungsten 不依賴于 Java 對象,所以堆內和堆外的內存分配都可以支持。序列化時間相比原生的要加快很多。其優化主要包含三點:
Memory Management and Binary Processing
Cache-aware computation
Code generation
這個是為了解決在 Spark 2.0 之前 SparkSQL 使用的Volcano中大量的鏈式next()導致的性能(虛函數等)問題。
在內存管理部分,能看到諸如TaskMemoryManager.java的文件;在稍后的 Shuffle 部分,能看到諸如UnsafeWriter.java的文件。這些 Java 文件在實現上就有對 Tungsten 的使用,因為用到了 sun.misc.Unsafe 的 API,所以使用 Tungsten 的 shuffle 又叫 Unsafe shuffle。
在MemoryManager中持有了 Tungsten 內存管理機制的核心類tungstenMemoryAllocator: MemoryAllocator。并設置了tungstenMemoryMode指示其分配內存的默認位置,如果MEMORY_OFFHEAP_ENABLED是打開的且MEMORY_OFFHEAP_SIZE是大于 0 的,那么默認使用堆外內存。
TaskMemoryManager
TaskMemoryManager這個對象被用來管理一個 Task 的堆內和對外內存分配,因此它能夠調度一個 Task 中各個組件的內存使用情況。當組件需要使用TaskMemoryManager提供的內存時,他們需要繼承一個MemoryConsumer類,以便向TaskMemoryManager請求內存。TaskMemoryManager中集成了普通的內存分配機制和 Tungsten 內存分配機制。
普通分配 acquireExecutionMemory
我們跟蹤TaskMemoryManager.acquireExecutionMemory相關代碼,它先嘗試從MemoryManager直接請求內存:
//?TaskMemoryManager.scala public?long?acquireExecutionMemory(long?required,?MemoryConsumer?consumer)?{assert(required?>=?0);assert(consumer?!=?null);MemoryMode?mode?=?consumer.getMode();//?如果我們在分配堆外內存的頁,并且受到一個對堆內內存的請求,//?那么沒必要去Spill,因為怎么說也只是Spill的堆外內存。//?不過現在改這個風險很大。。。。synchronized?(this)?{long?got?=?memoryManager.acquireExecutionMemory(required,?taskAttemptId,?mode);如果請求不到,那么先嘗試讓同一個TaskMemoryManager上的其他的 Consumer Spill,以減少 Spill 頻率,從而減少 Spill 出來的小文件數量。主要是根據每個 Consumer 的內存使用排個序,從而避免重復對同一個 Consumer 進行 Spill,導致產生很多小文件。
...if?(got?<?required)?{TreeMap<Long,?List<MemoryConsumer>>?sortedConsumers?=?new?TreeMap<>();for?(MemoryConsumer?c:?consumers)?{if?(c?!=?consumer?&&?c.getUsed()?>?0?&&?c.getMode()?==?mode)?{long?key?=?c.getUsed();List<MemoryConsumer>?list?=sortedConsumers.computeIfAbsent(key,?k?->?new?ArrayList<>(1));list.add(c);}} ...現在,我們對排序得到的一系列sortedConsumers進行 spill,一旦成功釋放出內存,就立刻向 MemoryManager 去請求這些內存,相關代碼沒啥可看的,故省略。如果內存還是不夠,就 Spill 自己,如果成功了,就向 MemoryManager 請求內存。
...//?call?spill()?on?itselfif?(got?<?required)?{try?{long?released?=?consumer.spill(required?-?got,?consumer);if?(released?>?0)?{logger.debug("Task?{}?released?{}?from?itself?({})",?taskAttemptId,Utils.bytesToString(released),?consumer);got?+=?memoryManager.acquireExecutionMemory(required?-?got,?taskAttemptId,?mode);}}?catch?(ClosedByInterruptException?e)?{...}}consumers.add(consumer);logger.debug("Task?{}?acquired?{}?for?{}",?taskAttemptId,?Utils.bytesToString(got),?consumer);return?got;} }Tungsten 分配 allocatePage
TaskMemoryManager還有個allocatePage方法,用來獲得MemoryBlock,這個是通過 Tungsten 機制分配的。TaskMemoryManager使用了類似操作系統中分頁的機制來操控內存。每個“頁”,也就是MemoryBlock對象,維護了一段堆內或者堆外的內存。頁的總數由PAGE_NUMBER_BITS來決定,即對于一個 64 位的地址,高PAGE_NUMBER_BITS(默認 13)位表示一個頁,而后面的位表示在頁內的偏移。當然,如果是堆外內存,那么這個 64 位就直接是內存地址了。有關使用分頁機制的原因在TaskMemoryManager.java有介紹,我暫時沒看懂。
需要注意的是,即使使用 Tungsten 分配,仍然不能繞開UnifiedMemoryManager機制的管理,所以我們看到在allocatePage方法中先要通過acquireExecutionMemory方法注冊,請求到邏輯內存之后,再通過下面的方法請求物理內存
//?TaskMemoryManager.scala long?acquired?=?acquireExecutionMemory(size,?consumer); if?(acquired?<=?0)?{return?null; } page?=?memoryManager.tungstenMemoryAllocator().allocate(acquired);Spark Job 執行流程分析
Job 階段
下面我們通過一個 RDD 上的 Action 操作 count,查看 Spark 的 Job 是如何運行和調度的。特別注意的是,在 SparkSQL 中,Action 操作有不同的執行流程,所以宜對比著看。count通過全局的SparkContext.runJob啟動一個 Job,這個函數轉而調用DAGScheduler.runJob。Utils.getIteratorSize實際上就是遍歷一遍迭代器,以便統計 count。
//?RDD.scala def?count():?Long?=?sc.runJob(this,?Utils.getIteratorSize?_).sum //?Utils.scala def?getIteratorSize(iterator:?Iterator[_]):?Long?=?{var?count?=?0Lwhile?(iterator.hasNext)?{count?+=?1Literator.next()}count }在參數列表里面的下劃線_的作用是將方法轉為函數,Scala 中方法和函數之間有一些區別,在此不詳細討論。
下面查看runJob函數。比較有趣的是clean函數,它調用ClosureCleaner.clean方法,這個方法用來清理$outer域中未被引用的變量。因為我們要將閉包func序列化,并從 Driver 發送到 Executor 上面。序列化閉包的過程就是為每一個閉包生成一個可序列化類,在生成時,會將這個閉包所引用的外部對象也序列化。容易發現,如果我們為了使用外部對象的某些字段,而序列化整個對象,那么開銷是很大的,因此通過clean來清除不需要的部分以減少序列化開銷。
此外,getCallSite用來生成諸如s"$lastSparkMethod at $firstUserFile:$firstUserLine"這樣的字符串,它實際上會回溯調用棧,找到第一個不是在 Spark 包中的函數,即$lastSparkMethod,它是導致一個 RDD 創建的函數,比如各種 Transform 操作、sc.parallelize等。
//?SparkContext.scala def?runJob[T,?U:?ClassTag](rdd:?RDD[T],func:?(TaskContext,?Iterator[T])?=>?U,partitions:?Seq[Int],resultHandler:?(Int,?U)?=>?Unit):?Unit?=?{if?(stopped.get())?{throw?new?IllegalStateException("SparkContext?has?been?shutdown")}val?callSite?=?getCallSiteval?cleanedFunc?=?clean(func)logInfo("Starting?job:?"?+?callSite.shortForm)if?(conf.getBoolean("spark.logLineage",?false))?{logInfo("RDD's?recursive?dependencies:\n"?+?rdd.toDebugString)}dagScheduler.runJob(rdd,?cleanedFunc,?partitions,?callSite,?resultHandler,?localProperties.get)progressBar.foreach(_.finishAll())//?CheckPoint機制rdd.doCheckpoint() } private[spark]?def?clean[F?<:?AnyRef](f:?F,?checkSerializable:?Boolean?=?true?"spark]?def?clean[F?<:?AnyRef"):?F?=?{ClosureCleaner.clean(f,?checkSerializable)f }我們發現,傳入的 func 只接受一個Iterator[_]參數,但是其形參聲明卻是接受TaskContext和Iterator[T]兩個參數。這是為什么呢?這是因為runJob有不少重載函數,例如下面的這個
def?runJob[T,?U:?ClassTag](rdd:?RDD[T],func:?Iterator[T]?=>?U,partitions:?Seq[Int]):?Array[U]?=?{val?cleanedFunc?=?clean(func)runJob(rdd,?(ctx:?TaskContext,?it:?Iterator[T])?=>?cleanedFunc(it),?partitions) }下面我們查看DAGScheduler.runJob函數,它實際上就是調用submitJob,然后等待 Job 執行的結果。由于 Spark 的DAGScheduler是基于事件循環的,它擁有一個DAGSchedulerEventProcessLoop類型的變量eventProcessLoop,不同的對象向它post事件,然后在它的onReceive循環中會依次對這些事件調用處理函數。
我們需要注意的是partitions不同于我們傳入的rdd.partitions,前者是一個Array[Int],后者是一個Array[Partition]。并且在邏輯意義上,前者表示需要計算的 partition,對于如 first 之類的 Action 操作來說,它只是 rdd 的所有 partition 的一個子集,我們將在稍后的submitMissingTasks函數中繼續看到這一點。
def?runJob[T,?U](...?"T,?U"):?Unit?=?{val?start?=?System.nanoTimeval?waiter?=?submitJob(rdd,?func,?partitions,?callSite,?resultHandler,?properties)//?下面就是在等了ThreadUtils.awaitReady(waiter.completionFuture,?Duration.Inf)waiter.completionFuture.value.get?match?{case?scala.util.Success(_)?=>logInfo("Job?%d?finished:?%s,?took?%f?s".format(waiter.jobId,?callSite.shortForm,?(System.nanoTime?-?start)?/?1e9))case?scala.util.Failure(exception)?=>logInfo("Job?%d?failed:?%s,?took?%f?s".format(waiter.jobId,?callSite.shortForm,?(System.nanoTime?-?start)?/?1e9))//?SPARK-8644:?Include?user?stack?trace?in?exceptions?coming?from?DAGScheduler.val?callerStackTrace?=?Thread.currentThread().getStackTrace.tailexception.setStackTrace(exception.getStackTrace?++?callerStackTrace)throw?exception} }def?submitJob[T,?U](rdd:?RDD[T],?//?target?RDD?to?run?tasks?on,就是被執行count的RDDfunc:?(TaskContext,?Iterator[T])?=>?U,?//?在RDD每一個partition上需要跑的函數partitions:?Seq[Int],callSite:?CallSite,?//?被調用的位置resultHandler:?(Int,?U)?=>?Unit,properties:?Properties):?JobWaiter[U]?=?{//?檢查是否在一個不存在的分區上創建一個Taskval?maxPartitions?=?rdd.partitions.lengthpartitions.find(p?=>?p?>=?maxPartitions?||?p?<?0).foreach?{?p?=>throw?new?IllegalArgumentException(?"Attempting?to?access?a?non-existent?partition:?"?+?p?+?".?"?+?"Total?number?of?partitions:?"?+?maxPartitions)}//?jobId是從后往前遞增的val?jobId?=?nextJobId.getAndIncrement()if?(partitions.isEmpty)?{val?time?=?clock.getTimeMillis()//?listenerBus是一個LiveListenerBus對象,從DAGScheduler構造時得到,用來做event?log//?SparkListenerJobStart定義在SparkListener.scala文件中listenerBus.post(SparkListenerJobStart(jobId,?time,?Seq[StageInfo](?"StageInfo"),?SerializationUtils.clone(properties)))listenerBus.post(SparkListenerJobEnd(jobId,?time,?JobSucceeded))//?如果partitions是空的,那么就直接返回return?new?JobWaiter[U](this,?jobId,?0,?resultHandler?"U")}assert(partitions.nonEmpty)val?func2?=?func.asInstanceOf[(TaskContext,?Iterator[_])?=>?_]val?waiter?=?new?JobWaiter[U](this,?jobId,?partitions.size,?resultHandler?"U")//?我們向eventProcessLoop提交一個JobSubmitted事件eventProcessLoop.post(JobSubmitted(jobId,?rdd,?func2,?partitions.toArray,?callSite,?waiter,SerializationUtils.clone(properties)))waiter } //?DAGSchedulerEvent.scala private[scheduler]?case?class?JobSubmitted(jobId:?Int,finalRDD:?RDD[_],func:?(TaskContext,?Iterator[_])?=>?_,partitions:?Array[Int],callSite:?CallSite,listener:?JobListener,properties:?Properties?=?null)extends?DAGSchedulerEvent下面我們具體看看對JobSubmitted的響應
//?DAGScheduler.scala private[scheduler]?def?handleJobSubmitted(...)?{var?finalStage:?ResultStage?=?null//?首先我們嘗試創建一個`finalStage: ResultStage`,這是整個Job的最后一個Stage。try?{//?func:?(TaskContext,?Iterator[_])?=>?_//?下面的語句是可能拋BarrierJobSlotsNumberCheckFailed或者其他異常的,//?例如一個HadoopRDD所依賴的HDFS文件被刪除了finalStage?=?createResultStage(finalRDD,?func,?partitions,?jobId,?callSite)}?catch?{...//?DAGScheduler.scala private?def?createResultStage(...):?ResultStage?=?{checkBarrierStageWithDynamicAllocation(rdd)checkBarrierStageWithNumSlots(rdd)checkBarrierStageWithRDDChainPattern(rdd,?partitions.toSet.size)val?parents?=?getOrCreateParentStages(rdd,?jobId)val?id?=?nextStageId.getAndIncrement()val?stage?=?new?ResultStage(id,?rdd,?func,?partitions,?parents,?jobId,?callSite)stageIdToStage(id)?=?stageupdateJobIdStageIdMaps(jobId,?stage)stage }這里createResultStage所返回的ResultStage繼承了Stage類。Stage類有個rdd參數,對ResultStage而言就是finalRDD,對ShuffleMapStage而言就是ShuffleDependency.rdd
//?DAGScheduler.scala def?createShuffleMapStage[K,?V,?C](shuffleDep:?ShuffleDependency[K,?V,?C],?jobId:?Int):?ShuffleMapStage?=?{val?rdd?=?shuffleDep.rdd...下面我們來看看checkBarrierStageWithNumSlots這個函數,因為它會拋出BarrierJobSlotsNumberCheckFailed這個異常,被handleJobSubmitted捕獲。這個函數主要是為了檢測是否有足夠的 slots 去運行所有的 barrier task。屏障調度器是 Spark 為了支持深度學習在 2.4.0 版本所引入的一個特性。它要求在 barrier stage 中同時啟動所有的 Task,當任意的 task 執行失敗的時候,總是重啟整個 barrier stage。這么麻煩是因為 Spark 希望能夠在 Task 中提供一個 barrier 以供顯式同步。
//?DAGScheduler.scala private?def?checkBarrierStageWithNumSlots(rdd:?RDD[_]):?Unit?=?{val?numPartitions?=?rdd.getNumPartitionsval?maxNumConcurrentTasks?=?sc.maxNumConcurrentTasksif?(rdd.isBarrier()?&&?numPartitions?>?maxNumConcurrentTasks)?{throw?new?BarrierJobSlotsNumberCheckFailed(numPartitions,?maxNumConcurrentTasks)} }//?DAGScheduler.scala...case?e:?BarrierJobSlotsNumberCheckFailed?=>//?If?jobId?doesn't?exist?in?the?map,?Scala?coverts?its?value?null?to?0:?Int?automatically.//?barrierJobIdToNumTasksCheckFailures是一個ConcurrentHashMap,表示對每個BarrierJob上失敗的Task數量val?numCheckFailures?=?barrierJobIdToNumTasksCheckFailures.compute(jobId,(_:?Int,?value:?Int)?=>?value?+?1)...if?(numCheckFailures?<=?maxFailureNumTasksCheck)?{messageScheduler.schedule(new?Runnable?{override?def?run():?Unit?=?eventProcessLoop.post(JobSubmitted(jobId,?finalRDD,?func,partitions,?callSite,?listener,?properties))},timeIntervalNumTasksCheck,TimeUnit.SECONDS)return}?else?{//?Job?failed,?clear?internal?data.barrierJobIdToNumTasksCheckFailures.remove(jobId)listener.jobFailed(e)return}case?e:?Exception?=>logWarning("Creating?new?stage?failed?due?to?exception?-?job:?"?+?jobId,?e)listener.jobFailed(e)return}//?Job?submitted,?clear?internal?data.barrierJobIdToNumTasksCheckFailures.remove(jobId)...下面開始創建 Job。ActiveJob表示在DAGScheduler里面運行的一個 Job。
Job 只負責向“葉子”Stage 要結果,而之前 Stage 的運行是由DAGScheduler來調度的。這是因為若干 Job 可能共用同一個 Stage 的計算結果,我這樣說的根據是在 Stage 類的定義中的jobIds字段是一個HashSet,也就是說它可以屬于多個 Job。所以將某個 Stage 強行歸屬到某個 Job 是不符合 Spark 設計邏輯的。
//?DAGScheduler.scala...val?job?=?new?ActiveJob(jobId,?finalStage,?callSite,?listener,?properties)clearCacheLocs()//?在這里會打印四條日志,這個可以被用來在Spark.log里面定位事件logInfo("Got?job?%s?(%s)?with?%d?output?partitions".format(job.jobId,?callSite.shortForm,?partitions.length))logInfo("Final?stage:?"?+?finalStage?+?"?("?+?finalStage.name?+?")")logInfo("Parents?of?final?stage:?"?+?finalStage.parents)logInfo("Missing?parents:?"?+?getMissingParentStages(finalStage))...val?stageIds?=?jobIdToStageIds(jobId).toArrayval?stageInfos?=?stageIds.flatMap(id?=>?stageIdToStage.get(id).map(_.latestInfo))listenerBus.post(SparkListenerJobStart(job.jobId,?jobSubmissionTime,?stageInfos,?properties))//?從最后一個stage開始調用submitStagesubmitStage(finalStage) }Stage 階段
Stage 是如何劃分的呢?又是如何計算 Stage 之間的依賴的?我們繼續查看submitStage這個函數,對于一個 Stage,首先調用getMissingParentStages看看它的父 Stage 能不能直接用,也就是說這個 Stage 的 rdd 所依賴的所有父 RDD 能不能直接用,如果不行的話,就要先算父 Stage 的。在前面的論述里,我們知道,若干 Job 可能共用同一個 Stage 的計算結果,而不同的 Stage 也可能依賴同一個 RDD。
private?def?submitStage(stage:?Stage)?{//?找到這個stage所屬的jobval?jobId?=?activeJobForStage(stage)if?(jobId.isDefined)?{logDebug("submitStage("?+?stage?+?")")if?(!waitingStages(stage)?&&?!runningStages(stage)?&&?!failedStages(stage))?{//?如果依賴之前的Stage,先列出來,并且按照id排序val?missing?=?getMissingParentStages(stage).sortBy(_.id)logDebug("missing:?"?+?missing)if?(missing.isEmpty)?{//?運行這個StagelogInfo("Submitting?"?+?stage?+?"?("?+?stage.rdd?+?"),?which?has?no?missing?parents")submitMissingTasks(stage,?jobId.get)}?else?{//?先提交所有的parent?stagefor?(parent?<-?missing)?{submitStage(parent)}waitingStages?+=?stage}}}?else?{abortStage(stage,?"No?active?job?for?stage?"?+?stage.id,?None)} }下面具體查看getMissingParentStages這個函數,可以看到,Stage 的計算鏈是以最后一個 RDD 為樹根逆著向上遍歷得到的,而這個鏈條的終點要么是一個ShuffleDependency,要么是一個所有分區都被緩存了的 RDD。
private?def?getMissingParentStages(stage:?Stage):?List[Stage]?=?{val?missing?=?new?HashSet[Stage]val?visited?=?new?HashSet[RDD[_]]val?waitingForVisit?=?new?ListBuffer[RDD[_]]//?這里是個**DFS**,棧是手動維護的,主要是為了防止爆棧waitingForVisit?+=?stage.rdddef?visit(rdd:?RDD[_]):?Unit?=?{if?(!visited(rdd))?{visited?+=?rddval?rddHasUncachedPartitions?=?getCacheLocs(rdd).contains(Nil)if?(rddHasUncachedPartitions)?{//?如果這個RDD有沒有被緩存的Partition,那么它就需要被計算for?(dep?<-?rdd.dependencies)?{//?我們檢查這個RDD的所有依賴dep?match?{case?shufDep:?ShuffleDependency[_,?_,?_]?=>//?我們發現一個寬依賴,因此我們創建一個新的Shuffle?Stage,并加入到missing中(如果不存在)//?由于是寬依賴,所以我們不需要向上找了val?mapStage?=?getOrCreateShuffleMapStage(shufDep,?stage.firstJobId)if?(!mapStage.isAvailable)?{missing?+=?mapStage}case?narrowDep:?NarrowDependency[_]?=>//?如果是一個窄依賴,就加入到waitingForVisit中//?prepend是在頭部加,+=是在尾部加waitingForVisit.prepend(narrowDep.rdd)}}}}}while?(waitingForVisit.nonEmpty)?{visit(waitingForVisit.remove(0))}missing.toList }Task 階段
下面是重頭戲submitMissingTasks,這個方法負責生成 TaskSet,并且將它提交給 TaskScheduler 低層調度器。
partitionsToCompute計算有哪些分區是待計算的。根據 Stage 類型的不同,findMissingPartitions的計算方法也不同。
這個outputCommitCoordinator是由SparkEnv維護的OutputCommitCoordinator對象,它決定到底誰有權利向輸出寫數據。在 Executor 上的請求會通過他持有的 Driver 的OutputCommitCoordinatorEndpoint的引用發送給 Driver 處理
//?DAGScheduler.scala...//?Use?the?scheduling?pool,?job?group,?description,?etc.?from?an?ActiveJob?associated//?with?this?Stageval?properties?=?jobIdToActiveJob(jobId).propertiesrunningStages?+=?stage//?在檢測Tasks是否serializable之前,就要SparkListenerStageSubmitted,//?如果不能serializable,那就在這**之后**給一個SparkListenerStageCompletedstage?match?{case?s:?ShuffleMapStage?=>outputCommitCoordinator.stageStart(stage?=?s.id,?maxPartitionId?=?s.numPartitions?-?1)case?s:?ResultStage?=>outputCommitCoordinator.stageStart(stage?=?s.id,?maxPartitionId?=?s.rdd.partitions.length?-?1)}...用getPreferredLocs計算每個分區的最佳計算位置,它實際上是調用getPreferredLocsInternal這個函數。這個函數是一個關于visit: HashSet[(RDD[_], Int)]的遞歸函數,visit 用(rdd, partition)元組唯一描述一個分區。getPreferredLocs的計算邏輯是這樣的:
如果已經 visit 過了,就返回 Nil
如果是被 cached 的,通過getCacheLocs返回 cache 的位置
如果 RDD 有自己的偏好位置,例如輸入 RDD,那么使用rdd.preferredLocations返回它的偏好位置
如果還沒返回,但 RDD 有窄依賴,那么遍歷它的所有依賴項,返回第一個具有位置偏好的依賴項的值
理論上,一個最優的位置選取應該盡可能靠近數據源以減少網絡傳輸,但目前版本的 Spark 還沒有實現
//?DAGScheduler.scala...val?taskIdToLocations:?Map[Int,?Seq[TaskLocation]]?=?try?{stage?match?{case?s:?ShuffleMapStage?=>partitionsToCompute.map?{?id?=>?(id,?getPreferredLocs(stage.rdd,?id))}.toMapcase?s:?ResultStage?=>partitionsToCompute.map?{?id?=>val?p?=?s.partitions(id)(id,?getPreferredLocs(stage.rdd,?p))}.toMap}}?catch?{case?NonFatal(e)?=>//?如果有非致命異常就創建一個新的Attempt,并且abortStage(這還不致命么)stage.makeNewStageAttempt(partitionsToCompute.size)listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo,?properties))abortStage(stage,?s"Task?creation?failed:?$e\n${Utils.exceptionString(e)}",?Some(e))runningStages?-=?stagereturn}...下面,我們開始 attempt 這個 Stage,我們需要將 RDD 對象和依賴通過closureSerializer序列化成taskBinaryBytes,然后廣播得到taskBinary。當廣播變量過大時,會產生一條Broadcasting large task binary with size的 INFO。
//?DAGScheduler.scala...stage.makeNewStageAttempt(partitionsToCompute.size,?taskIdToLocations.values.toSeq)//?如果沒有Task要執行,實際上就是skip了,那么就沒有Submission?Time這個字段if?(partitionsToCompute.nonEmpty)?{stage.latestInfo.submissionTime?=?Some(clock.getTimeMillis())}listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo,?properties))// TODO:?也許可以將`taskBinary`放到Stage里面以避免對它序列化多次。//?一堆注釋看不懂var?taskBinary:?Broadcast[Array[Byte]]?=?nullvar?partitions:?Array[Partition]?=?nulltry?{var?taskBinaryBytes:?Array[Byte]?=?null//?taskBinaryBytes?and?partitions?are?both?effected?by?the?checkpoint?status.?We?need//?this?synchronization?in?case?another?concurrent?job?is?checkpointing?this?RDD,?so?we?get?a//?consistent?view?of?both?variables.RDDCheckpointData.synchronized?{taskBinaryBytes?=?stage?match?{case?stage:?ShuffleMapStage?=>JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd,?stage.shuffleDep):?AnyRef))case?stage:?ResultStage?=>//?注意這里的stage.func已經被ClosureCleaner清理過了JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd,?stage.func):?AnyRef))}partitions?=?stage.rdd.partitions}...//?廣播taskBinary?=?sc.broadcast(taskBinaryBytes)}?catch?{//?In?the?case?of?a?failure?during?serialization,?abort?the?stage.case?e:?NotSerializableException?=>abortStage(stage,?"Task?not?serializable:?"?+?e.toString,?Some(e))runningStages?-=?stage...}下面,我們根據 Stage 的類型生成 Task。
//?DAGScheduler.scala...val?tasks:?Seq[Task[_]]?=?try?{val?serializedTaskMetrics?=?closureSerializer.serialize(stage.latestInfo.taskMetrics).array()stage?match?{case?stage:?ShuffleMapStage?=>stage.pendingPartitions.clear()partitionsToCompute.map?{?id?=>val?locs?=?taskIdToLocations(id)val?part?=?partitions(id)stage.pendingPartitions?+=?idnew?ShuffleMapTask(stage.id,?stage.latestInfo.attemptNumber,taskBinary,?part,?locs,?properties,?serializedTaskMetrics,?Option(jobId),Option(sc.applicationId),?sc.applicationAttemptId,?stage.rdd.isBarrier())}case?stage:?ResultStage?=>partitionsToCompute.map?{?id?=>val?p:?Int?=?stage.partitions(id)val?part?=?partitions(p)val?locs?=?taskIdToLocations(id)new?ResultTask(stage.id,?stage.latestInfo.attemptNumber,taskBinary,?part,?locs,?id,?properties,?serializedTaskMetrics,Option(jobId),?Option(sc.applicationId),?sc.applicationAttemptId,stage.rdd.isBarrier())}}}?catch?{...}我們將生成的tasks包裝成一個TaskSet,并且提交給taskScheduler。
//?DAGScheduler.scala...if?(tasks.nonEmpty)?{logInfo(s"Submitting?${tasks.size}?missing?tasks?from?$stage?(${stage.rdd})?(first?15?"?+s"tasks?are?for?partitions?${tasks.take(15).map(_.partitionId)})")taskScheduler.submitTasks(new?TaskSet(tasks.toArray,?stage.id,?stage.latestInfo.attemptNumber,?jobId,?properties))}?else?{如果 tasks 是空的,說明任務就已經完成了,打上 DEBUG 日志,并且調用submitWaitingChildStages
????//?Because?we?posted?SparkListenerStageSubmitted?earlier,?we?should?mark//?the?stage?as?completed?here?in?case?there?are?no?tasks?to?runmarkStageAsFinished(stage,?None)stage?match?{case?stage:?ShuffleMapStage?=>logDebug(s"Stage?${stage}?is?actually?done;?"?+s"(available:?${stage.isAvailable},"?+s"available?outputs:?${stage.numAvailableOutputs},"?+s"partitions:?${stage.numPartitions})")markMapStageJobsAsFinished(stage)case?stage?:?ResultStage?=>logDebug(s"Stage?${stage}?is?actually?done;?(partitions:?${stage.numPartitions})")}submitWaitingChildStages(stage)} }Shuffle
Shuffle 機制是 Spark Core 的核心內容。在 Stage 和 Stage 之間,Spark 需要 Shuffle 數據。這個流程包含上一個 Stage 上的 Shuffle Write,中間的數據傳輸,以及下一個 Stage 的 Shuffle Read。如下圖所示:
Shuffle 類操作常常發生在寬依賴的 RDD 之間,這類算子需要將多個節點上的數據拉取到同一節點上進行計算,其中存在大量磁盤 IO、序列化和網絡傳輸開銷,它們可以分為以下幾點來討論。
當 Spark 中的某個節點故障之后,常常需要重算 RDD 中的某幾個分區。對于窄依賴而言,父 RDD 的一個分區只對應一個子 RDD 分區,因此丟失子 RDD 的分區,重算整個父 RDD 分區是必要的。而對于寬依賴而言,父 RDD 會被多個子 RDD 使用,而可能當前丟失的子 RDD 只使用了父 RDD 中的某幾個分區的數據,而我們仍然要重新計算整個父 RDD,這造成了計算資源的浪費。
當使用 Aggregate 類(如groupByKey)或者 Join 類這種 Shuffle 算子時,如果選擇的key上的數據是傾斜(skew)的,會導致部分節點上的負載增大。對于這種情況除了可以增加 Executor 的內存,還可以重新選擇分區函數(例如在之前的 key 上加鹽)來平衡分區。
Shuffle Read 操作容易產生 OOM,其原因是盡管在BlockStoreShuffleReader中會產生外部排序的resultIter,但在這之前,ExternalAppendOnlyMap先要從 BlockManager 拉取數據(k, v)到自己的currentMap中,如果這里的v很大,那么就會導致 Executor 的 OOM 問題。可以從PairRDDFunctions的文檔中佐證這一點。在Dataset中并沒有reduceByKey,原因可能與 Catalyst Optimizer 的優化有關,但考慮到groupByKey還是比較坑的,感覺這個舉措并不明智。
Shuffle 考古
在 Spark0.8 版本前,Spark 只有 Hash Based Shuffle 的機制。在這種方式下,假定 Shuffle Write 階段(有的也叫 Map 階段)有W個 Task,在 Shuffle Read 階段(有的也叫 Reduce 階段)有R個 Task,那么就會產生W*R個文件。這樣的壞處是對文件系統產生很大壓力,并且 IO 也差(隨機讀寫)。由于這些文件是先全量在內存里面構造,再 dump 到磁盤上,所以 Shuffle 在 Write 階段就很可能 OOM。
為了解決這個問題,在 Spark 0.8.1 版本加入了 File Consolidation,以求將W個 Task 的輸出盡可能合并。現在,Executor 上的每一個執行單位都生成自己獨一份的文件。假定所有的 Executor 總共有C個核心,每個 Task 占用T個核心,那么總共有C/T個執行單位。考慮極端情況,如果C==T,那么任務實際上是串行的,所以寫一個文件就行了。因此,最終會生成C/T*R個文件。
但這個版本仍然沒有解決 OOM 的問題。雖然對于 reduce 這類操作,比如count,因為是來一個 combine 一個,所以只要你的 V 不是數組,也不想強行把結果 concat 成一個數組,一般都沒有較大的內存問題。但是考慮如果我們執行groupByKey這樣的操作,在 Read 階段每個 Task 需要得到得到自己負責的 key 對應的所有 value,而我們現在每個 Task 得到的是若干很大的文件,這個文件里面的 key 是雜亂無章的。如果我們需要得到一個 key 對應的所有 value,那么我們就需要遍歷這個文件,將 key 和對應的 value 全部存放在一個結構比如 HashMap 中,并進行合并。因此,我們必須保證這個 HashMap 足夠大。既然如此,我們很容易想到一個基于外部排序的方案,我們為什么不能對 key 進行外排呢?確實在 Hadoop MapReduce 中會做歸并排序,因此 Reducer 側的數據按照 key 組織好的了。但 Spark 在這個版本沒有這么做,并且 Spark 在下一個版本就這么做了。
在 Spark 0.9 版本之后,引入了ExternalAppendOnlyMap,通過這個結構,SparkShuffle 在 combine 的時候如果內存不夠,就能 Spill 到磁盤,并在 Spill 的時候進行排序。當然,內存還是要能承載一個 KV 的,我們將在稍后的源碼分析中深入研究這個問題。
終于在 Spark1.1 版本之后引入了 Sorted Based Shuffle。此時,Shuffle Write 階段會按照 Partition ID 以及 key 對記錄進行排序。同時將全部結果寫到一個數據文件中,同時生成一個索引文件,Shuffle Read 的 Task 可以通過該索引文件獲取相關的數據。
在 Spark 1.5,Tungsten內存管理機制成為了 Spark 的默認選項。如果關閉spark.sql.tungsten.enabled,Spark 將采用基于 Kryo 序列化的列式存儲格式。
Shuffle Read 端源碼分析
Shuffle Read 一般位于一個 Stage 的開始,這時候上一個 Stage 會給我們留下一個 ShuffledRDD。在它的compute方法中會首先取出shuffleManager: ShuffleManager。
ShuffleManager是一個 Trait,它的兩個實現就是org.apache.spark.shuffle.hash.HashShuffleManager和
org.apache.spark.shuffle.sort.SortShuffleManager
接著,我們調用shuffleManager.getReader方法返回一個BlockStoreShuffleReader,它用來讀取[split.index, split.index + 1)這個區間內的 Shuffle 數據。接著,它會調用SparkEnv.get.mapOutputTracker的getMapSizesByExecutorId方法。
getMapSizesByExecutorId返回一個迭代器Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],表示對于某個BlockManagerId,它所存儲的 Shuffle Write 中間結果,包括BlockId、大小和 index。
具體實現上,這個方法首先從傳入的dep.shuffleHandle中獲得當前 Shuffle 過程的唯一標識shuffleId,然后它會從自己維護的shuffleStatuses中找到shuffleId對應的MapStatus,它應該有endPartition-startPartition這么多個。接著,對這些MapStatus,調用convertMapStatuses獲得迭代器。在compute中,實際上就只取當前split這一個 Partition 的 Shuffle 元數據。
ShuffleManager通過調用BlockStoreShuffleReader.read返回一個迭代器Iterator[(K, C)]。在BlockStoreShuffleReader.read方法中,首先得到一個ShuffleBlockFetcherIterator
//?BlockStoreShuffleReader.scala override?def?read():?Iterator[Product2[K,?C]]?=?{val?wrappedStreams?=?new?ShuffleBlockFetcherIterator(...)?//?返回一個ShuffleBlockFetcherIterator.toCompletionIterator?//?返回一個Iterator[(BlockId,?InputStream)]ShuffleBlockFetcherIterator用fetchUpToMaxBytes()和 fetchLocalBlocks()分別讀取 remote 和 local 的 Block。在拉取遠程數據的時候,會統計bytesInFlight、reqsInFlight等信息,并使用maxBytesInFlight和maxReqsInFlight節制。同時,為了允許 5 個并發同時拉取數據,還會設置targetRemoteRequestSize = math.max(maxBytesInFlight / 5, 1L)去請求每次拉取數據的最大大小。通過ShuffleBlockFetcherIterator.splitLocalRemoteBytes,現在改名叫partitionBlocksByFetchMode函數,可以將 blocks 分為 Local 和 Remote 的。關于兩個函數的具體實現,將單獨討論。
??val?serializerInstance?=?dep.serializer.newInstance()//?Create?a?key/value?iterator?for?each?streamval?recordIter?=?wrappedStreams.flatMap?{?case?(blockId,?wrappedStream)?=>serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator}//?Update?the?context?task?metrics?for?each?record?read.//?CompletionIterator相比普通的Iterator的區別就是在結束之后會調用一個completion函數//?CompletionIterator通過它伴生對象的apply方法創建,傳入第二個參數即completionFunctionval?metricIter?=?CompletionIterator[(Any,?Any),?Iterator[(Any,?Any)]](recordIter.map?{?record?=>readMetrics.incRecordsRead(1)record},context.taskMetrics().mergeShuffleReadMetrics())//?An?interruptible?iterator?must?be?used?here?in?order?to?support?task?cancellationval?interruptibleIter?=?new?InterruptibleIterator[(Any,?Any)](context,?metricIter?"(Any,?Any)")...經過一系列轉換,我們得到一個interruptibleIter。接下來,根據是否有 mapSideCombine 對它進行聚合。這里的dep來自于BaseShuffleHandle對象,它是一個ShuffleDependency。在前面 Spark 的任務調度中已經提到,ShuffleDependency就是寬依賴。
//?BlockStoreShuffleReader.scala...val?aggregatedIter:?Iterator[Product2[K,?C]]?=?if?(dep.aggregator.isDefined)?{if?(dep.mapSideCombine)?{//?We?are?reading?values?that?are?already?combinedval?combinedKeyValuesIterator?=?interruptibleIter.asInstanceOf[Iterator[(K,?C)]]dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator,?context)}?else?{//?We?don't?know?the?value?type,?but?also?don't?care?--?the?dependency?*should*//?have?made?sure?its?compatible?w/?this?aggregator,?which?will?convert?the?value//?type?to?the?combined?type?Cval?keyValuesIterator?=?interruptibleIter.asInstanceOf[Iterator[(K,?Nothing)]]dep.aggregator.get.combineValuesByKey(keyValuesIterator,?context)}}?else?{interruptibleIter.asInstanceOf[Iterator[Product2[K,?C]]]}這里的aggregator是Aggregator[K, V, C],這里的K、V和C與熟悉combineByKey的是一樣的。需要注意的是,在 combine 的過程中借助了ExternalAppendOnlyMap,這是之前提到的在 Spark 0.9 中引入的重要特性。通過調用insertAll方法能夠將interruptibleIter內部的數據添加到ExternalAppendOnlyMap中,并在之后更新 MemoryBytesSpilled、DiskBytesSpilled、PeakExecutionMemory 三個統計維度,這也是我們在 Event Log 中所看到的統計維度。
//?Aggregator.scala case?class?Aggregator[K,?V,?C]?(createCombiner:?V?=>?C,mergeValue:?(C,?V)?=>?C,mergeCombiners:?(C,?C)?=>?C)?{def?combineValuesByKey(iter:?Iterator[_?<:?Product2[K,?V]],context:?TaskContext):?Iterator[(K,?C)]?=?{val?combiners?=?new?ExternalAppendOnlyMap[K,?V,?C](createCombiner,?mergeValue,?mergeCombiners?"K,?V,?C")combiners.insertAll(iter)updateMetrics(context,?combiners)combiners.iterator}def?combineCombinersByKey(iter:?Iterator[_?<:?Product2[K,?C]],context:?TaskContext):?Iterator[(K,?C)]?=?{val?combiners?=?new?ExternalAppendOnlyMap[K,?C,?C](identity,?mergeCombiners,?mergeCombiners?"K,?C,?C")//?同上}/**?Update?task?metrics?after?populating?the?external?map.?*/private?def?updateMetrics(context:?TaskContext,?map:?ExternalAppendOnlyMap[_,?_,?_]):?Unit?=?{Option(context).foreach?{?c?=>c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)}} }在獲得 Aggregate 迭代器之后,最后一步,我們要進行排序,這時候就需要用到ExternalSorter這個對象。
//?BlockStoreShuffleReader.scala ...val?resultIter?=?dep.keyOrdering?match?{case?Some(keyOrd:?Ordering[K])?=>val?sorter?=?new?ExternalSorter[K,?C,?C](context,?ordering?=?Some(keyOrd?"K,?C,?C"),?serializer?=?dep.serializer)sorter.insertAll(aggregatedIter)context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)//?Use?completion?callback?to?stop?sorter?if?task?was?finished/cancelled.context.addTaskCompletionListener[Unit](_?=>?{sorter.stop()})CompletionIterator[Product2[K,?C],?Iterator[Product2[K,?C]]](sorter.iterator,?sorter.stop(?"Product2[K,?C],?Iterator[Product2[K,?C]]"))case?None?=>aggregatedIter}ExternalAppendOnlyMap 和 AppendOnlyMap
我們查看ExternalAppendOnlyMap的實現。ExternalAppendOnlyMap擁有一個currentMap管理在內存中存儲的鍵值對們。和一個DiskMapIterator的數組spilledMaps,表示 Spill 到磁盤上的鍵值對們。
@volatile?private[collection]?var?currentMap?=?new?SizeTrackingAppendOnlyMap[K,?C] private?val?spilledMaps?=?new?ArrayBuffer[DiskMapIterator]先來看currentMap,它是一個SizeTrackingAppendOnlyMap。這個東西實際上就是一個AppendOnlyMap,不過給它加上了統計數據大小的功能,主要是借助于SizeTracker中afterUpdate和resetSamples兩個方法。我們知道非序列化對象在內存存儲上是不連續的,我們需要通過遍歷迭代器才能知道對象的具體大小,而這個開銷是比較大的。因此,通過SizeTracker我們可以得到一個內存空間占用的估計值,從來用來判定是否需要 Spill。
下面,我們來看currentMap.insertAll這個方法
可以看出,在insertAll中主要做了兩件事情:
遍歷curEntry <- entries,并通過傳入的update函數進行 Combine 在內部存儲上,AppendOnlyMap,包括后面將看到的一些其他 KV 容器,都傾向于將(K, V)對放到哈希表的相鄰兩個位置,這樣的好處應該是避免訪問時再進行一次跳轉。
下面的代碼是AppendOnlyMap.changeValue的實現,它接受一個updateFunc用來更新一個指定K的值。updateFunc接受第一個布爾值,用來表示是不是首次出現這個 key。我們需要注意,AppendOnlyMap里面 null 是一個合法的鍵,但同時null又作為它里面的哈希表的默認填充,因此它里面有個對null特殊處理的過程。
//?AppendOnlyMap.scala //?這里的nullValue和haveNullValue是用來單獨處理k為null的情況的,下面會詳細說明 private?var?haveNullValue?=?false //?有關null.asInstanceOf[V]的花里胡哨的語法,詳見?https://stackoverflow.com/questions/10749010/if-an-int-cant-be-null-what-does-null-asinstanceofint-mean private?var?nullValue:?V?=?null.asInstanceOf[V]def?changeValue(key:?K,?updateFunc:?(Boolean,?V)?=>?V):?V?=?{//?updateFunc就是從insertAll傳入的updateassert(!destroyed,?destructionMessage)val?k?=?key.asInstanceOf[AnyRef]if?(k.eq(null))?{if?(!haveNullValue)?{//?如果這時候還沒有null的這個key,就新創建一個incrementSize()}nullValue?=?updateFunc(haveNullValue,?nullValue)haveNullValue?=?truereturn?nullValue}var?pos?=?rehash(k.hashCode)?&?maskvar?i?=?1while?(true)?{//?乘以2的原因是他按照K1?V1?K2?V2這樣放的val?curKey?=?data(2?*?pos)if?(curKey.eq(null))?{//?如果對應的key不存在,就新創建一個//?這也是為什么前面要單獨處理null的原因,這里的null被用來做placeholder了//?可以看到,第一個參數傳的false,第二個是花里胡哨的nullval?newValue?=?updateFunc(false,?null.asInstanceOf[V])data(2?*?pos)?=?kdata(2?*?pos?+?1)?=?newValue.asInstanceOf[AnyRef]incrementSize()return?newValue}?else?if?(k.eq(curKey)?||?k.equals(curKey))?{?//?又是從Java繼承下來的花里胡哨的特性val?newValue?=?updateFunc(true,?data(2?*?pos?+?1).asInstanceOf[V])data(2?*?pos?+?1)?=?newValue.asInstanceOf[AnyRef]return?newValue}?else?{//?再散列val?delta?=?ipos?=?(pos?+?delta)?&?maski?+=?1}}null.asInstanceOf[V]?//?Never?reached?but?needed?to?keep?compiler?happy }估計currentMap的當前大小,并調用currentMap.maybeSpill向磁盤 Spill。我們將在單獨的章節論述SizeTracker如何估計集合大小,先看具體的 Spill 過程,可以梳理出shouldSpill==true的情況
1、 elementsRead % 32 == 0
2、 currentMemory >= myMemoryThreshold
3、 通過acquireMemory請求的內存不足以擴展到2 * currentMemory的大小,關于這一步驟已經在內存管理部分詳細說明了,在這就不詳細說了
下面就是真正 Spill 的過程了,其實就是調用 spill 函數。注意_memoryBytesSpilled就是我們在 Event Log 里面看到的 Memory Spill 的統計量,他表示在 Spill 之后我們能夠釋放多少內存:
//?Spillable.scala...//?Actually?spillif?(shouldSpill)?{_spillCount?+=?1?//?統計Spill的次數logSpillage(currentMemory)spill(collection)_elementsRead?=?0?//?重置強制Spill計數器_elementsRead_memoryBytesSpilled?+=?currentMemoryreleaseMemory()}shouldSpill }在insertAll之后,會返回一個迭代器,我們查看相關方法。可以發現如果spilledMaps都是空的,也就是沒有 Spill 的話,就返回內存里面currentMap的iterator,否則就返回一個ExternalIterator。
對于第一種情況,會用SpillableIterator包裹一下。這個類在很多地方有定義,包括ExternalAppendOnlyMap.scala,ExternalSorter.scala里面。在當前使用的實現中,它實際上就是封裝了一下Iterator,使得能夠 spill,轉換成CompletionIterator等。
對于第二種情況,ExternalIterator比較有趣,將在稍后進行討論。
//?ExternalAppendOnlyMap.scala override?def?iterator:?Iterator[(K,?C)]?=?{...if?(spilledMaps.isEmpty)?{//?如果沒有發生SpilldestructiveIterator(currentMap.iterator)}?else?{//?如果發生了Spillnew?ExternalIterator()} }def?destructiveIterator(inMemoryIterator:?Iterator[(K,?C)]):?Iterator[(K,?C)]?=?{readingIterator?=?new?SpillableIterator(inMemoryIterator)readingIterator.toCompletionIterator }而currentMap.iterator實際上就是一個樸素無華的迭代器的實現。
//?AppendOnlyMap.scala def?nextValue():?(K,?V)?=?{if?(pos?==?-1)?{????//?Treat?position?-1?as?looking?at?the?null?valueif?(haveNullValue)?{return?(null.asInstanceOf[K],?nullValue)}pos?+=?1}while?(pos?<?capacity)?{if?(!data(2?*?pos).eq(null))?{return?(data(2?*?pos).asInstanceOf[K],?data(2?*?pos?+?1).asInstanceOf[V])}pos?+=?1}null }ExternalSorter
ExternalSorter的作用是對輸入的(K, V)進行排序,以產生新的(K, C)對,排序過程中可選擇進行 combine,否則輸出的C == V。需要注意的是ExternalSorter不僅被用在 Shuffle Read 端,也被用在了 Shuffle Write 端,所以在后面會提到 Map-side combine 的概念。ExternalSorter具有如下的參數,在給定ordering之后,ExternalSorter就會按照它來排序。在 Spark 源碼中建議如果希望進行 Map-side combining 的話,就指定ordering,否則就可以設置ordering為null
private[spark]?class?ExternalSorter[K,?V,?C](context:?TaskContext,aggregator:?Option[Aggregator[K,?V,?C]]?=?None,partitioner:?Option[Partitioner]?=?None,ordering:?Option[Ordering[K]]?=?None,serializer:?Serializer?=?SparkEnv.get.serializer)extends?Spillable[WritablePartitionedPairCollection[K,?C]](context.taskMemoryManager(?"WritablePartitionedPairCollection[K,?C]"))由于ExternalSorter支持有 combine 和沒有 combine 的兩種模式,因此對應設置了兩個對象。map = new PartitionedAppendOnlyMap[K, C],以及buffer = new PartitionedPairBuffer[K, C]。其中,PartitionedAppendOnlyMap就是一個SizeTrackingAppendOnlyMap。PartitionedPairBuffer則繼承了WritablePartitionedPairCollection,由于不需要按照 key 進行 combine,所以它的實現接近于一個 Array。
ExternalSorter.insertAll方法和之前看到的ExternalAppendOnlyMap方法是大差不差的,他也會對可以聚合的特征進行聚合,并且 TODO 上還說如果聚合之后的 reduction factor 不夠明顯,就停止聚合。
相比之前的 aggregator,ExternalSorter不僅能 aggregate,還能 sort。ExternalSorter在 Shuffle Read 和 Write 都有使用,而ExternalAppendOnlyMap只有在 Shuffle Read 中使用。所以為啥不直接搞一個ExternalSorter而是還要在前面墊一個ExternalAppendOnlyMap呢?為此,我們總結比較一下這兩者:
首先,在insertAll時,ExternalAppendOnlyMap是一定要做 combine 的,而ExternalSorter可以選擇是否做 combine,為此還有PartitionedAppendOnlyMap和PartitionedPairBuffer兩種數據結構。
其次,在做排序時,ExternalAppendOnlyMap默認對內存中的對象不進行排序,只有當要 Spill 的時候才會返回AppendOnlyMap.destructiveSortedIterator的方式將內存里面的東西有序寫入磁盤。在返回迭代器時,如果沒有發生 Spill,那么ExternalAppendOnlyMap返回沒有經過排序的currentMap,否則才通過ExternalIterator進行排序。而對ExternalSorter而言排序與否在于有沒有指定ordering。如果進行排序的話,那么它會首先考慮 Partition,再考慮 Key。
ExternalIterator
下面我們來看ExternalAppendOnlyMap中ExternalIterator的實現。它是一個典型的外部排序的實現,有一個 PQ 用來 merge。不過這次的迭代器換成了destructiveSortedIterator,也就是我們都是排序的了。這個道理也是顯而易見的,不 sort 一下,我們怎么和硬盤上的數據做聚合呢?
//?ExternalAppendOnlyMap.scala val?mergeHeap?=?new?mutable.PriorityQueue[StreamBuffer] val?sortedMap?=?destructiveIterator(currentMap.destructiveSortedIterator(keyComparator)) //?我們得到一個Array的迭代器 val?inputStreams?=?(Seq(sortedMap)?++?spilledMaps).map(it?=>?it.buffered)inputStreams.foreach?{?it?=>val?kcPairs?=?new?ArrayBuffer[(K,?C)]//?讀完所有具有所有相同hash(key)的序列,并創建一個StreamBuffer//?需要注意的是,由于哈希碰撞的原因,里面可能有多個keyreadNextHashCode(it,?kcPairs)if?(kcPairs.length?>?0)?{mergeHeap.enqueue(new?StreamBuffer(it,?kcPairs))} }我們先來看看destructiveSortedIterator的實現
//?AppendOnlyMap.scala def?destructiveSortedIterator(keyComparator:?Comparator[K]):?Iterator[(K,?V)]?=?{destroyed?=?truevar?keyIndex,?newIndex?=?0//?下面這個循環將哈希表里面散亂的KV對壓縮到最前面while?(keyIndex?<?capacity)?{if?(data(2?*?keyIndex)?!=?null)?{data(2?*?newIndex)?=?data(2?*?keyIndex)data(2?*?newIndex?+?1)?=?data(2?*?keyIndex?+?1)newIndex?+=?1}keyIndex?+=?1}assert(curSize?==?newIndex?+?(if?(haveNullValue)?1?else?0))new?Sorter(new?KVArraySortDataFormat[K,?AnyRef]).sort(data,?0,?newIndex,?keyComparator)//?這下面和前面實現大差不差,就省略了new?Iterator[(K,?V)]?{...} }下面我們來看看實現的next()接口函數,它是外部排序中的一個典型的歸并過程。我們需要注意的是minBuffer是一個StreamBuffer,維護一個hash(K), V的ArrayBuffer,類似H1 V1 H1 V2 H2 V3這樣的序列,而不是我們想的(K, V)流。因此其中是可能有哈希碰撞的。我們從mergeHeap中dequeue出來的StreamBuffer是當前hash(K)最小的所有K的集合。
override?def?next():?(K,?C)?=?{if?(mergeHeap.isEmpty)?{//?如果堆是空的,就再見了throw?new?NoSuchElementException}//?Select?a?key?from?the?StreamBuffer?that?holds?the?lowest?key?hash//?mergeHeap選擇所有StreamBuffer中最小hash的,作為minBufferval?minBuffer?=?mergeHeap.dequeue()//?minPairs是一個ArrayBuffer[T],表示這個StreamBuffer維護的所有KV對val?minPairs?=?minBuffer.pairsval?minHash?=?minBuffer.minKeyHash//?從一個ArrayBuffer[T]中移出Index為0的項目val?minPair?=?removeFromBuffer(minPairs,?0)//?得到非哈希的?(minKey,?minCombiner)val?minKey?=?minPair._1var?minCombiner?=?minPair._2assert(hashKey(minPair)?==?minHash)//?For?all?other?streams?that?may?have?this?key?(i.e.?have?the?same?minimum?key?hash),//?merge?in?the?corresponding?value?(if?any)?from?that?streamval?mergedBuffers?=?ArrayBuffer[StreamBuffer](minBuffer?"StreamBuffer")while?(mergeHeap.nonEmpty?&&?mergeHeap.head.minKeyHash?==?minHash)?{val?newBuffer?=?mergeHeap.dequeue()//?如果newBuffer的key和minKey相等的話(考慮哈希碰撞),就合并minCombiner?=?mergeIfKeyExists(minKey,?minCombiner,?newBuffer)mergedBuffers?+=?newBuffer}//?Repopulate?each?visited?stream?buffer?and?add?it?back?to?the?queue?if?it?is?non-emptymergedBuffers.foreach?{?buffer?=>if?(buffer.isEmpty)?{readNextHashCode(buffer.iterator,?buffer.pairs)}if?(!buffer.isEmpty)?{mergeHeap.enqueue(buffer)}}(minKey,?minCombiner) }SizeTracker
首先在每次集合更新之后,會調用afterUpdate,當到達采樣的 interval 之后,會takeSample。
//?SizeTracker.scala protected?def?afterUpdate():?Unit?=?{numUpdates?+=?1if?(nextSampleNum?==?numUpdates)?{takeSample()} }takeSample函數中第一句話就涉及多個對象,一個一個來看。
//?SizeTracker.scala private?def?takeSample():?Unit?=?{samples.enqueue(Sample(SizeEstimator.estimate(this),?numUpdates))...SizeEstimator.estimate的實現類似去做一個 state 隊列上的 BFS。
private?def?estimate(obj:?AnyRef,?visited:?IdentityHashMap[AnyRef,?AnyRef]):?Long?=?{val?state?=?new?SearchState(visited)state.enqueue(obj)while?(!state.isFinished)?{visitSingleObject(state.dequeue(),?state)}state.size }visitSingleObject來具體做這個 BFS,會特殊處理 Array 類型。我們不處理反射,因為反射包里面會引用到很多全局反射對象,這個對象又會應用到很多全局的大對象。同理,我們不處理 ClassLoader,因為它里面會應用到整個 REPL。反正 ClassLoaders 和 Classes 是所有對象共享的。
private?def?visitSingleObject(obj:?AnyRef,?state:?SearchState):?Unit?=?{val?cls?=?obj.getClassif?(cls.isArray)?{visitArray(obj,?cls,?state)}?else?if?(cls.getName.startsWith("scala.reflect"))?{}?else?if?(obj.isInstanceOf[ClassLoader]?||?obj.isInstanceOf[Class[_]])?{//?Hadoop?JobConfs?created?in?the?interpreter?have?a?ClassLoader.}?else?{obj?match?{case?s:?KnownSizeEstimation?=>state.size?+=?s.estimatedSizecase?_?=>val?classInfo?=?getClassInfo(cls)state.size?+=?alignSize(classInfo.shellSize)for?(field?<-?classInfo.pointerFields)?{state.enqueue(field.get(obj))}}} }然后我們創建一個Sample,并且放到隊列samples中
private?object?SizeTracker?{case?class?Sample(size:?Long,?numUpdates:?Long) }下面的主要工作就是計算一個bytesPerUpdate
??...//?Only?use?the?last?two?samples?to?extrapolate//?如果sample太多了,就刪除掉一些if?(samples.size?>?2)?{samples.dequeue()}val?bytesDelta?=?samples.toList.reverse?match?{case?latest?::?previous?::?tail?=>(latest.size?-?previous.size).toDouble?/?(latest.numUpdates?-?previous.numUpdates)//?If?fewer?than?2?samples,?assume?no?changecase?_?=>?0}bytesPerUpdate?=?math.max(0,?bytesDelta)nextSampleNum?=?math.ceil(numUpdates?*?SAMPLE_GROWTH_RATE).toLong }我們統計到上次估算之后經歷的 update 數量,并乘以bytesPerUpdate,即可得到總大小
//?SizeTracker.scala def?estimateSize():?Long?=?{assert(samples.nonEmpty)val?extrapolatedDelta?=?bytesPerUpdate?*?(numUpdates?-?samples.last.numUpdates)(samples.last.size?+?extrapolatedDelta).toLong }Shuffle Write 端源碼分析
Shuffle Write 端的實現主要依賴ShuffleManager中的ShuffleWriter對象,目前使用的ShuffleManager是SortShuffleManager,因此只討論它。它是一個抽象類,主要有SortShuffleWriter、UnsafeShuffleWriter、BypassMergeSortShuffleWriter等實現。
SortShuffleWriter
private[spark]?abstract?class?ShuffleWriter[K,?V]?{/**?Write?a?sequence?of?records?to?this?task's?output?*/@throws[IOException]def?write(records:?Iterator[Product2[K,?V]]):?Unit/**?Close?this?writer,?passing?along?whether?the?map?completed?*/def?stop(success:?Boolean):?Option[MapStatus] }SortShuffleWriter的實現可以說很簡單了,就是將records放到一個ExternalSorter里面,然后創建一個ShuffleMapOutputWriter。shuffleExecutorComponents實際上是一個LocalDiskShuffleExecutorComponents。ShuffleMapOutputWriter是一個 Java 接口,實際上被創建的是LocalDiskShuffleMapOutputWriter
//?SortShuffleWriter override?def?write(records:?Iterator[Product2[K,?V]]):?Unit?=?{sorter?=?if?(dep.mapSideCombine)?{new?ExternalSorter[K,?V,?C](context,?dep.aggregator,?Some(dep.partitioner),?dep.keyOrdering,?dep.serializer)}?else?{//?如果不需要進行mapSideCombine,那么我們傳入空的aggregator和ordering,//?我們在map端不負責對key進行排序,統統留給reduce端吧new?ExternalSorter[K,?V,?V](context,?aggregator?=?None,?Some(dep.partitioner),?ordering?=?None,?dep.serializer)}sorter.insertAll(records)//?Don't?bother?including?the?time?to?open?the?merged?output?file?in?the?shuffle?write?time,//?because?it?just?opens?a?single?file,?so?is?typically?too?fast?to?measure?accurately//?(see?SPARK-3570).val?mapOutputWriter?=?shuffleExecutorComponents.createMapOutputWriter(dep.shuffleId,?mapId,?dep.partitioner.numPartitions)...緊接著,調用ExternalSorter.writePartitionedMapOutput將自己維護的map或者buffer(根據是否有 Map Side Aggregation)寫到mapOutputWriter提供的partitionWriter里面。其過程用到了一個叫destructiveSortedWritablePartitionedIterator的迭代器,相比destructiveSortedIterator,它是多了 Writable 和 Partitioned 兩個詞。前者的意思是我可以寫到文件,后者的意思是我先按照 partitionId 排序,然后在按照給定的 Comparator 排序。
接著就是commitAllPartitions,這個函數調用writeIndexFileAndCommit。
//...sorter.writePartitionedMapOutput(dep.shuffleId,?mapId,?mapOutputWriter)val?partitionLengths?=?mapOutputWriter.commitAllPartitions()MapStatus被用來保存 Shuffle Write 操作的 metadata。
...mapStatus?=?MapStatus(blockManager.shuffleServerId,?partitionLengths,?mapId) }//?LocalDiskShuffleMapOutputWriter.java @Override public?long[]?commitAllPartitions()?throws?IOException?{...cleanUp();File?resolvedTmp?=?outputTempFile?!=?null?&&?outputTempFile.isFile()???outputTempFile?:?null;blockResolver.writeIndexFileAndCommit(shuffleId,?mapId,?partitionLengths,?resolvedTmp);return?partitionLengths; }writeIndexFileAndCommit負責為傳入的文件dataTmp創建一個索引文件,并原子地提交。注意到,到當前版本,每一個執行單元只會生成一份數據文件和一份索引。
//?IndexShuffleBlockResolver.java def?writeIndexFileAndCommit(shuffleId:?Int,?mapId:?Long,?lengths:?Array[Long],?dataTmp:?File):?Unit根據writeIndexFileAndCommit的注釋,getBlockData會來讀它寫的塊,這個getBlockData同樣位于我們先前介紹過的IndexShuffleBlockResolver類中。
Reference
https://docs.scala-lang.org/zh-cn/tour/implicit-parameters.html
https://zhuanlan.zhihu.com/p/354409+
https://fangjian0423.github.io/20+/+/20/scala-implicit/
https://www.cnblogs.com/xia520pi/p/8745923.html
https://spark.apache.org/docs/latest/api/scala/index.html
https://blog.csdn.net/bluishglc/article/details/52946575
https://stackoverflow.com/questions/4386+7/what-is-the-formal-difference-in-scala-between-braces-and-parentheses-and-when
https://intellipaat.com/blog/dataframes-rdds-apache-spark/
https://indatalabs.com/blog/convert-spark-rdd-to-dataframe-dataset
https://tech.meituan.com/20+/04/29/spark-tuning-basic.html
https://endymecy.gitbooks.io/spark-programming-guide-zh-cn/content/programming-guide/rdds/rdd-persistences.html
https://forums.databricks.com/questions/+792/no-reducebekey-api-in-dataset.html
https://stackoverflow.com/questions/38383207/rolling-your-own-reducebykey-in-spark-dataset
https://litaotao.github.io/boost-spark-application-performance
https://www.iteblog.com/archives/+72.html
https://vimsky.com/article/2708.html
https://scastie.scala-lang.org/
https://www.jianshu.com/p/5c230+fa360
https://www.cnblogs.com/nowgood/p/ScalaImplicitConversion.html
https://stackoverflow.com/questions/+8+352/why-we-need-implicit-parameters-in-scala
https://stackoverflow.com/questions/3+08083/difference-between-dataframe-dataset-and-rdd-in-spark/39033308
https://stackoverflow.com/questions/3+06009/spark-saveastextfile-last-partition-almost-never-finishes
https://stackoverflow.com/questions/43364432/spark-difference-between-reducebykey-vs-groupbykey-vs-aggregatebykey-vs-combineb
https://blog.csdn.net/dabokele/article/details/52802+0
https://blog.csdn.net/zrc+902+article/details/527+593
https://stackoverflow.com/questions/3094++/spark-nullpointerexception-when-rdd-isnt-collected-before-map
https://twitter.github.io/scala_school/zh_cn/advanced-types.html
https://colobu.com/20+/05/+/Variance-lower-bounds-upper-bounds-in-Scala/
https://www.cnblogs.com/fillPv/p/5392+6.html
https://issues.scala-lang.org/browse/SI-7005
https://stackoverflow.com/questions/32900862/map-can-not-be-serializable-in-scala
http://www.calvinneo.com/2019/08/06/scala-lang/
http://www.calvinneo.com/2019/05/16/pandas/
深入理解 SPARK:核心思想與源碼分析
http://www.calvinneo.com/2019/08/06/spark-sql/
https://zhuanlan.zhihu.com/p/67068559
http://www.jasongj.com/spark/rbo/
https://www.kancloud.cn/kancloud/spark-internals/45243
https://www.jianshu.com/p/4c5c2e535da5
http://jerryshao.me/20+/0+04/spark-shuffle-detail-investigation/
https://github.com/hustnn/TungstenSecret
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-shuffle-UnsafeShuffleWriter.html
https://blog.k2datascience.com/batch-processing-apache-spark-a670+008+7
https://stackoverflow.com/questions/45553492/spark-task-memory-allocation/45570944
https://0x0fff.com/spark-architecture-shuffle/
https://0x0fff.com/spark-memory-management/
https://www.slideshare.net/databricks/memory-management-in-apache-spark
https://www.linuxprobe.com/wp-content/uploads/20+/04/unified-memory-management-spark-+000.pdf
備注
騰訊互娛數據挖掘團隊招聘后臺開發實習生/正職,工作地點為深圳,有意者請發送簡歷到 jiaqiangwang [[AT]] tencent [[DOT]] com。
任職要求:
計算機相關專業本科及以上學歷,有扎實的計算機理論基礎;
熟悉 Python, C++, golang 等至少一種常用編程語言,有良好的代碼習慣和豐富的實踐經驗;
有熱情了解和嘗試新技術、架構,較強的學習能力和邏輯思維能力;
較強的溝通能力,能夠邏輯清晰地進行自我表達,團隊合作意識強,與人溝通積極主動;
加分項:
對機器學習算法有較深的了解;
有分布式計算底層性能優化經驗;
對機器學習有基本的了解,了解常見機器學習算法基本原理;
有基于分布式計算框架開發經驗和大規模數據處理經驗;
總結
以上是生活随笔為你收集整理的Spark源码和调优简介 Spark Core的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 从STGW流量下降探秘内核收包机制
- 下一篇: 高性能编程:三级缓存(LLC)访问优化