Spark原理小结
1、spark是什么?
快速,通用,可擴(kuò)展的分布式計(jì)算引擎
2、彈性分布式數(shù)據(jù)集RDD
RDD(Resilient Distributed Dataset)叫做分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個(gè)不可變、可分區(qū)、里面的元素可并行計(jì)算的集合。RDD具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性。RDD允許用戶在執(zhí)行多個(gè)查詢時(shí)顯式地將工作集緩存在內(nèi)存中,后續(xù)的查詢能夠重用工作集,這極大地提升了查詢速度。
RDD的屬性
?
1)一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對(duì)于RDD來(lái)說(shuō),每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒有指定,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。
2)一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)。Spark中RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。
3)RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。
4)一個(gè)Partitioner,即RDD的分片函數(shù)。當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner。只有對(duì)于于key-value的RDD,才會(huì)有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。
5)一個(gè)列表,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)。對(duì)于一個(gè)HDFS文件來(lái)說(shuō),這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。
創(chuàng)建RDD的兩種方式
1、由一個(gè)已經(jīng)存在的Scala集合創(chuàng)建。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
2、由外部存儲(chǔ)系統(tǒng)的數(shù)據(jù)集創(chuàng)建,包括本地的文件系統(tǒng),還有所有Hadoop支持的數(shù)據(jù)集,比如HDFS、Cassandra、HBase等
val rdd2 = sc.textFile("hdfs://node1.itcast.cn:9000/words.txt")
3、Spark的算子
RDD中的所有轉(zhuǎn)換都是延遲加載的,也就是說(shuō),它們并不會(huì)直接計(jì)算結(jié)果。相反的,它們只是記住這些應(yīng)用到基礎(chǔ)數(shù)據(jù)集(例如一個(gè)文件)上的轉(zhuǎn)換動(dòng)作。只有當(dāng)發(fā)生一個(gè)要求返回結(jié)果給Driver的動(dòng)作時(shí),這些轉(zhuǎn)換才會(huì)真正運(yùn)行。這種設(shè)計(jì)讓Spark更加有效率地運(yùn)行。
1、Transformation
2、Action
4、RDD的依賴關(guān)系
RDD和它依賴的父RDD(s)的關(guān)系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
?
RDD緩存
Spark速度非常快的原因之一,就是在不同操作中可以在內(nèi)存中持久化或緩存?zhèn)€數(shù)據(jù)集。當(dāng)持久化某個(gè)RDD后,每一個(gè)節(jié)點(diǎn)都將把計(jì)算的分片結(jié)果保存在內(nèi)存中
RDD緩存的方式
RDD通過(guò)persist方法或cache方法可以將前面的計(jì)算結(jié)果緩存,但是并不是這兩個(gè)方法被調(diào)用時(shí)立即緩存,而是觸發(fā)后面的action時(shí),該RDD將會(huì)被緩存在計(jì)算節(jié)點(diǎn)的內(nèi)存中,并供后面重用。
cache最終也是調(diào)用了persist方法,默認(rèn)的存儲(chǔ)級(jí)別都是僅在內(nèi)存存儲(chǔ)一份
Spark的存儲(chǔ)級(jí)別
?
緩存有可能丟失,或者存儲(chǔ)存儲(chǔ)于內(nèi)存的數(shù)據(jù)由于內(nèi)存不足而被刪除,RDD的緩存容錯(cuò)機(jī)制保證了即使緩存丟失也能保證計(jì)算的正確執(zhí)行。通過(guò)基于RDD的一系列轉(zhuǎn)換,丟失的數(shù)據(jù)會(huì)被重算,由于RDD的各個(gè)Partition是相對(duì)獨(dú)立的,因此只需要計(jì)算丟失的部分即可,并不需要重算全部Partition。
5、DAG的生成
DAG(Directed Acyclic Graph)叫做有向無(wú)環(huán)圖,原始的RDD通過(guò)一系列的轉(zhuǎn)換就就形成了DAG,根據(jù)RDD之間的依賴關(guān)系的不同將DAG劃分成不同的Stage,對(duì)于窄依賴,partition的轉(zhuǎn)換處理在Stage中完成計(jì)算。對(duì)于寬依賴,由于有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來(lái)的計(jì)算,因此寬依賴是劃分Stage的依據(jù)。
?
spark運(yùn)行原理?
1、通過(guò)ActorSystem創(chuàng)建MasterActor,啟動(dòng)定時(shí)器,定時(shí)檢查與接收Worker節(jié)點(diǎn)的發(fā)送消息
2、Worker節(jié)點(diǎn)主動(dòng)向Master發(fā)送注冊(cè)消息
3、Master接收Worker的注冊(cè)請(qǐng)求,然后將注冊(cè)信息保存起來(lái),并向Worker返回一個(gè)注冊(cè)成功的消息
4、Worker接收到Master注冊(cè)成功的消息后,啟用定時(shí)器,定時(shí)向master發(fā)送心跳報(bào)活,Master接收到Worker發(fā)送來(lái)的心跳消息后,更新Worker上一次的心跳時(shí)間
5、DAGScheduler根據(jù)FinalRDD遞歸向上解析Lineager的依賴關(guān)系,并以寬依賴為切分一個(gè)新stage的依據(jù),并將多個(gè)task任務(wù)封裝到TaskSet,其中Task的數(shù)量由其父RDD的切片數(shù)量決定,最后使用遞歸優(yōu)先提交父Stage(TaskSet)
6、先創(chuàng)建TaskScheduler即TaskSchedulerImpl接著又創(chuàng)建SparkDeploySchedulerBackend對(duì)資源參數(shù)創(chuàng)建AppClient與Master注冊(cè)Application,并替每個(gè)TaskSet創(chuàng)建TaskManager負(fù)責(zé)監(jiān)控此TaskSet中任務(wù)的執(zhí)行情況
7、Master接收到ClientActor的任務(wù)描述之后,將任務(wù)描述信息保存起來(lái),然后ClientActor返回消息,告知ClientActor任務(wù)注冊(cè)成功,接下來(lái)Master(打散|負(fù)載均衡|盡量集中)進(jìn)行資源調(diào)度
8、Master跟Worker通信,然后讓W(xué)orker啟動(dòng)Executor
9、Executor向Driver發(fā)送注冊(cè)消息,Driver接收到Executor注冊(cè)消息后,響應(yīng)注冊(cè)成功的消息
10、Executor接收到Driver注冊(cè)成功的消息后,本進(jìn)程中創(chuàng)建Executor的引用對(duì)象
11、Driver中TaskSchedulerImp向Executor發(fā)送LaunchTask消息,Executor將創(chuàng)建一個(gè)線程池作為所提交的Task任務(wù)的容器
12、Task接收到launchTask消息后,準(zhǔn)備運(yùn)行文件初始化與反序列化,就緒后,調(diào)用Task的run方法,其中每個(gè)Task所執(zhí)行的函數(shù)是應(yīng)用在RDD中的一個(gè)獨(dú)立分區(qū)上
13、Task運(yùn)行完成,向TaskManager匯報(bào)情況,并且釋放線程資源
14、所有Task運(yùn)行結(jié)束之后,Executor向Worker注銷自身,釋放資源。
Spark Standalone
Spark Standalone模式中,資源調(diào)度室Spark自行實(shí)現(xiàn)的,其節(jié)點(diǎn)類型分為master和worker,
其中Driver運(yùn)行在Master中,并且有長(zhǎng)駐內(nèi)存的Master進(jìn)程守護(hù),Worker節(jié)點(diǎn)上常駐Worker守護(hù)進(jìn)程,負(fù)責(zé)與Master節(jié)點(diǎn)通信,通過(guò)ExecutorRunner來(lái)控制運(yùn)行在當(dāng)前節(jié)點(diǎn)上的CoarseGrainedExecutorBackend,
每個(gè)Worker上存在一個(gè)或多個(gè)CoarseGrainedExecutorBackend進(jìn)程,每個(gè)進(jìn)程包含一個(gè)Executor對(duì)象,該對(duì)象持有一個(gè)線程池,每個(gè)線程池可以執(zhí)行一個(gè)Task
?
節(jié)點(diǎn)類型:
1. master?節(jié)點(diǎn): 常駐master進(jìn)程,負(fù)責(zé)管理全部worker節(jié)點(diǎn)。
2. worker?節(jié)點(diǎn): 常駐worker進(jìn)程,負(fù)責(zé)管理executor?并與master節(jié)點(diǎn)通信。
dirvier:官方解釋為:?The process running the main() function of the application and creating the SparkContext。即理解為用戶自己編寫的應(yīng)用程序
Executor:執(zhí)行器:
在每個(gè)WorkerNode上為某應(yīng)用啟動(dòng)的一個(gè)進(jìn)程,該進(jìn)程負(fù)責(zé)運(yùn)行任務(wù),并且負(fù)責(zé)將數(shù)據(jù)存在內(nèi)存或者磁盤上,每個(gè)任務(wù)都有各自獨(dú)立的Executor。
Executor是一個(gè)執(zhí)行Task的容器。它的主要職責(zé)是:
1、初始化程序要執(zhí)行的上下文SparkEnv,解決應(yīng)用程序需要運(yùn)行時(shí)的jar包的依賴,加載類。
2、同時(shí)還有一個(gè)ExecutorBackend向cluster manager匯報(bào)當(dāng)前的任務(wù)狀態(tài),這一方面有點(diǎn)類似hadoop的tasktracker和task。
總結(jié):Executor是一個(gè)應(yīng)用程序運(yùn)行的監(jiān)控和執(zhí)行容器。Executor的數(shù)目可以在submit時(shí),由?--num-executors (on yarn)指定.
Job:?
包含很多task的并行計(jì)算,可以認(rèn)為是Spark RDD?里面的action,每個(gè)action的計(jì)算會(huì)生成一個(gè)job。
用戶提交的Job會(huì)提交給DAGScheduler,Job會(huì)被分解成Stage和Task。
Stage:
一個(gè)Job會(huì)被拆分為多組Task,每組任務(wù)被稱為一個(gè)Stage就像Map Stage,?Reduce Stage。
Stage的劃分在RDD的論文中有詳細(xì)的介紹,簡(jiǎn)單的說(shuō)是以shuffle和result這兩種類型來(lái)劃分。在Spark中有兩類task,一類是shuffleMapTask,一類是resultTask,第一類task的輸出是shuffle所需數(shù)據(jù),第二類task的輸出是result,stage的劃分也以此為依據(jù),shuffle之前的所有變換是一個(gè)stage,shuffle之后的操作是另一個(gè)stage。比如?rdd.parallize(1 to 10).foreach(println)?這個(gè)操作沒有shuffle,直接就輸出了,那么只有它的task是resultTask,stage也只有一個(gè);如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println),?這個(gè)job因?yàn)橛衦educe,所以有一個(gè)shuffle過(guò)程,那么reduceByKey之前的是一個(gè)stage,執(zhí)行shuffleMapTask,輸出shuffle所需的數(shù)據(jù),reduceByKey到最后是一個(gè)stage,直接就輸出結(jié)果了。如果job中有多次shuffle,那么每個(gè)shuffle之前都是一個(gè)stage。
Task
即?stage?下的一個(gè)任務(wù)執(zhí)行單元,一般來(lái)說(shuō),一個(gè)?rdd?有多少個(gè)?partition,就會(huì)有多少個(gè)?task,因?yàn)槊恳粋€(gè)?task?只是處理一個(gè)?partition?上的數(shù)據(jù).
每個(gè)executor執(zhí)行的task的數(shù)目, 可以由submit時(shí),--num-executors(on yarn)?來(lái)指定。
小結(jié):
驅(qū)動(dòng)程序就是執(zhí)行了一個(gè)Spark Application的main函數(shù)和創(chuàng)建Spark Context的進(jìn)程,它包含了這個(gè)application的全部代碼。
Spark Application中的每個(gè)action會(huì)被Spark作為Job進(jìn)行調(diào)度。
每個(gè)Job是一個(gè)計(jì)算序列的最終結(jié)果,而這個(gè)序列中能夠產(chǎn)生中間結(jié)果的計(jì)算就是一個(gè)stage。
對(duì)于Transformations和Actions是有著明確區(qū)分的。通常Action對(duì)應(yīng)了Job,而Transformation對(duì)應(yīng)了Stage
一個(gè)Job被拆分成若干個(gè)Stage,每個(gè)Stage執(zhí)行一些計(jì)算,產(chǎn)生一些中間結(jié)果。它們的目的是最終生成這個(gè)Job的計(jì)算結(jié)果。
而每個(gè)Stage是一個(gè)task set,包含若干個(gè)task。Task是Spark中最小的工作單元,在一個(gè)executor上完成一個(gè)特定的事情。
??1、driver program是用戶寫的帶main函數(shù)的代碼
??2、每個(gè)action算子的操作都會(huì)對(duì)應(yīng)一個(gè)job,例如(ForeachRDD寫入外部系統(tǒng)的一個(gè)操作)
??3、DAGScheduler會(huì)對(duì)Job進(jìn)行拆分,拆分的依據(jù):根據(jù)FinalRDD(在這里ForeachRDD)遞歸向上解析Lineager的依賴關(guān)系,以寬依賴為切分stage的依據(jù),切分成若干個(gè)Stage,遞歸優(yōu)先提交父Stage,每個(gè)Stage里面包含多個(gè)Task任務(wù)
??4、若干個(gè)Transformation的算子RDD組成Stage,所以一個(gè)RDD中有多少個(gè)partition,就有多少個(gè)Task,因?yàn)槊恳粋€(gè)Task只對(duì)一個(gè)partition數(shù)據(jù)做處理。
總結(jié)
- 上一篇: Spark的基本原理
- 下一篇: mac 配置/etc/profile重启