日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Spark原理小结

發(fā)布時(shí)間:2024/1/23 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark原理小结 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1、spark是什么?

  快速,通用,可擴(kuò)展的分布式計(jì)算引擎

2、彈性分布式數(shù)據(jù)集RDD

  RDD(Resilient Distributed Dataset)叫做分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個(gè)不可變、可分區(qū)、里面的元素可并行計(jì)算的集合。RDD具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性。RDD允許用戶在執(zhí)行多個(gè)查詢時(shí)顯式地將工作集緩存在內(nèi)存中,后續(xù)的查詢能夠重用工作集,這極大地提升了查詢速度。

RDD的屬性

?

1)一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對(duì)于RDD來(lái)說(shuō),每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒有指定,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。

2)一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)。Spark中RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。

3)RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。

4)一個(gè)Partitioner,即RDD的分片函數(shù)。當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner。只有對(duì)于于key-value的RDD,才會(huì)有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。

5)一個(gè)列表,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)。對(duì)于一個(gè)HDFS文件來(lái)說(shuō),這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。

創(chuàng)建RDD的兩種方式

  1、由一個(gè)已經(jīng)存在的Scala集合創(chuàng)建。

    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

  2、由外部存儲(chǔ)系統(tǒng)的數(shù)據(jù)集創(chuàng)建,包括本地的文件系統(tǒng),還有所有Hadoop支持的數(shù)據(jù)集,比如HDFS、Cassandra、HBase等

    val rdd2 = sc.textFile("hdfs://node1.itcast.cn:9000/words.txt")

3、Spark的算子

  RDD中的所有轉(zhuǎn)換都是延遲加載的,也就是說(shuō),它們并不會(huì)直接計(jì)算結(jié)果。相反的,它們只是記住這些應(yīng)用到基礎(chǔ)數(shù)據(jù)集(例如一個(gè)文件)上的轉(zhuǎn)換動(dòng)作。只有當(dāng)發(fā)生一個(gè)要求返回結(jié)果給Driver的動(dòng)作時(shí),這些轉(zhuǎn)換才會(huì)真正運(yùn)行。這種設(shè)計(jì)讓Spark更加有效率地運(yùn)行。

  1、Transformation

  2、Action

4、RDD的依賴關(guān)系

  RDD和它依賴的父RDD(s)的關(guān)系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

?  

RDD緩存

  Spark速度非常快的原因之一,就是在不同操作中可以在內(nèi)存中持久化或緩存?zhèn)€數(shù)據(jù)集。當(dāng)持久化某個(gè)RDD后,每一個(gè)節(jié)點(diǎn)都將把計(jì)算的分片結(jié)果保存在內(nèi)存中

RDD緩存的方式

  RDD通過(guò)persist方法或cache方法可以將前面的計(jì)算結(jié)果緩存,但是并不是這兩個(gè)方法被調(diào)用時(shí)立即緩存,而是觸發(fā)后面的action時(shí),該RDD將會(huì)被緩存在計(jì)算節(jié)點(diǎn)的內(nèi)存中,并供后面重用。

  cache最終也是調(diào)用了persist方法,默認(rèn)的存儲(chǔ)級(jí)別都是僅在內(nèi)存存儲(chǔ)一份

Spark的存儲(chǔ)級(jí)別

?

緩存有可能丟失,或者存儲(chǔ)存儲(chǔ)于內(nèi)存的數(shù)據(jù)由于內(nèi)存不足而被刪除,RDD的緩存容錯(cuò)機(jī)制保證了即使緩存丟失也能保證計(jì)算的正確執(zhí)行。通過(guò)基于RDD的一系列轉(zhuǎn)換,丟失的數(shù)據(jù)會(huì)被重算,由于RDD的各個(gè)Partition是相對(duì)獨(dú)立的,因此只需要計(jì)算丟失的部分即可,并不需要重算全部Partition。

5、DAG的生成

DAG(Directed Acyclic Graph)叫做有向無(wú)環(huán)圖,原始的RDD通過(guò)一系列的轉(zhuǎn)換就就形成了DAG,根據(jù)RDD之間的依賴關(guān)系的不同將DAG劃分成不同的Stage,對(duì)于窄依賴,partition的轉(zhuǎn)換處理在Stage中完成計(jì)算。對(duì)于寬依賴,由于有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來(lái)的計(jì)算,因此寬依賴是劃分Stage的依據(jù)。

?

spark運(yùn)行原理?

  1、通過(guò)ActorSystem創(chuàng)建MasterActor,啟動(dòng)定時(shí)器,定時(shí)檢查與接收Worker節(jié)點(diǎn)的發(fā)送消息

  2、Worker節(jié)點(diǎn)主動(dòng)向Master發(fā)送注冊(cè)消息

  3、Master接收Worker的注冊(cè)請(qǐng)求,然后將注冊(cè)信息保存起來(lái),并向Worker返回一個(gè)注冊(cè)成功的消息

  4、Worker接收到Master注冊(cè)成功的消息后,啟用定時(shí)器,定時(shí)向master發(fā)送心跳報(bào)活,Master接收到Worker發(fā)送來(lái)的心跳消息后,更新Worker上一次的心跳時(shí)間

  5、DAGScheduler根據(jù)FinalRDD遞歸向上解析Lineager的依賴關(guān)系,并以寬依賴為切分一個(gè)新stage的依據(jù),并將多個(gè)task任務(wù)封裝到TaskSet,其中Task的數(shù)量由其父RDD的切片數(shù)量決定,最后使用遞歸優(yōu)先提交父Stage(TaskSet)

  6、先創(chuàng)建TaskScheduler即TaskSchedulerImpl接著又創(chuàng)建SparkDeploySchedulerBackend對(duì)資源參數(shù)創(chuàng)建AppClient與Master注冊(cè)Application,并替每個(gè)TaskSet創(chuàng)建TaskManager負(fù)責(zé)監(jiān)控此TaskSet中任務(wù)的執(zhí)行情況

  7、Master接收到ClientActor的任務(wù)描述之后,將任務(wù)描述信息保存起來(lái),然后ClientActor返回消息,告知ClientActor任務(wù)注冊(cè)成功,接下來(lái)Master(打散|負(fù)載均衡|盡量集中)進(jìn)行資源調(diào)度

  8、Master跟Worker通信,然后讓W(xué)orker啟動(dòng)Executor

  9、Executor向Driver發(fā)送注冊(cè)消息,Driver接收到Executor注冊(cè)消息后,響應(yīng)注冊(cè)成功的消息

  10、Executor接收到Driver注冊(cè)成功的消息后,本進(jìn)程中創(chuàng)建Executor的引用對(duì)象

  11、Driver中TaskSchedulerImp向Executor發(fā)送LaunchTask消息,Executor將創(chuàng)建一個(gè)線程池作為所提交的Task任務(wù)的容器

  12、Task接收到launchTask消息后,準(zhǔn)備運(yùn)行文件初始化與反序列化,就緒后,調(diào)用Task的run方法,其中每個(gè)Task所執(zhí)行的函數(shù)是應(yīng)用在RDD中的一個(gè)獨(dú)立分區(qū)上

  13、Task運(yùn)行完成,向TaskManager匯報(bào)情況,并且釋放線程資源

  14、所有Task運(yùn)行結(jié)束之后,Executor向Worker注銷自身,釋放資源。

Spark Standalone

Spark Standalone模式中,資源調(diào)度室Spark自行實(shí)現(xiàn)的,其節(jié)點(diǎn)類型分為master和worker,

其中Driver運(yùn)行在Master中,并且有長(zhǎng)駐內(nèi)存的Master進(jìn)程守護(hù),Worker節(jié)點(diǎn)上常駐Worker守護(hù)進(jìn)程,負(fù)責(zé)與Master節(jié)點(diǎn)通信,通過(guò)ExecutorRunner來(lái)控制運(yùn)行在當(dāng)前節(jié)點(diǎn)上的CoarseGrainedExecutorBackend,

每個(gè)Worker上存在一個(gè)或多個(gè)CoarseGrainedExecutorBackend進(jìn)程,每個(gè)進(jìn)程包含一個(gè)Executor對(duì)象,該對(duì)象持有一個(gè)線程池,每個(gè)線程池可以執(zhí)行一個(gè)Task

?

節(jié)點(diǎn)類型:

1. master?節(jié)點(diǎn): 常駐master進(jìn)程,負(fù)責(zé)管理全部worker節(jié)點(diǎn)。

2. worker?節(jié)點(diǎn): 常駐worker進(jìn)程,負(fù)責(zé)管理executor?并與master節(jié)點(diǎn)通信。

dirvier:官方解釋為:?The process running the main() function of the application and creating the SparkContext。即理解為用戶自己編寫的應(yīng)用程序

Executor:執(zhí)行器:

  在每個(gè)WorkerNode上為某應(yīng)用啟動(dòng)的一個(gè)進(jìn)程,該進(jìn)程負(fù)責(zé)運(yùn)行任務(wù),并且負(fù)責(zé)將數(shù)據(jù)存在內(nèi)存或者磁盤上,每個(gè)任務(wù)都有各自獨(dú)立的Executor。

  Executor是一個(gè)執(zhí)行Task的容器。它的主要職責(zé)是:

  1、初始化程序要執(zhí)行的上下文SparkEnv,解決應(yīng)用程序需要運(yùn)行時(shí)的jar包的依賴,加載類。

  2、同時(shí)還有一個(gè)ExecutorBackend向cluster manager匯報(bào)當(dāng)前的任務(wù)狀態(tài),這一方面有點(diǎn)類似hadoop的tasktracker和task。

總結(jié):Executor是一個(gè)應(yīng)用程序運(yùn)行的監(jiān)控和執(zhí)行容器。Executor的數(shù)目可以在submit時(shí),由?--num-executors (on yarn)指定.

Job:?

  包含很多task的并行計(jì)算,可以認(rèn)為是Spark RDD?里面的action,每個(gè)action的計(jì)算會(huì)生成一個(gè)job。

用戶提交的Job會(huì)提交給DAGScheduler,Job會(huì)被分解成Stage和Task。

Stage:

  一個(gè)Job會(huì)被拆分為多組Task,每組任務(wù)被稱為一個(gè)Stage就像Map Stage,?Reduce Stage。

  Stage的劃分在RDD的論文中有詳細(xì)的介紹,簡(jiǎn)單的說(shuō)是以shuffle和result這兩種類型來(lái)劃分。在Spark中有兩類task,一類是shuffleMapTask,一類是resultTask,第一類task的輸出是shuffle所需數(shù)據(jù),第二類task的輸出是result,stage的劃分也以此為依據(jù),shuffle之前的所有變換是一個(gè)stage,shuffle之后的操作是另一個(gè)stage。比如?rdd.parallize(1 to 10).foreach(println)?這個(gè)操作沒有shuffle,直接就輸出了,那么只有它的task是resultTask,stage也只有一個(gè);如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println),?這個(gè)job因?yàn)橛衦educe,所以有一個(gè)shuffle過(guò)程,那么reduceByKey之前的是一個(gè)stage,執(zhí)行shuffleMapTask,輸出shuffle所需的數(shù)據(jù),reduceByKey到最后是一個(gè)stage,直接就輸出結(jié)果了。如果job中有多次shuffle,那么每個(gè)shuffle之前都是一個(gè)stage。

Task

  即?stage?下的一個(gè)任務(wù)執(zhí)行單元,一般來(lái)說(shuō),一個(gè)?rdd?有多少個(gè)?partition,就會(huì)有多少個(gè)?task,因?yàn)槊恳粋€(gè)?task?只是處理一個(gè)?partition?上的數(shù)據(jù).

  每個(gè)executor執(zhí)行的task的數(shù)目, 可以由submit時(shí),--num-executors(on yarn)?來(lái)指定。

小結(jié):

  驅(qū)動(dòng)程序就是執(zhí)行了一個(gè)Spark Application的main函數(shù)和創(chuàng)建Spark Context的進(jìn)程,它包含了這個(gè)application的全部代碼。

  Spark Application中的每個(gè)action會(huì)被Spark作為Job進(jìn)行調(diào)度。

  每個(gè)Job是一個(gè)計(jì)算序列的最終結(jié)果,而這個(gè)序列中能夠產(chǎn)生中間結(jié)果的計(jì)算就是一個(gè)stage。

  對(duì)于Transformations和Actions是有著明確區(qū)分的。通常Action對(duì)應(yīng)了Job,而Transformation對(duì)應(yīng)了Stage

  一個(gè)Job被拆分成若干個(gè)Stage,每個(gè)Stage執(zhí)行一些計(jì)算,產(chǎn)生一些中間結(jié)果。它們的目的是最終生成這個(gè)Job的計(jì)算結(jié)果。

  而每個(gè)Stage是一個(gè)task set,包含若干個(gè)task。Task是Spark中最小的工作單元,在一個(gè)executor上完成一個(gè)特定的事情。

??1、driver program是用戶寫的帶main函數(shù)的代碼

??2、每個(gè)action算子的操作都會(huì)對(duì)應(yīng)一個(gè)job,例如(ForeachRDD寫入外部系統(tǒng)的一個(gè)操作)

??3、DAGScheduler會(huì)對(duì)Job進(jìn)行拆分,拆分的依據(jù):根據(jù)FinalRDD(在這里ForeachRDD)遞歸向上解析Lineager的依賴關(guān)系,以寬依賴為切分stage的依據(jù),切分成若干個(gè)Stage,遞歸優(yōu)先提交父Stage,每個(gè)Stage里面包含多個(gè)Task任務(wù)

??4、若干個(gè)Transformation的算子RDD組成Stage,所以一個(gè)RDD中有多少個(gè)partition,就有多少個(gè)Task,因?yàn)槊恳粋€(gè)Task只對(duì)一個(gè)partition數(shù)據(jù)做處理。

總結(jié)

以上是生活随笔為你收集整理的Spark原理小结的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。