日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

Spark的调度系统

發(fā)布時間:2023/12/14 windows 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark的调度系统 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一,簡介

Spark調(diào)度機制可以理解為兩個層面的調(diào)度。首先,是Spark Application調(diào)度。也就是Spark應用程序在集群運行的調(diào)度,應用程序包括Driver調(diào)度和Executor調(diào)度。其次,就是每個Spark Application都會有若干Jobs(Spark Actions),然后這些job是以何種機制,在Executor上執(zhí)行的,也是需要一個調(diào)度管理的機制,該層面調(diào)度也可以理解為SparkContext內(nèi)部調(diào)度。之所以會出現(xiàn)這種情況,主要是生產(chǎn)中可能會希望一個SparkContext作為服務,同時執(zhí)行若干Jobs,此時可以用Spark Application的Fair機制的調(diào)度。

?

二,App之間的調(diào)度

在以集群的方式運行Spark App時,每個Spark App會包含一些列獨立資源的Executor JVMs,這些JVMs僅僅運行該App的tasks,緩存該App的數(shù)據(jù)。當有多個應用或者多個程序在你的集群中運行時,這就牽涉到如何在集群中給這些Spark App分配資源。

最簡單的方式是提供靜態(tài)資源分配。也即給運行程序分配固定資源,資源數(shù)在該程序運行期間都不會有變動。這種方式出現(xiàn)在Spark的Standalone,yarn和coarse-grained Mesos?模式。資源的分配方式,在每種集群運行模式中有些不同:

?

1,standalone模式

默認情況下,app在Standalone集群中以FIFO的方式運行。您可以通過設置spark.cores.max配置屬性來限制應用程序使用的節(jié)點數(shù),也可以通過spark.deploy.defaultCores更改未設置此應用程序的默認值。 最后,除了控制cores之外,每個應用程序可以使用spark.executor.memory配置控制其內(nèi)存使用。

?

2,mesos

要在Mesos上使用靜態(tài)分區(qū),請將spark.mesos.coarse配置屬性設置為true,并且可選地像Standalone模式一樣設置spark.cores.max來限制每個應用程序的資源。您還應該設置spark.executor.memory來控制執(zhí)行程序內(nèi)存。

?

3,yarn

Spark YARN客戶端的--num-executors選項控制在集群上分配的Executor數(shù)量,而--executor-memory和--executor-cores則控制每個執(zhí)行程序的資源。

?

在Mesos模式下還有一個可選項,那就是動態(tài)共享Core。在這種模式下,Spark App依然擁有固定數(shù)量和獨立的內(nèi)存(spark.executor.memory設置),但是當Spark App在一個機器上沒有運行的task的時候,其它的程序可以使用這些cores運行tasks。當你需要運行大量不活躍的Spark App時,此模式是有用的。但是此模式,是有一定風險的,因為當Spark App需要恢復使用這些cores的時候,需要等待一些時間才能使用這些core去執(zhí)行任務。要使用此模式,只需要使用mesos://URL并將spark.executor.coarse設置為false。

?

請注意,目前沒有提供任何夸應用程序的內(nèi)存共享。如果想進行夸應用程序共享數(shù)據(jù),建議使用第三方存儲,例如tachyon來實現(xiàn)內(nèi)存共享。

?

三,動態(tài)資源申請

Spark提供了一種動態(tài)調(diào)整應用程序占用資源的機制。 這意味著如果您的應用程序不再使用,您的應用程序可能會將資源返回給群集,并在需要時再次請求它們。 如果多個應用程序在Spark群集中共享資源,則此功能特別有用。

在任何coarse-grained集群管理器(Standalone,yarn?,mesos coarse-grained模式?)此功能禁用。

?

1,配置

用這種配置有兩個要求:

第一,需要Spark App設置spark.dynamicAllocation.enabled為true。

第二,需要在集群中每個節(jié)點啟用外部shuffle服務,設置spark.shuffle.service.enabled為true。使用這種模式,運行應用程序在深處的時候,不需要刪除它們輸出的shuffle的文件。根據(jù)不同的集群管理器,該服務的設置方式稍微有點不同。

在Standalone模式下,僅僅在?spark.shuffle.service.enabled設置為true的情況下啟動你的Worker。

在mesos模式下,在?spark.shuffle.service.enabled設置為true的情況下,在所有節(jié)點運行$SPARK_HOME/sbin/start-mesos-shuffle-service.sh腳本。

在YARN模式下,按如下所示在每個NodeManager上啟動shuffle?服務:

A),在編譯Spark的時候要添加yarn屬性。假如,已經(jīng)添加該屬性,并分發(fā)到集群中,跳過此步驟。

B),指定spark-<version>-yarn-shuffle.jar。假如你自己編譯的Spark該jar應該在 下面的目錄里$SPARK_HOME/network/yarn/target/scala-<version>?,以分發(fā)到集群的方式里,可以放到lib里面。

C),將該jar放到NodeManager的Classpath里面。

D),在每個節(jié)點的yarn-site.xml中,給屬性yarn.nodemanager.aux-services增加一個spark_shuffle值,然后yarn.nodemanager.aux-services.spark_shuffle.class設置為org.apache.spark.network.yarn.YarnShuffleService.

E),重啟所有的NodeManager。

?

2,資源申請策略

在高層次上,Spark在不再使用時應放棄executors?,在需要時申請executors?。 既然沒有確定的方法可以預測即將被刪除的executors?是否會在不久的將來執(zhí)行任務,還是將要添加的新的執(zhí)行者實際上是空閑的,那么我們需要一套啟發(fā)式來確定 何時刪除,請求executors?。

2.1,請求策略

允許動態(tài)申請內(nèi)存的Spark應用程序在存在等待執(zhí)行的task的時候會申請額外的Executors。

Spark會輪訓詢申請資源。當有spark.dynamicAllocation.schedulerBacklogTimeout秒的掛起任務時觸發(fā)實際的請求,然后如果掛起的任務隊列仍然存在,則每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒再次觸發(fā)。另外每次申請的Executors會基于上次申請的數(shù)目以指數(shù)的形式增長。比如,應用程序第一輪增加了1個Executor,那么接著會是2,4,8.

指數(shù)增長政策的動機是雙重的。應用程序申請Executor應該謹慎,證明少量Executor即可完成任務。這反映了TCP緩慢啟動的理由。第二,應用程序應該能夠及時提高其資源使用情況,以證明實際需要許多Executor。

2.2?刪除策略

刪除executors?的策略要簡單得多。Spark App在executors?空閑時間超過spark.dynamicAllocation.executorIdleTimeout?秒后刪除Executors。請注意,在大多數(shù)情況下,這種情況與請求條件相互排斥,因為如果仍然有待執(zhí)行的任務時,Executor不應該空閑。

?

3,Executors優(yōu)雅退出

在動態(tài)分配之前,Spark Executors在出現(xiàn)故障或退出相關應用程序時退出。在這兩種情況下,與Executors相關聯(lián)的所有狀態(tài)不再需要,可以被安全地丟棄。然而,通過動態(tài)分配,當Executors被顯式刪除時,應用程序仍在運行。如果應用程序嘗試訪問由Executors存儲或寫入的狀態(tài),則必須執(zhí)行重新計算狀態(tài)。因此,Spark需要一種機制,通過在刪除執(zhí)行程序之前保留其狀態(tài)才能正常退出Executors。

這個要求對于shuffle尤其重要。Shuffle期間,SparkExecutor首先寫map輸出到磁盤,然后當做文件服務器,提供給其它Executor獲取這些輸出文件。存在落后的tasks,他們運行的時間比其它tasks長,動態(tài)申請的Executor有可能在shuffle未結束之前就被移除了,在這種情況下,shuffle輸出文件必須要重新計算,這個是很沒必要的。

可以使用外部shuffle服務保存shuffle輸出文件,從spark 1.2開始引入。此服務是指一個長期運行的進程,它們獨立于Spark應用程序及其executors,在集群的每個節(jié)點上運行。如果服務已啟用,Spark執(zhí)行程序將從服務中獲取shuffle文件,而不是從其它Executor。這意味著由執(zhí)行人員寫入的任何shuffle?狀態(tài)可能會繼續(xù)執(zhí)行超出Executor的生命周期。

除了寫shuffle文件之外,執(zhí)行程序還可以在磁盤或內(nèi)存中緩存數(shù)據(jù)。但是,當執(zhí)行器被刪除時,所有緩存的數(shù)據(jù)將不再可訪問。為了避免這種情況,默認的包含緩存數(shù)據(jù)的executors?永遠不會被刪除。您可以使用spark.dynamicAllocation.cachedExecutorIdleTimeout配置此行為。在將來的版本中,緩存的數(shù)據(jù)可能會通過堆棧外的存儲來保存,這個存儲類似于通過外部shuffle服務來保存shuffle文件的方式。

?

四,Spark App內(nèi)部調(diào)度

在給定的Spark應用程序(SparkContext實例)中,如果從單獨的線程提交多個并行作業(yè),則可以同時運行。“job”,在本節(jié)中,我們是指一個Spark action(例如,save,collect)以及任何需要運行的任務以評估該動作。Spark的調(diào)度程序是完全線程安全的,并支持這種用例來啟用提供多個請求的應用程序(例如,多個用戶的查詢)。

默認情況下,Spark的調(diào)度程序以FIFO方式運行作業(yè)。每個job會被劃分成很多stage(例如,map階段,reduce階段),在第一個job運行技術之后,第二個job才有會去執(zhí)行。如果在隊列頭部的job不需要使用集群的全部資源,那么后面的job可以立即執(zhí)行。隊列頭部的job很大的話,其余的job必須推遲執(zhí)行。

從Spark 0.8開始,也可以在作業(yè)之間配置公平的共享。在公平分享下,Spark以“循環(huán)”方式在任務之間分配tasks,使所有job獲得大致相等的集群資源份額。這意味著長job運行期間提交的短job,也可以立即獲取到資源,并且仍然可以獲得良好的響應時間,而無需等待長job完成。此模式最適合多用戶。

要啟用公平調(diào)度程序,只需在配置SparkContext時將spark.scheduler.mode屬性設置為FAIR:

val?conf =?new?SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode",?"FAIR")
val?sc =?new?SparkContext(conf)

?

1,公平調(diào)度池

公平調(diào)度程序還支持將作業(yè)分組到池中,并為每個池設置不同的調(diào)度選項(例如權重)。這對于為更重要的job創(chuàng)建“高優(yōu)先級”池是有用的,或將每個用戶的job分組在一起,并給予用戶相等的份額,而不管他們有多少并發(fā)作業(yè),而不是給予作業(yè)相等的份額。這種方法是在Hadoop Fair Scheduler之后建模的。這種方法是在Hadoop Fair Scheduler之后建模的。

沒有任何干預,新提交的作業(yè)進入默認池,但是可以通過向提交的線程中的SparkContext添加spark.scheduler.pool“l(fā)ocal property”來設置作業(yè)的池。如下:

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool",?"pool1")

設置此本地屬性后,在此線程中提交的所有作業(yè)(通過此線程中的調(diào)用到RDD.save,count,collect等)將使用此pool?名稱。該設置是每個線程,使得線程可以代表同一用戶運行多個作業(yè)變得容易。如果要清除線程與之關聯(lián)的池,只需調(diào)用:

sc.setLocalProperty("spark.scheduler.pool",?null)

?

2,池的默認行為

默認情況下,每個pool獲得相同的集群份額(在默認池中每個job獲得相同的共享資源),但是每個池中的作業(yè)依然是FIFO的順序運行。例如,如果您為每個用戶創(chuàng)建一個池,這意味著每個用戶將獲得該群集的相等份額,并且每個用戶的查詢將按順序運行。

?

3,配置池屬性

特定池的屬性也可以通過配置文件進行修改。?每個池支持三個屬性:

A),schedulingMode

這可以是FIFO或FAIR,以控制池中的作業(yè)以隊列的形式順序執(zhí)行(默認),或公平分享池的資源。

B),weight

這可以控制池相對于其他池的共享。默認情況下,所有池的權重均為1.如果將特定池的權重設置為2,例如,它將獲得兩倍的資源相比其他活躍的池。設置諸如1000之類的高重量也使得可以在池之間實現(xiàn)優(yōu)先級?-?實質上,weight-1000池將始終在任務激活時首先啟動任務。

C),minShare

除了總體weight之外,每個池可以被給予管理員希望具有的最小份額(如一些CPU內(nèi)核)。公平調(diào)度員總是嘗試在根據(jù)權重重新分配額外的資源之前滿足所有活動池的最小份額。因此,minShare屬性可以是另一種確保池總是能夠快速獲得一定數(shù)量的資源(例如10個內(nèi)核)的方法。默認情況下,每個池的minShare為0。

可以通過創(chuàng)建類似于conf / fairscheduler.xml.template的XML文件,并在SparkConf中設置spark.scheduler.allocation.file屬性來設置池屬性。

conf.set("spark.scheduler.allocation.file",?"/path/to/file")

XML文件的格式只是每個池的一個<pool>元素,其中包含不同的元素,用于各種設置。 例如:

<?xml version="1.0"?>
??<allocations>
????<pool name="production">
??????<schedulingMode>FAIR</schedulingMode>
??????<weight>1</weight>
??????<minShare>2</minShare>
????</pool>
????<pool name="test">
??????<schedulingMode>FIFO</schedulingMode>
??????<weight>2</weight>
??????<minShare>3</minShare>
????</pool>
??</allocations>

conf / fairscheduler.xml.template中還提供了一個完整示例。 請注意,沒有在XML文件中配置的任何池將簡單地獲取所有設置(調(diào)度模式FIFO,權重1和minShare 0)的默認值。

?

尾巴:

后面會出文章寫一個基于netty的Spark作為一種查詢服務的Spark App。

總結

以上是生活随笔為你收集整理的Spark的调度系统的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。