日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

大数据计算平台Spark内核全面解读

發(fā)布時間:2025/3/8 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 大数据计算平台Spark内核全面解读 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1Spark介紹

Spark是起源于美國加州大學(xué)伯克利分校AMPLab的大數(shù)據(jù)計算平臺,在2010年開源,目前是Apache軟件基金會的頂級項目。隨著Spark在大數(shù)據(jù)計算領(lǐng)域的暫露頭角,越來越多的企業(yè)開始關(guān)注和使用。201411月,SparkDaytona Gray Sort 100TB Benchmark競賽中打破了由Hadoop MapReduce保持的排序記錄。Spark利用1/10的節(jié)點(diǎn)數(shù),100TB數(shù)據(jù)的排序時間從72分鐘提高到了23分鐘

Spark在架構(gòu)上包括內(nèi)核部分和4個官方子模塊--Spark SQLSpark Streaming、機(jī)器學(xué)習(xí)庫MLlib和圖計算庫GraphX。圖1所示為Spark在伯克利的數(shù)據(jù)分析軟件棧BDASBerkeley Data Analytics Stack)中的位置。可見Spark專注于數(shù)據(jù)的計算,而數(shù)據(jù)的存儲在生產(chǎn)環(huán)境中往往還是由Hadoop分布式文件系統(tǒng)HDFS承擔(dān)。

1 SparkBDAS中的位置?

Spark被設(shè)計成支持多場景的通用大數(shù)據(jù)計算平臺,它可以解決大數(shù)據(jù)計算中的批處理,交互查詢及流式計算等核心問題。Spark可以從多數(shù)據(jù)源的讀取數(shù)據(jù),并且擁有不斷發(fā)展的機(jī)器學(xué)習(xí)庫和圖計算庫供開發(fā)者使用。數(shù)據(jù)和計算在Spark內(nèi)核及Spark的子模塊中是打通的,這就意味著Spark內(nèi)核和子模塊之間成為一個整體。Spark的各個子模塊以Spark內(nèi)核為基礎(chǔ),進(jìn)一步支持更多的計算場景,例如使用Spark SQL讀入的數(shù)據(jù)可以作為機(jī)器學(xué)習(xí)庫MLlib的輸入。表1列舉了一些在Spark平臺上的計算場景。

1 Spark的應(yīng)用場景舉例

在本文寫作是,Spark的最新版本為1.2.0,文中的示例代碼也來自于這個版本。

2Spark內(nèi)核介紹?

相信大數(shù)據(jù)工程師都非常了解Hadoop MapReduce一個最大的問題是在很多應(yīng)用場景中速度非常慢,只適合離線的計算任務(wù)。這是由于MapReduce需要將任務(wù)劃分成mapreduce兩個階段,map階段產(chǎn)生的中間結(jié)果要寫回磁盤,而在這兩個階段之間需要進(jìn)行shuffle操作。Shuffle操作需要從網(wǎng)絡(luò)中的各個節(jié)點(diǎn)進(jìn)行數(shù)據(jù)拷貝,使其往往成為最為耗時的步驟,這也是Hadoop MapReduce慢的根本原因之一,大量的時間耗費(fèi)在網(wǎng)絡(luò)磁盤IO中而不是用于計算。在一些特定的計算場景中,例如像邏輯回歸這樣的迭代式的計算,MapReduce的弊端會顯得更加明顯。

Spark是如果設(shè)計分布式計算的呢?首先我們需要理解Spark中最重要的概念--彈性分布數(shù)據(jù)集(Resilient Distributed Dataset),也就是RDD。?

2.1 彈性分布數(shù)據(jù)集RDD

RDDSpark中對數(shù)據(jù)和計算的抽象,是Spark中最核心的概念,它表示已被分片(partition),不可變的并能夠被并行操作的數(shù)據(jù)集合。對RDD的操作分為兩種transformationactionTransformation操作是通過轉(zhuǎn)換從一個或多個RDD生成新的RDDAction操作是從RDD生成最后的計算結(jié)果。在Spark最新的版本中,提供豐富的transformationaction操作,比起MapReduce計算模型中僅有的兩種操作,會大大簡化程序開發(fā)的難度。

RDD的生成方式只有兩種,一是從數(shù)據(jù)源讀入,另一種就是從其它RDD通過transformation操作轉(zhuǎn)換。一個典型的Spark程序就是通過Spark上下文環(huán)境(SparkContext)生成一個或多個RDD,在這些RDD上通過一系列的transformation操作生成最終的RDD,最后通過調(diào)用最終RDDaction方法輸出結(jié)果。

每個RDD都可以用下面5個特性來表示,其中后兩個為可選的:

  • 分片列表(數(shù)據(jù)塊列表)

  • 計算每個分片的函數(shù)

  • 對父RDD的依賴列表

  • key-value類型的RDD的分片器(Partitioner)(可選)

  • 每個數(shù)據(jù)分片的預(yù)定義地址列表(如HDFS上的數(shù)據(jù)塊的地址)(可選)

雖然Spark是基于內(nèi)存的計算,但RDD不光可以存儲在內(nèi)存中,根據(jù)useDiskuseMemoryuseOffHeap, deserializedreplication五個參數(shù)的組合Spark提供了12種存儲級別,在后面介紹RDD的容錯機(jī)制時,我們會進(jìn)一步理解。值得注意的是當(dāng)StorageLevel設(shè)置成OFF_HEAP時,RDD實(shí)際被保存到Tachyon中。Tachyon是一個基于內(nèi)存的分布式文件系統(tǒng),目前正在快速發(fā)展,本文不做詳細(xì)介紹,可以通過其官方網(wǎng)站進(jìn)一步了解。

  • class StorageLevel private(

  • ??? private var _useDisk: Boolean,

  • ??? private var _useMemory: Boolean,

  • ??? private var _useOffHeap: Boolean,

  • ??? private var _deserialized: Boolean

  • ??? private var _replication: Int = 1)

  • ? extends Externalizable { //… }

  • ?

  • val NONE = new StorageLevel(false, false, false, false)

  • ? val DISK_ONLY = new StorageLevel(true, false, false, false)

  • ? val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)

  • ? val MEMORY_ONLY = new StorageLevel(false, true, false, true)

  • ? val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)

  • ? val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)

  • ? val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)

  • ? val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)

  • ? val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)

  • ? val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)

  • ? val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

  • ? val OFF_HEAP = new StorageLevel(false, false, true, false)

  • 2.2 DAGStage與任務(wù)的生成

    Spark的計算發(fā)生在RDDaction操作,而對action之前的所有transformationSpark只是記錄下RDD生成的軌跡,而不會觸發(fā)真正的計算。

    Spark內(nèi)核會在需要計算發(fā)生的時刻繪制一張關(guān)于計算路徑的有向無環(huán)圖,也就是DAG。舉個例子,在圖2中,從輸入中邏輯上生成AC兩個RDD,經(jīng)過一系列transformation操作,邏輯上生成了F,注意,我們說的是邏輯上,因?yàn)檫@時候計算沒有發(fā)生,Spark內(nèi)核做的事情只是記錄了RDD的生成和依賴關(guān)系。當(dāng)F要進(jìn)行輸出時,也就是F進(jìn)行了action操作,Spark會根據(jù)RDD的依賴生成DAG,并從起點(diǎn)開始真正的計算。

    2 邏輯上的計算過程:DAG?

    有了計算的DAG圖,Spark內(nèi)核下一步的任務(wù)就是根據(jù)DAG圖將計算劃分成任務(wù)集,也就是Stage,這樣可以將任務(wù)提交到計算節(jié)點(diǎn)進(jìn)行真正的計算。Spark計算的中間結(jié)果默認(rèn)是保存在內(nèi)存中的,Spark在劃分Stage的時候會充分考慮在分布式計算中可流水線計算(pipeline)的部分來提高計算的效率,而在這個過程中,主要的根據(jù)就是RDD的依賴類型。根據(jù)不同的transformation操作,RDD的依賴可以分為窄依賴(Narrow Dependency)和寬依賴(Wide Dependency,在代碼中為ShuffleDependency)兩種類型。窄依賴指的是生成的RDD中每個partition只依賴于父RDD(s) 固定的partition。寬依賴指的是生成的RDD的每一個partition都依賴于父 RDD(s) 所有partition。窄依賴典型的操作有map, filter, union等,寬依賴典型的操作有groupByKey, sortByKey等。可以看到,寬依賴往往意味著shuffle操作,這也是Spark劃分stage的主要邊界。對于窄依賴,Spark會將其盡量劃分在同一個stage中,因?yàn)樗鼈兛梢赃M(jìn)行流水線計算。

    3 RDD的寬依賴和窄依賴

    我們再通過圖4詳細(xì)解釋一下Spark中的Stage劃分。我們從HDFS中讀入數(shù)據(jù)生成3個不同的RDD,通過一系列transformation操作后再將計算結(jié)果保存回HDFS。可以看到這幅DAG中只有join操作是一個寬依賴,Spark內(nèi)核會以此為邊界將其前后劃分成不同的Stage. 同時我們可以注意到,在圖中Stage2中,從mapunion都是窄依賴,這兩步操作可以形成一個流水線操作,通過map操作生成的partition可以不用等待整個RDD計算結(jié)束,而是繼續(xù)進(jìn)行union操作,這樣大大提高了計算的效率。

    4 Spark中的Stage劃分?

    Spark在運(yùn)行時會把Stage包裝成任務(wù)提交,有父StageSpark會先提交父Stage。弄清楚了Spark劃分計算的原理,我們再結(jié)合源碼看一看這其中的過程。下面的代碼是DAGScheduler中的得到一個RDDStage的函數(shù),可以看到寬依賴為劃分Stage的邊界。

  • /**

  • ?? * Get or create the list of parent stages for a given RDD. The stages will be assigned the

  • ?? * provided jobId if they haven't already been created with a lower jobId.

  • ?? */

  • ?

  • ? private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {

  • ??? val parents = new HashSet[Stage]

  • ??? val visited = new HashSet[RDD[_]]

  • ??? // We are manually maintaining a stack here to prevent StackOverflowError

  • ??? // caused by recursively visiting

  • ??? val waitingForVisit = new Stack[RDD[_]]

  • ??? def visit(r: RDD[_]) {

  • ????? if (!visited(r)) {

  • ??????? visited += r

  • ??????? // Kind of ugly: need to register RDDs with the cache here since

  • ??????? // we can't do it in its constructor because # of partitions is unknown

  • ??????? for (dep <- r.dependencies) {

  • ??????? ??dep match {

  • ??????????? case shufDep: ShuffleDependency[_, _, _] =>

  • ????????????? parents += getShuffleMapStage(shufDep, jobId)

  • ??????????? case _ =>

  • ????????????? waitingForVisit.push(dep.rdd)

  • ????????? }

  • ??????? }

  • ????? }

  • ??? }

  • ?

  • ??? waitingForVisit.push(rdd)

  • ??? while (!waitingForVisit.isEmpty) {

  • ????? visit(waitingForVisit.pop())

  • ??? }

  • ??? parents.toList

  • ? }

  • 上面提到Spark的計算是從RDD調(diào)用action操作時候觸發(fā)的,我們來看一個action的代碼

    RDDcollect方法是一個action操作,作用是將RDD中的數(shù)據(jù)返回到一個數(shù)組中。可以看到,在此action中,會觸發(fā)Spark上下文環(huán)境SparkContext中的runJob方法,這是一系列計算的起點(diǎn)。

  • abstract class RDD[T: ClassTag](

  • ??? @transient private var sc: SparkContext,

  • ??? @transient private var deps: Seq[Dependency[_]]

  • ? ) extends Serializable with Logging {

  • ? //….

  • /**

  • ?? * Return an array that contains all of the elements in this RDD.

  • ?? */

  • ? def collect(): Array[T] = {

  • ??? val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)

  • ??? Array.concat(results: _*)

  • ? }

  • }

  • SparkContext擁有DAGScheduler的實(shí)例,在runJob方法中會進(jìn)一步調(diào)用DAGSchedulerrunJob方法。在此時,DAGScheduler會生成DAGStage,將Stage提交給TaskSchedulerTaskSchdulerStage包裝成TaskSet,發(fā)送到Worker節(jié)點(diǎn)進(jìn)行真正的計算,同時還要監(jiān)測任務(wù)狀態(tài),重試失敗和長時間無返回的任務(wù)。整個過程如圖5所示。

    ?

    5 Spark中任務(wù)的生成?

    2.3 RDD的緩存與容錯

    上文提到,Spark的計算是從action開始觸發(fā)的,如果在action操作之前邏輯上很多transformation操作,一旦中間發(fā)生計算失敗,Spark會重新提交任務(wù),這在很多場景中代價過大。還有一些場景,如有些迭代算法,計算的中間結(jié)果會被重復(fù)使用,重復(fù)計算同樣增加計算時間和造成資源浪費(fèi)。因此,在提高計算效率和更好支持容錯,Spark提供了基于RDDcache機(jī)制和checkpoint機(jī)制。

    我們可以通過RDDtoDebugString來查看其遞歸的依賴信息,圖6展示了在spark shell中通過調(diào)用這個函數(shù)來查看wordCount RDD的依賴關(guān)系,也就是它的Lineage.

    6 RDD wordCountlineage?

    如果發(fā)現(xiàn)Lineage過長或者里面有被多次重復(fù)使用的RDD,我們就可以考慮使用cache機(jī)制或checkpoint機(jī)制了。

    我們可以通過在程序中直接調(diào)用RDDcache方法將其保存在內(nèi)存中,這樣這個RDD就可以被多個任務(wù)共享,避免重復(fù)計算。另外,RDD還提供了更為靈活的persist方法,可以指定存儲級別。從源碼中可以看到RDD.cache就是簡單的調(diào)用了RDD.persist(StorageLevel.MEMORY_ONLY)

  • /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */

  • ? def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  • ? def cache(): this.type = persist()

  • 同樣,我們可以調(diào)用RDDcheckpoint方法將其保存到磁盤。我們需要在SparkContext中設(shè)置checkpoint的目錄,否則調(diào)用會拋出異常。值得注意的是,在調(diào)用checkpoint之前建議先調(diào)用cache方法將RDD放入內(nèi)存,否則將RDD保存到文件的時候需要重新計算。?

  • ? /**

  • ?? * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint

  • ?? * directory set with SparkContext.setCheckpointDir() and all references to its parent

  • ?? * RDDs will be removed. This function must be called before any job has been

  • ?? * executed on this RDD. It is strongly recommended that this RDD is persisted in

  • ?? * memory, otherwise saving it on a file will require recomputation.

  • ?? */

  • ? def checkpoint() {

  • ??? if (context.checkpointDir.isEmpty) {

  • ????? throw new SparkException("Checkpoint directory has not been set in the SparkContext")

  • ??? } else if (checkpointData.isEmpty) {

  • ????? checkpointData = Some(new RDDCheckpointData(this))

  • ????? checkpointData.get.markForCheckpoint()

  • ??? }

  • ? }

  • Cache機(jī)制和checkpoint機(jī)制的差別在于cacheRDD保存到內(nèi)存,并保留Lineage,如果緩存失效RDD還可以通過Lineage重建。而checkpointRDD落地到磁盤并切斷Lineage,由文件系統(tǒng)保證其重建。

    2.4 Spark任務(wù)的部署

    Spark的集群部署分為StandaloneMesosYarn三種模式,我們以Standalone模式為例,簡單介紹Spark程序的部署。如圖7示,集群中的Spark程序運(yùn)行時分為3種角色,driver, masterworkerslave)。在集群啟動前,首先要配置masterworker節(jié)點(diǎn)。啟動集群后,worker節(jié)點(diǎn)會向master節(jié)點(diǎn)注冊自己,master節(jié)點(diǎn)會維護(hù)worker節(jié)點(diǎn)的心跳。Spark程序都需要先創(chuàng)建Spark上下文環(huán)境,也就是SparkContext。創(chuàng)建SparkContext的進(jìn)程就成為了driver角色,上一節(jié)提到的DAGSchedulerTaskScheduler都在driver中運(yùn)行。Spark程序在提交時要指定master的地址,這樣可以在程序啟動時向master申請worker的計算資源。Drivermasterworker之間的通信由Akka支持。Akka 也使用 Scala 編寫,用于構(gòu)建可容錯的、高可伸縮性的Actor 模型應(yīng)用。關(guān)于Akka,可以訪問其官方網(wǎng)站進(jìn)行進(jìn)一步了解,本文不做詳細(xì)介紹。

    7 Spark任務(wù)部署

    3、更深一步了解Spark內(nèi)核

    了解了Spark內(nèi)核的基本概念和實(shí)現(xiàn)后,更深一步理解其工作原理的最好方法就是閱讀源碼。最新的Spark源碼可以從Spark官方網(wǎng)站下載。源碼推薦使用IntelliJ IDEA閱讀,會自動安裝Scala插件。讀者可以從core工程,也就是Spark內(nèi)核工程開始閱讀,更可以設(shè)置斷點(diǎn)嘗試跟蹤一個任務(wù)的執(zhí)行。另外,讀者還可以通過分析Spark的日志來進(jìn)一步理解Spark的運(yùn)行機(jī)制,Spark使用log4j記錄日志,可以在啟動集群前修改log4j的配置文件來配置日志輸出和格式。

    【編輯推薦】

  • Spark:利用Eclipse構(gòu)建Spark集成開發(fā)環(huán)境

  • Spark實(shí)戰(zhàn):單節(jié)點(diǎn)本地模式搭建Spark運(yùn)行環(huán)境

  • Spark:為大數(shù)據(jù)處理點(diǎn)亮一盞明燈

  • 專訪Spark亞太研究院王家林:從技術(shù)的角度探索Spark

  • StormSpark:誰才是我們的實(shí)時處理利器


  • 總結(jié)

    以上是生活随笔為你收集整理的大数据计算平台Spark内核全面解读的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。