本地提交spark_Spark 数据本地化级别
?
?
RDD 源碼
大家可以看到源碼中的第五條注釋說明,翻譯過來的大概意思是提供一系列的最佳計(jì)算位置。
我之前一直不太清楚 spark 是如何內(nèi)部實(shí)現(xiàn)的,今天就帶領(lǐng)大家來看一看 spark 的本地?cái)?shù)據(jù)化級別在任務(wù)執(zhí)行中的演變過程。
1 數(shù)據(jù)的本地化級別有哪些?
Spark 中任務(wù)的處理需要考慮數(shù)據(jù)的本地性,以 spark 1.6 為例,目前支持一下幾種。(中英文排版很頭疼,誰來幫幫我啊)
PROCESS_LOCAL 進(jìn)程本地化,表示 task 要計(jì)算的數(shù)據(jù)在同一個 Executor 中。
NODE_LOCAL 節(jié)點(diǎn)本地化,速度稍慢,因?yàn)閿?shù)據(jù)需要在不同的進(jìn)程之間傳遞或從文件中讀取。分為兩種情況,第一種:task 要計(jì)算的數(shù)據(jù)是在同一個 worker 的不同 Executor 進(jìn)程中。第二種:task 要計(jì)算的數(shù)據(jù)是在同一個 worker 的磁盤上,或在 HDFS 上恰好有 block 在同一個節(jié)點(diǎn)上。如果 Spark 要計(jì)算的數(shù)據(jù)來源于 HDFSD 上,那么最好的本地化級別就是 NODE_LOCAL。
NO_PREF 沒有最佳位置,數(shù)據(jù)從哪訪問都一樣快,不需要位置優(yōu)先。比如 Spark SQL 從 Mysql 中讀取數(shù)據(jù)。
RACK_LOCAL 機(jī)架本地化,數(shù)據(jù)在同一機(jī)架的不同節(jié)點(diǎn)上。需要通過網(wǎng)絡(luò)傳輸數(shù)據(jù)以及文件 IO,比 NODE_LOCAL 慢。情況一:task 計(jì)算的數(shù)據(jù)在 worker2 的 EXecutor 中。情況二:task 計(jì)算的數(shù)據(jù)在 work2 的磁盤上。
ANY 跨機(jī)架,數(shù)據(jù)在非同一機(jī)架的網(wǎng)絡(luò)上,速度最慢。
如果不是很清楚,我畫(造)了一張圖放在這以供大家理解。
?
?
2 Spark 的數(shù)據(jù)本地化由誰來負(fù)責(zé)呢?
val rdd1 = sc.textFile("hdfs://tsl...") rdd1.cache()rdd1.map.filter.count()?
上面這段簡單的代碼,背后其實(shí)做什么很多事情。Driver 的 TaskScheduler 在發(fā)送 task 之前,首先應(yīng)該拿到 rdd1 數(shù)據(jù)所在的位置,rdd1 封裝了這個文件所對應(yīng)的 block 的位置,DAGScheduler 通過調(diào)用 getPrerredLocations() 拿到 partition 所對應(yīng)的數(shù)據(jù)的位置,TaskScheduler 根據(jù)這些位置來發(fā)送相應(yīng)的 task。
具體的解釋:
DAGScheduler 切割Job,劃分Stage, 通過調(diào)用 submitStage 來提交一個Stage 對應(yīng)的 tasks,submitStage 會調(diào)用 submitMissingTasks, submitMissingTasks 確定每個需要計(jì)算的 task 的preferredLocations,通過調(diào)用 getPreferrdeLocations() 得到 partition 的優(yōu)先位置,就是這個 partition 對應(yīng)的 task 的優(yōu)先位置,對于要提交到 TaskScheduler 的 TaskSet 中的每一個task,該 task 優(yōu)先位置與其對應(yīng)的 partition 對應(yīng)的優(yōu)先位置一致。
TaskScheduler 接收到了 TaskSet 后,TaskSchedulerImpl 會為每個 TaskSet 創(chuàng)建一個 TaskSetManager 對象,該對象包含taskSet 所有 tasks,并管理這些 tasks 的執(zhí)行,其中就包括計(jì)算 TaskSetManager 中的 tasks 都有哪些 locality levels,以便在調(diào)度和延遲調(diào)度 tasks 時發(fā)揮作用。
總的來說,Spark 中的數(shù)據(jù)本地化是由 DAGScheduler 和 TaskScheduler 共同負(fù)責(zé)的。
3
計(jì)算節(jié)點(diǎn)與輸入數(shù)據(jù)位置的關(guān)系,下面以一個圖來展開 spark 是如何讓進(jìn)行調(diào)度的。這一個過程會涉及 RDD, DAGScheduler , TaskScheduler。
?
?
第一步:PROCESS_LOCAL
TaskScheduler 根據(jù)數(shù)據(jù)的位置向數(shù)據(jù)節(jié)點(diǎn)發(fā)送 task 任務(wù)。如果這個任務(wù)在 worker1 的 Executor 中等待了 3 秒。(默認(rèn)的,可以通過spark.locality.wait 來設(shè)置),可以通過 SparkConf() 來修改,重試了 5 次之后,還是無法執(zhí)行,TaskScheduler 就會降低數(shù)據(jù)本地化的級別,從 PROCESS_LOCAL 降到 NODE_LOCAL。
第二步:NODE_LOCAL
TaskScheduler 重新發(fā)送 task 到 worker1 中的 Executor2 中執(zhí)行,如果 task 在worker1 的 Executor2 中等待了 3 秒,重試了 5 次,還是無法執(zhí)行,TaskScheduler 就會降低數(shù)據(jù)本地化的級別,從 NODE_LOCAL 降到 RACK_LOCAL。
第三步:RACK_LOCAL
TaskScheduler重新發(fā)送 task 到 worker2 中的 Executor1 中執(zhí)行。
第四步:
當(dāng) task 分配完成之后,task 會通過所在的 worker 的 Executor 中的 BlockManager 來獲取數(shù)據(jù)。如果 BlockManager 發(fā)現(xiàn)自己沒有數(shù)據(jù),那么它會調(diào)用 getRemote() 方法,通過 ConnectionManager 與原 task 所在節(jié)點(diǎn)的 BlockManager 中的 ConnectionManager先建立連接,然后通過TransferService(網(wǎng)絡(luò)傳輸組件)獲取數(shù)據(jù),通過網(wǎng)絡(luò)傳輸回task所在節(jié)點(diǎn)(這時候性能大幅下降,大量的網(wǎng)絡(luò)IO占用資源),計(jì)算后的結(jié)果返回給Driver。這一步很像 shuffle 的文件尋址流程,Spark 的 shuffle 文件尋址流程
4
TaskScheduler在發(fā)送task的時候,會根據(jù)數(shù)據(jù)所在的節(jié)點(diǎn)發(fā)送task,這時候的數(shù)據(jù)本地化的級別是最高的,如果這個task在這個Executor中等待了三秒,重試發(fā)射了5次還是依然無法執(zhí)行,那么TaskScheduler就會認(rèn)為這個Executor的計(jì)算資源滿了,TaskScheduler會降低一級數(shù)據(jù)本地化的級別,重新發(fā)送task到其他的Executor中執(zhí)行,如果還是依然無法執(zhí)行,那么繼續(xù)降低數(shù)據(jù)本地化的級別...
如果想讓每一個 task 都能拿到最好的數(shù)據(jù)本地化級別,那么調(diào)優(yōu)點(diǎn)就是等待時間加長。注意!如果過度調(diào)大等待時間,雖然為每一個 task 都拿到了最好的數(shù)據(jù)本地化級別,但是我們 job 執(zhí)行的時間也會隨之延長。
下面是官方提供的參數(shù)說明:
?
?
可以在代碼里面這樣設(shè)置:
?
new SparkConf.set("spark.locality.wait總結(jié)
以上是生活随笔為你收集整理的本地提交spark_Spark 数据本地化级别的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java tls 实例_grpc加密TL
- 下一篇: android打印参数类型,__andr