日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

shuffle操作图解以及job-stage-task-partition区别

發布時間:2023/12/31 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 shuffle操作图解以及job-stage-task-partition区别 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

--------------------------------------------------------------shuffle操作圖--------------------------------------------------------------

基本概念:

spark中的partition 是彈性分布式數據集RDD的最小單元

spark.shuffle.manager有三種方式:
hash,sort,tungsten-sort

shuffle示意圖如下(最清晰易懂的):

根據上面的圖可以看到,shuffle其實就是得到reduceByKey的結果的示意圖,



[18]Any join, cogroup, or ByKey operation involves holding objects in hashmaps or in-memory buffers to group or sort(提到了哪些操作會導致shuffle)

[19]
cogroup

groupWith

join: hash partition

leftOuterJoin: hash partition

rightOuterJoin: hash partition

groupByKey: hash partition

reduceByKey: hash partition

combineByKey: hash partition

sortByKey: range partition

distinct

intersection: hash partition

repartition

coalesce(提到了哪些操作會導致shuffle)

#---------------------------------------job-stage-task-partition區別----------------------------------------------------------------------

執行一個 rdd 的 action 的時候,會生成一個 job

?stage 是一個 job 的組成單位,就是說,一個 job 會被切分成 1 個或 1 個以上的 stage,然后各個 stage 會按照執行順序依次執行

?A unit of work within a stage, corresponding to one RDD partition。即 stage 下的一個任務執行單元,一般來說,一個 rdd 有多少個 partition(spark中的partition 是彈性分布式數據集RDD的最小單元),就會有多少個 task,因為每一個 task 只是處理一個 partition 上的數據。從 web ui 截圖上我們可以看到,這個 job 一共有 2 個 stage,66 個 task,平均下來每個 stage 有 33 個 task,相當于每個 stage 的數據都有 33 個 partition [注意:這里是平均下來的哦,并不都是每個 stage 有 33 個 task,有時候也會有一個 stage 多,另外一個 stage 少的情況,就看你有沒有在不同的 stage 進行 repartition 類似的操作了。

準備數據集[1]:

hdfs dfs -put names user/mercury

下面輸入代碼(來自[14]):

### packages import pandas as pd from operator import add ### spark UDF (User Defined Functions) def map_extract(element):file_path, content = elementyear = file_path[-8:-4]return [(year, i) for i in content.split("\r\n") if i]### spark logic res = sc.wholeTextFiles('hdfs://Desktop:9000/user/mercury/names',minPartitions=40).map(map_extract) \.flatMap(lambda x: x) \.map(lambda x: (x[0], int(x[1].split(',')[2]))) \.reduceByKey(add) \.collect()### result displaying data = pd.DataFrame.from_records(res, columns=['year', 'birth'])\.sort_values(by='year', ascending=True)ax = data.plot(x="year", y="birth", figsize=(20, 6), title='US Baby Birth Data from 1897 to 2014', linewidth=3) ax.set_facecolor('white') ax.grid(color='gray', alpha=0.2, axis='y')

想要繪制出圖形的話需要pyspark+jupyter Notebook環境的搭建(沒有必要特意去折騰pyspark環境配置,因為pyspark有個官方bug[15])

$ pyspark?--master yarn

然后逐句輸入上述代碼運行即可

#-------------------------------------------------下面是實驗結果分析-------------------------------------------------------------------------------------------------

下面打開web UI界面,步驟如下:

①http://desktop:8088/cluster

②點擊Tracking UI下面的History跳轉到:

這個圖怎么看呢?點擊stage0我們看到:

\

也就是說對應上面代碼到reduceByKey(add) 為止。

?

同樣,點開stage1,看到的是:

很簡單,對應的是代碼中的collect()操作

這兩個DAG圖有一個好處,就是當你不確定你的代碼中哪個部分會導致shuffle操作的時候,你可以上DAG圖看一下,來幫你對代碼中導致 data skew(數據傾斜,很多情況下都是shuffle操作導致的)的部分進行定位

?

[14]當觸發 rdd 的 shuffle 操作時 : 在我們的應用中就是?reduceByKey?這個操作,官方文檔:?rdd.reduceByKey

當觸發 rdd 的 action 時 : 在我們的應用中就是最后的?collect?操作,關于這個操作的說明,可以看官方文檔:?rdd.collect

  • 第一個 stage,即截圖中 stage id 為 0 的 stage,其執行了 sc.wholeTextFiles().map().flatMap().map().reduceByKey() 這幾個步驟,因為這是一個 Shuffle 操作,所以后面會有 Shuffle Read 和 Shuffle Write。具體來說,就是在 stage 0 這個 stage 中,發生了一個 Shuffle 操作,這個操作讀入 22.5 MB 的數據,生成 41.7 KB 的數據,并把生成的數據寫在了硬盤上。

  • 第二個 stage,即截圖中 stage id 為 1 到 stage,其執行了 collect() 這個操作,因為這是一個 action 操作,并且它上一步是一個 Shuffle 操作,且沒有后續操作,所以這里 collect() 這個操作被獨立成一個 stage 了。這里它把上一個 Shuffle 寫下的數據讀取進來,然后一起返回到 driver 端,所以這里可以看到他的 Shuffle Read 這里剛好讀取了上一個 stage 寫下的數據。

#--------------------------------------------------------------------附錄-----------------------------------------------------------------------------------------

哪些是action操作?

參考[20],如下:

行動涵義
reduce(func)使用傳入的函數參數 func 對數據集中的元素進行匯聚操作 (兩兩合并). 該函數應該具有可交換與可結合的性質, 以便于能夠正確地進行并行計算.
collect()在 driver program 上將數據集中的元素作為一個數組返回. 這在執行一個 filter 或是其他返回一個足夠小的子數據集操作后十分有用.
count()返回數據集中的元素個數
first()返回數據集中的第一個元素 (與 take(1) 類似)
take(n)返回數據集中的前 n 個元素
takeSample(withReplacement, num, [seed])以數組的形式返回數據集中隨機采樣的 num 個元素.
takeOrdered(n, [ordering])以其自然序或使用自定義的比較器返回 RDD 的前 n 元素
saveAsTextFile(path)將數據集中的元素寫入到指定目錄下的一個或多個文本文件中, 該目錄可以存在于本地文件系統, HDFS 或其他 Hadoop 支持的文件系統. Spark 將會對每個元素調用 toString 將其轉換為文件的一行文本.
saveAsSequenceFile(path)(Java and Scala)對于本地文件系統, HDFS 或其他任何 Hadoop 支持的文件系統上的一個指定路徑, 將數據集中的元素寫為一個 Hadoop SequenceFile. 僅適用于實現了 Hadoop Writable 接口的 kay-value pair 的 RDD. 在 Scala 中, 同樣適用于能夠被隱式轉換成 Writable 的類型上 (Spark 包含了對于 Int, Double, String 等基本類型的轉換).
saveAsObjectFile(path)(Java and Scala)使用 Java 序列化將數據集中的元素簡單寫為格式化數據, 可以通過 SparkContext.objectFile() 進行加載.
countByKey()僅適用于 (K, V) 類型的 RDD. 返回每個 key 的 value 數的一個 hashmap (K, int) pair.
foreach(func)對數據集中的每個元素執行函數 func. 這通常用于更新一個 Accumulator 或與外部存儲系統交互時的副作用. 注意: 修改 foreach() 外的非 Accumulator 變量可能導致未定義的行為. 更多細節請查看 Understanding closures.

可以看出 action 的所有操作都是針對數據集中 “元素” (element) 級別的動作, action 的主要內容是 存儲計算. 引用 《Learning Spark》 里的說法:

Actions are operations that return a result to the driver program or write it to storage, and kick off a computation, such as count() and first().


哪些操作會導致shuffle?(也可以通過DAG圖進行觀看細節中是否帶有shuffle),參考[16][17]

1、repartition類的操作:比如repartition、repartitionAndSortWithinPartitions、coalesce等
2、byKey類的操作:比如reduceByKey、groupByKey、sortByKey等
3、join類的操作:比如join、cogroup等

重分區: 一般會shuffle,因為需要在整個集群中,對之前所有的分區的數據進行隨機,均勻的打亂,然后把數據放入下游新的指定數量的分區內
byKey類的操作:因為你要對一個key,進行聚合操作,那么肯定要保證集群中,所有節點上的,相同的key,一定是到同一個節點上進行處理
join類的操作:兩個rdd進行join,就必須將相同join
key的數據,shuffle到同一個節點上,然后進行相同key的兩個rdd數據的笛卡爾乘積

排序
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
?

集合或者表操作
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

#----------------------------------------------------------參考文獻如下---------------------------------------------------------------------------------------------------

?Reference:

[1]https://www.ssa.gov/oact/babynames/names.zip

[18]https://stackoverflow.com/questions/31386590/when-does-shuffling-occur-in-apache-spark?rq=1
[19]https://intellipaat.com/community/7852/what-are-the-spark-transformations-that-causes-a-shuffle

[13]spark中job stage task關系

[14]『 Spark 』6. 深入研究 spark 運行原理之 job, stage, task

[15]py4j.protocol.Py4JJavaError: An error occurred while calling o90.save(官方bug,目前沒有解決,還沒寫完)

[16]spark shuffle特點和導致shuffle的算子

[17]Spark會產生shuffle的算子

[20]Spark action 操作列表

?

總結

以上是生活随笔為你收集整理的shuffle操作图解以及job-stage-task-partition区别的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。