日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

本地提交spark_Spark 数据本地化级别

發(fā)布時間:2025/3/11 编程问答 17 豆豆
生活随笔 收集整理的這篇文章主要介紹了 本地提交spark_Spark 数据本地化级别 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

?

?

RDD 源碼

大家可以看到源碼中的第五條注釋說明,翻譯過來的大概意思是提供一系列的最佳計(jì)算位置。

我之前一直不太清楚 spark 是如何內(nèi)部實(shí)現(xiàn)的,今天就帶領(lǐng)大家來看一看 spark 的本地?cái)?shù)據(jù)化級別在任務(wù)執(zhí)行中的演變過程。

1 數(shù)據(jù)的本地化級別有哪些?

Spark 中任務(wù)的處理需要考慮數(shù)據(jù)的本地性,以 spark 1.6 為例,目前支持一下幾種。(中英文排版很頭疼,誰來幫幫我啊)

PROCESS_LOCAL 進(jìn)程本地化,表示 task 要計(jì)算的數(shù)據(jù)在同一個 Executor 中。

NODE_LOCAL 節(jié)點(diǎn)本地化,速度稍慢,因?yàn)閿?shù)據(jù)需要在不同的進(jìn)程之間傳遞或從文件中讀取。分為兩種情況,第一種:task 要計(jì)算的數(shù)據(jù)是在同一個 worker 的不同 Executor 進(jìn)程中。第二種:task 要計(jì)算的數(shù)據(jù)是在同一個 worker 的磁盤上,或在 HDFS 上恰好有 block 在同一個節(jié)點(diǎn)上。如果 Spark 要計(jì)算的數(shù)據(jù)來源于 HDFSD 上,那么最好的本地化級別就是 NODE_LOCAL。

NO_PREF 沒有最佳位置,數(shù)據(jù)從哪訪問都一樣快,不需要位置優(yōu)先。比如 Spark SQL 從 Mysql 中讀取數(shù)據(jù)。

RACK_LOCAL 機(jī)架本地化,數(shù)據(jù)在同一機(jī)架的不同節(jié)點(diǎn)上。需要通過網(wǎng)絡(luò)傳輸數(shù)據(jù)以及文件 IO,比 NODE_LOCAL 慢。情況一:task 計(jì)算的數(shù)據(jù)在 worker2 的 EXecutor 中。情況二:task 計(jì)算的數(shù)據(jù)在 work2 的磁盤上。

ANY 跨機(jī)架,數(shù)據(jù)在非同一機(jī)架的網(wǎng)絡(luò)上,速度最慢。

如果不是很清楚,我畫(造)了一張圖放在這以供大家理解。

?

?

2 Spark 的數(shù)據(jù)本地化由誰來負(fù)責(zé)呢?

val rdd1 = sc.textFile("hdfs://tsl...") rdd1.cache()rdd1.map.filter.count()

?

上面這段簡單的代碼,背后其實(shí)做什么很多事情。Driver 的 TaskScheduler 在發(fā)送 task 之前,首先應(yīng)該拿到 rdd1 數(shù)據(jù)所在的位置,rdd1 封裝了這個文件所對應(yīng)的 block 的位置,DAGScheduler 通過調(diào)用 getPrerredLocations() 拿到 partition 所對應(yīng)的數(shù)據(jù)的位置,TaskScheduler 根據(jù)這些位置來發(fā)送相應(yīng)的 task。

具體的解釋:

DAGScheduler 切割Job,劃分Stage, 通過調(diào)用 submitStage 來提交一個Stage 對應(yīng)的 tasks,submitStage 會調(diào)用 submitMissingTasks, submitMissingTasks 確定每個需要計(jì)算的 task 的preferredLocations,通過調(diào)用 getPreferrdeLocations() 得到 partition 的優(yōu)先位置,就是這個 partition 對應(yīng)的 task 的優(yōu)先位置,對于要提交到 TaskScheduler 的 TaskSet 中的每一個task,該 task 優(yōu)先位置與其對應(yīng)的 partition 對應(yīng)的優(yōu)先位置一致。

TaskScheduler 接收到了 TaskSet 后,TaskSchedulerImpl 會為每個 TaskSet 創(chuàng)建一個 TaskSetManager 對象,該對象包含taskSet 所有 tasks,并管理這些 tasks 的執(zhí)行,其中就包括計(jì)算 TaskSetManager 中的 tasks 都有哪些 locality levels,以便在調(diào)度和延遲調(diào)度 tasks 時發(fā)揮作用。

總的來說,Spark 中的數(shù)據(jù)本地化是由 DAGScheduler 和 TaskScheduler 共同負(fù)責(zé)的。

3

計(jì)算節(jié)點(diǎn)與輸入數(shù)據(jù)位置的關(guān)系,下面以一個圖來展開 spark 是如何讓進(jìn)行調(diào)度的。這一個過程會涉及 RDD, DAGScheduler , TaskScheduler。

?

?

第一步:PROCESS_LOCAL

TaskScheduler 根據(jù)數(shù)據(jù)的位置向數(shù)據(jù)節(jié)點(diǎn)發(fā)送 task 任務(wù)。如果這個任務(wù)在 worker1 的 Executor 中等待了 3 秒。(默認(rèn)的,可以通過spark.locality.wait 來設(shè)置),可以通過 SparkConf() 來修改,重試了 5 次之后,還是無法執(zhí)行,TaskScheduler 就會降低數(shù)據(jù)本地化的級別,從 PROCESS_LOCAL 降到 NODE_LOCAL。

第二步:NODE_LOCAL

TaskScheduler 重新發(fā)送 task 到 worker1 中的 Executor2 中執(zhí)行,如果 task 在worker1 的 Executor2 中等待了 3 秒,重試了 5 次,還是無法執(zhí)行,TaskScheduler 就會降低數(shù)據(jù)本地化的級別,從 NODE_LOCAL 降到 RACK_LOCAL。

第三步:RACK_LOCAL

TaskScheduler重新發(fā)送 task 到 worker2 中的 Executor1 中執(zhí)行。

第四步:

當(dāng) task 分配完成之后,task 會通過所在的 worker 的 Executor 中的 BlockManager 來獲取數(shù)據(jù)。如果 BlockManager 發(fā)現(xiàn)自己沒有數(shù)據(jù),那么它會調(diào)用 getRemote() 方法,通過 ConnectionManager 與原 task 所在節(jié)點(diǎn)的 BlockManager 中的 ConnectionManager先建立連接,然后通過TransferService(網(wǎng)絡(luò)傳輸組件)獲取數(shù)據(jù),通過網(wǎng)絡(luò)傳輸回task所在節(jié)點(diǎn)(這時候性能大幅下降,大量的網(wǎng)絡(luò)IO占用資源),計(jì)算后的結(jié)果返回給Driver。這一步很像 shuffle 的文件尋址流程,Spark 的 shuffle 文件尋址流程

4

TaskScheduler在發(fā)送task的時候,會根據(jù)數(shù)據(jù)所在的節(jié)點(diǎn)發(fā)送task,這時候的數(shù)據(jù)本地化的級別是最高的,如果這個task在這個Executor中等待了三秒,重試發(fā)射了5次還是依然無法執(zhí)行,那么TaskScheduler就會認(rèn)為這個Executor的計(jì)算資源滿了,TaskScheduler會降低一級數(shù)據(jù)本地化的級別,重新發(fā)送task到其他的Executor中執(zhí)行,如果還是依然無法執(zhí)行,那么繼續(xù)降低數(shù)據(jù)本地化的級別...

如果想讓每一個 task 都能拿到最好的數(shù)據(jù)本地化級別,那么調(diào)優(yōu)點(diǎn)就是等待時間加長。注意!如果過度調(diào)大等待時間,雖然為每一個 task 都拿到了最好的數(shù)據(jù)本地化級別,但是我們 job 執(zhí)行的時間也會隨之延長。

下面是官方提供的參數(shù)說明:

?

?

可以在代碼里面這樣設(shè)置:

?

new SparkConf.set("spark.locality.wait

總結(jié)

以上是生活随笔為你收集整理的本地提交spark_Spark 数据本地化级别的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。