Spark内核架构
1、初識Spark
Spark是分布式的,主要基于內(nèi)存的,適合迭代計(jì)算的大數(shù)據(jù)計(jì)算框架。注意基于內(nèi)存:是優(yōu)先考慮將數(shù)據(jù)放到內(nèi)存中,因?yàn)樵趦?nèi)存中具有更好的數(shù)據(jù)本地性,但是如果內(nèi)存放不下也會(huì)放在磁盤上,或者部分?jǐn)?shù)據(jù)放在磁盤上計(jì)算。所以Spark不僅能夠計(jì)算內(nèi)存放的下的數(shù)據(jù),也可以計(jì)算內(nèi)存中放不下的數(shù)據(jù)(Spark的真正生產(chǎn)環(huán)境,如果數(shù)據(jù)大于內(nèi)存,需要考慮數(shù)據(jù)的放置策略以及性能調(diào)優(yōu)的技巧)。
由于Spark底層是基于RDD(分布式彈性數(shù)據(jù)集)的抽象,所以不僅可以支持目前Spark已經(jīng)支持的5種計(jì)算方式(流處理,SQL,圖計(jì)算,機(jī)器學(xué)習(xí)等),還可以支持其他。
個(gè)人編寫的程序會(huì)經(jīng)過Driver驅(qū)動(dòng)器,提交到集群中,在集群中的某些節(jié)點(diǎn)中運(yùn)行。處理數(shù)據(jù)的來源有HDFS,HBase,Hive,傳統(tǒng)的關(guān)系數(shù)據(jù)庫,處理后的數(shù)據(jù)可以放到HDFS,HBase,Hive(使用數(shù)據(jù)倉庫),DB,顯示在客戶端的輸出端,s3等。
2、理解Spark核心的三個(gè)方面
2.1 分布式
分布式就是多臺機(jī)器,當(dāng)然Spark也可以像一個(gè)JVM進(jìn)程一樣Local模式,開發(fā)測試debug的時(shí)候在本地運(yùn)行,但是生產(chǎn)環(huán)境一定是分布式多臺機(jī)器運(yùn)行。
分布式的多臺機(jī)器運(yùn)行,首先會(huì)有一個(gè)提交具體程序的Driver(或客戶端),程序會(huì)
被提交給集群,集群中會(huì)有很多臺機(jī)器,默認(rèn)情況下,每臺機(jī)器是一個(gè)節(jié)點(diǎn)。Spark程序提交到Spark集群中進(jìn)行運(yùn)行,程序本身會(huì)處理一批數(shù)據(jù),分布式下,不同的節(jié)點(diǎn)會(huì)處理一部分?jǐn)?shù)據(jù),不同的節(jié)點(diǎn)之間的操作互不影響。這樣分布式的處理會(huì)使得程序?qū)?shù)據(jù)的處理更節(jié)約時(shí)間。
分布式做并行化就相當(dāng)于一個(gè)圖書館,有很多書,數(shù)據(jù)里面有書。如果是以前單機(jī)版本的,處理的時(shí)候就是線性的去數(shù)每一個(gè)書架。如果是分布式的,可能圖書館館長(Cluster Manager)分配計(jì)算資源說找1000個(gè)人,每個(gè)人負(fù)責(zé)一個(gè)書架的書的數(shù)量計(jì)算,那這1000個(gè)人并行計(jì)算,速度非???。這1000個(gè)人計(jì)算完之后就交給圖書館館長,那他最后在進(jìn)行統(tǒng)計(jì)。分布式由于應(yīng)用了并行計(jì)算,所以肯定會(huì)處理更快。
2.2 主要基于內(nèi)存
整個(gè)數(shù)據(jù)在進(jìn)行計(jì)算的時(shí)候,肯定是希望數(shù)據(jù)是在內(nèi)存中的,不希望在本地磁盤上,更不希望通過網(wǎng)絡(luò)從遠(yuǎn)程機(jī)器上把數(shù)據(jù)抓過來,所以Spark優(yōu)先考慮內(nèi)存其實(shí)是對計(jì)算機(jī)資源最大化利用的物理機(jī)制。
每個(gè)節(jié)點(diǎn)的數(shù)據(jù)首先會(huì)被放于內(nèi)存中,內(nèi)存容量不足時(shí),會(huì)被放到磁盤中。放于內(nèi)存中的數(shù)據(jù),經(jīng)過第一個(gè)階段的計(jì)算后,處理的結(jié)果可以繼續(xù)在其他節(jié)點(diǎn)上進(jìn)行下一個(gè)階段的計(jì)算。這是迭代計(jì)算。
2.3 迭代計(jì)算
擅長迭代式計(jì)算是Spark真正的精髓,因?yàn)閷?shí)際我們凡事對數(shù)據(jù)進(jìn)行稍有價(jià)值的挖掘,或稍有復(fù)雜度的挖掘一定是對這個(gè)多步的計(jì)算。Spark天生就是適合分布式的主要基于內(nèi)存的迭代式計(jì)算,當(dāng)然也適合分布式基于磁盤的迭代式計(jì)算。
數(shù)據(jù)被存放在不同節(jié)點(diǎn)中,數(shù)據(jù)不移動(dòng),程序移動(dòng)。程序在計(jì)算完第一個(gè)階段后,進(jìn)行shuffle,數(shù)據(jù)被移動(dòng)到其他節(jié)點(diǎn),shuffle過程的不同策略,導(dǎo)致第一個(gè)階段處理的結(jié)果,例如某一個(gè)節(jié)點(diǎn)的數(shù)據(jù)會(huì)被分發(fā)到不同的節(jié)點(diǎn),以便進(jìn)行下一個(gè)階段的計(jì)算。
3、Spark的架構(gòu)中的基本組件
3.1 Driver
Driver是應(yīng)用程序application運(yùn)行的時(shí)候的核心,因?yàn)樗?fù)責(zé)了整個(gè)作業(yè)的調(diào)度,并且向master申請資源來完成具體作業(yè)。
運(yùn)行Application的main()函數(shù)并創(chuàng)建SparkContext,本身是整個(gè)程序運(yùn)行調(diào)度的核心,會(huì)有高層調(diào)度器DAGScheduler(把作業(yè)劃分層幾個(gè)小的階段)和底層調(diào)度器TaskScheduler(每個(gè)階段里面的任務(wù)該具體怎么去處理),還有schedulerbackend去管理整個(gè)集群為當(dāng)前的程序分配的計(jì)算資源(本身就是executor)。
driver除了創(chuàng)建對象,也會(huì)向master注冊當(dāng)前的程序,如果注冊沒問題的話master會(huì)分配資源,下一步就是根據(jù)他的action觸發(fā)這個(gè)job,job里面有一系列的RDD,從后往前回溯,如果發(fā)現(xiàn)是寬依賴的話就劃分不同的Stage,把Stage提交給底層調(diào)度器TaskScheduler,TaskScheduler拿到這個(gè)任務(wù)的集合。因?yàn)橐粋€(gè)Stage內(nèi)部都是計(jì)算邏輯完全一樣的任務(wù),只是計(jì)算邏輯不一樣而已,底層調(diào)度器就會(huì)根據(jù)數(shù)據(jù)的本地性把數(shù)據(jù)放到executor去執(zhí)行。而且這個(gè)executor在任務(wù)運(yùn)行結(jié)束或者出狀況的時(shí)候肯定要向driver匯報(bào),最后運(yùn)行完畢的時(shí)候SparkContext關(guān)閉。
3.2?Application
應(yīng)用程序application就是用戶編寫的spark代碼打包后的jar包和相關(guān)依賴,包含了driver功能的代碼,和分布在集群中多個(gè)節(jié)點(diǎn)的executor的代碼。也就是應(yīng)用程序有兩個(gè)層面,一個(gè)是driver層面,一個(gè)是executor層面。driver是驅(qū)動(dòng)executor工作的,executor是具體處理數(shù)據(jù)分片,內(nèi)部是線程池并發(fā)的處理。driver層面的代碼其實(shí)就是mian方法中new sparkConf然后配置,創(chuàng)建sparkContext,也就是sparkConf+sparkContext,基于sparkContext接下來就開始創(chuàng)建RDD了,這些代碼是具體的業(yè)務(wù)實(shí)現(xiàn),就是executor層面的代碼
3.3?Cluster Manager
在standalone模式中即為Master主節(jié)點(diǎn),控制整個(gè)集群,監(jiān)控worker。在YARN模式中為資源管理器ResourceManager。
在粗粒度的資源分配方式在,spark程序application的運(yùn)行不依賴于Cluster Manager。也就是說spark應(yīng)用程序注冊給Cluster Manager,注冊如果是成功的Cluster Manager就提前分配好了資源,運(yùn)行過程中不需要Cluster Manager的參與。所以Cluster Manager可插拔。
3.4?Worker
從節(jié)點(diǎn),負(fù)責(zé)控制計(jì)算節(jié)點(diǎn),啟動(dòng)Executor。在YARN模式中為NodeManager,負(fù)責(zé)計(jì)算節(jié)點(diǎn)的控制,啟動(dòng)的進(jìn)程叫Container。
worker就是集群中任何可以運(yùn)行application操作代碼的節(jié)點(diǎn)。worker上是不會(huì)運(yùn)行我們程序代碼的,worker是管理當(dāng)前節(jié)點(diǎn)內(nèi)存CPU等資源的使用狀況,會(huì)接收mater分配資源的指令,并通過executorRunner具體啟動(dòng)一個(gè)新進(jìn)程,進(jìn)程里面有executor。
worker管理當(dāng)前NODE的資源并接受master指令來分配具體的計(jì)算資源Executor(在新的進(jìn)程中分跑配)。他分配的時(shí)候會(huì)有一個(gè)ExecutorRunner,就是我們要分配一個(gè)新的進(jìn)程來做計(jì)算的時(shí)候worker都會(huì)有一個(gè)ExecutorRunner,相當(dāng)于一個(gè)Proxy管理具體新分配的進(jìn)程,其實(shí)就是在ExecutorRunner中幫我們遠(yuǎn)程創(chuàng)建出新的進(jìn)程。
Worker本身是個(gè)進(jìn)程,不會(huì)向mater匯報(bào)當(dāng)前機(jī)器的CPU,內(nèi)存的等信息,worker發(fā)心跳主要只有一個(gè)作用workid,當(dāng)前機(jī)器上的資源是我們應(yīng)用程序在注冊的時(shí)候,注冊成功master就會(huì)給我們分配資源,分配的時(shí)候會(huì)記錄這個(gè)資源。發(fā)心跳的時(shí)候不會(huì)匯報(bào)資源,只有在發(fā)生故障的時(shí)候說資源出現(xiàn)的情況。
3.5?Executor
執(zhí)行器,在worker node上執(zhí)行任務(wù)的組件、用于啟動(dòng)線程池運(yùn)行任務(wù)。每個(gè)Application擁有獨(dú)立的一組Executors。
executor是運(yùn)行在worker節(jié)點(diǎn)上的為當(dāng)前應(yīng)用程序開啟的一個(gè)進(jìn)程里面的處理對象,這個(gè)對象負(fù)責(zé)具體task運(yùn)行,是線程池并發(fā)執(zhí)行和線程復(fù)用的方式。線程池中的每個(gè)線程可以運(yùn)行一個(gè)任務(wù),然后任務(wù)運(yùn)行完回收到池子進(jìn)行線程復(fù)用。(這就比Hadoop的MapReduce好多了,需要開啟JVM執(zhí)行完了其中一個(gè)Map或Reduce不能復(fù)用JVM,而且JVM比較重量級)。而spark默認(rèn)在一個(gè)節(jié)點(diǎn)上為程序開啟一個(gè)JVM進(jìn)程,這個(gè)JVM進(jìn)程里面是線程池的方式,通過線程處理具體的task任務(wù)。
一個(gè)worker默認(rèn)會(huì)為當(dāng)前應(yīng)用程序開辟一個(gè)executor,當(dāng)然可以配置多個(gè)。executor線程池中的線程運(yùn)行task的時(shí)候,task肯定要從磁盤或者內(nèi)存中讀寫數(shù)據(jù)。每個(gè)application都有自己獨(dú)立的一批executor。
executor配置多少看情況,如只有一個(gè)executor處理作業(yè),占據(jù)了大量的CPUcore,但是資源閑置,這是巨大的資源浪費(fèi),另外一方面由于CPUcore個(gè)數(shù)是有限的,而在特定個(gè)CPUcore的時(shí)候只有一個(gè)executor如果數(shù)據(jù)比較大的情況下容易內(nèi)存溢出OOM,這個(gè)時(shí)候就要分成幾個(gè)executor。
4?spark提交程序流程
Spark的driver的運(yùn)行有2種模式,一種是Client模式,一種是cluster模式。默認(rèn)是Client模式(因?yàn)镃lient模式的時(shí)候可以看見跟多交互式日志的信息,就是運(yùn)行過程的信息),如果指定為模式cluster模式,這樣真正的driver就會(huì)在worker中的一臺機(jī)器上,在哪臺有master決定。
首先構(gòu)建Spark Application的運(yùn)行環(huán)境(啟動(dòng)SparkContext),SparkContext里面最重要的是做3件事情:①創(chuàng)建DAGScheduler(劃分Satge)②創(chuàng)建TaskScheduler(負(fù)責(zé)一個(gè)Stage內(nèi)部作業(yè)運(yùn)行)③創(chuàng)建SchedulerBackend(計(jì)算資源)。在實(shí)例化過程中向master注冊當(dāng)前應(yīng)用程序,master接收注冊,如果沒有問題會(huì)為當(dāng)前程序分配APPid并分配計(jì)算資源(從3個(gè)地方獲取①spark-env.sh或spark-default.sh②saprk-submit的時(shí)候提供的參數(shù)③程序中saprkconf配置的參數(shù))。
然后Cluster Manager接收用戶提交的程序并發(fā)送指令給Worker為當(dāng)前應(yīng)用程序分配計(jì)算資源,每個(gè)Worker所在節(jié)點(diǎn)默認(rèn)為當(dāng)前應(yīng)用程序分配一個(gè)Executor,在Executor中通過線程池并發(fā)執(zhí)行。
然后Worker進(jìn)程通過一個(gè)proxy為ExecutorRunner的對象實(shí)例遠(yuǎn)程啟動(dòng)ExecutorBackend進(jìn)程,ExecutorBackend進(jìn)程里面有Executor。
分配完資源之后,下一步就是通過action觸發(fā)具體的job,這時(shí)候DAGScheduler會(huì)把job中的RDD構(gòu)成的DAG劃分成不同的Stage,每個(gè)Stage內(nèi)部是一系列業(yè)務(wù)邏輯完全相同但是處理數(shù)據(jù)不同的Tasks,構(gòu)成TaskSet。TaskSet會(huì)交給TaskScheduler和Schedulerbackend負(fù)責(zé)具體task的運(yùn)行(遵循數(shù)據(jù)本地性)。每個(gè)Task會(huì)計(jì)算RDD中的一個(gè)Partition,基于Partition來執(zhí)行具體我們定義的一系列同個(gè)Stage內(nèi)部的函數(shù),依次類推知道整個(gè)程序運(yùn)行完成。
Task有兩種類型:ResultTask和ShuffleMapTask:最后一個(gè)Stage中的task為ResultTask產(chǎn)生job的結(jié)果,其他前面的Stage中的task都是ShuffleMapTask為下一個(gè)階段的Stage做數(shù)據(jù)準(zhǔn)備。
總結(jié):①首先構(gòu)建Spark Application的運(yùn)行環(huán)境(啟動(dòng)SparkContext),在實(shí)例化過程中向master注冊當(dāng)前應(yīng)用程序,master接收注冊,如果沒有問題會(huì)為當(dāng)前程序分配APPid并分配計(jì)算資源②master接收用戶提交的程序并發(fā)送指令給Worker為當(dāng)前應(yīng)用程序分配計(jì)算資源③Worker進(jìn)程通過ExecutorRunner的對象啟動(dòng)ExecutorBackend進(jìn)程,ExecutorBackend進(jìn)程里面有Executor④分配完資源之后,下一步就是通過action觸發(fā)具體的job,這時(shí)候DAGScheduler會(huì)把job中的RDD構(gòu)成的DAG劃分成不同的Stage,每個(gè)Stage內(nèi)部是一系列業(yè)務(wù)邏輯完全相同但是處理數(shù)據(jù)不同的Tasks,構(gòu)成TaskSet。TaskSet會(huì)交給TaskScheduler和Schedulerbackend負(fù)責(zé)具體task的運(yùn)行,每個(gè)Task會(huì)計(jì)算RDD中的一個(gè)Partition,基于Partition來執(zhí)行具體我們定義的一系列同個(gè)Stage內(nèi)部的函數(shù),依次類推知道整個(gè)程序運(yùn)行完成。
?
總結(jié)
- 上一篇: Maven详解及实例
- 下一篇: 筛选末位数字为1或5_看看广州示范性高中