shuffle操作图解以及job-stage-task-partition区别
--------------------------------------------------------------shuffle操作圖--------------------------------------------------------------
基本概念:
spark中的partition 是彈性分布式數據集RDD的最小單元
spark.shuffle.manager有三種方式:
hash,sort,tungsten-sort
shuffle示意圖如下(最清晰易懂的):
根據上面的圖可以看到,shuffle其實就是得到reduceByKey的結果的示意圖,
[18]Any join, cogroup, or ByKey operation involves holding objects in hashmaps or in-memory buffers to group or sort(提到了哪些操作會導致shuffle)
[19]
cogroup
groupWith
join: hash partition
leftOuterJoin: hash partition
rightOuterJoin: hash partition
groupByKey: hash partition
reduceByKey: hash partition
combineByKey: hash partition
sortByKey: range partition
distinct
intersection: hash partition
repartition
coalesce(提到了哪些操作會導致shuffle)
#---------------------------------------job-stage-task-partition區別----------------------------------------------------------------------
執行一個 rdd 的 action 的時候,會生成一個 job
?stage 是一個 job 的組成單位,就是說,一個 job 會被切分成 1 個或 1 個以上的 stage,然后各個 stage 會按照執行順序依次執行
?A unit of work within a stage, corresponding to one RDD partition。即 stage 下的一個任務執行單元,一般來說,一個 rdd 有多少個 partition(spark中的partition 是彈性分布式數據集RDD的最小單元),就會有多少個 task,因為每一個 task 只是處理一個 partition 上的數據。從 web ui 截圖上我們可以看到,這個 job 一共有 2 個 stage,66 個 task,平均下來每個 stage 有 33 個 task,相當于每個 stage 的數據都有 33 個 partition [注意:這里是平均下來的哦,并不都是每個 stage 有 33 個 task,有時候也會有一個 stage 多,另外一個 stage 少的情況,就看你有沒有在不同的 stage 進行 repartition 類似的操作了。
準備數據集[1]:
hdfs dfs -put names user/mercury
下面輸入代碼(來自[14]):
### packages import pandas as pd from operator import add ### spark UDF (User Defined Functions) def map_extract(element):file_path, content = elementyear = file_path[-8:-4]return [(year, i) for i in content.split("\r\n") if i]### spark logic res = sc.wholeTextFiles('hdfs://Desktop:9000/user/mercury/names',minPartitions=40).map(map_extract) \.flatMap(lambda x: x) \.map(lambda x: (x[0], int(x[1].split(',')[2]))) \.reduceByKey(add) \.collect()### result displaying data = pd.DataFrame.from_records(res, columns=['year', 'birth'])\.sort_values(by='year', ascending=True)ax = data.plot(x="year", y="birth", figsize=(20, 6), title='US Baby Birth Data from 1897 to 2014', linewidth=3) ax.set_facecolor('white') ax.grid(color='gray', alpha=0.2, axis='y')想要繪制出圖形的話需要pyspark+jupyter Notebook環境的搭建(沒有必要特意去折騰pyspark環境配置,因為pyspark有個官方bug[15])
$ pyspark?--master yarn
然后逐句輸入上述代碼運行即可
#-------------------------------------------------下面是實驗結果分析-------------------------------------------------------------------------------------------------
下面打開web UI界面,步驟如下:
①http://desktop:8088/cluster
②點擊Tracking UI下面的History跳轉到:
這個圖怎么看呢?點擊stage0我們看到:
\
也就是說對應上面代碼到reduceByKey(add) 為止。
?
同樣,點開stage1,看到的是:
很簡單,對應的是代碼中的collect()操作
這兩個DAG圖有一個好處,就是當你不確定你的代碼中哪個部分會導致shuffle操作的時候,你可以上DAG圖看一下,來幫你對代碼中導致 data skew(數據傾斜,很多情況下都是shuffle操作導致的)的部分進行定位
?
[14]當觸發 rdd 的 shuffle 操作時 : 在我們的應用中就是?reduceByKey?這個操作,官方文檔:?rdd.reduceByKey
當觸發 rdd 的 action 時 : 在我們的應用中就是最后的?collect?操作,關于這個操作的說明,可以看官方文檔:?rdd.collect
-
第一個 stage,即截圖中 stage id 為 0 的 stage,其執行了 sc.wholeTextFiles().map().flatMap().map().reduceByKey() 這幾個步驟,因為這是一個 Shuffle 操作,所以后面會有 Shuffle Read 和 Shuffle Write。具體來說,就是在 stage 0 這個 stage 中,發生了一個 Shuffle 操作,這個操作讀入 22.5 MB 的數據,生成 41.7 KB 的數據,并把生成的數據寫在了硬盤上。
-
第二個 stage,即截圖中 stage id 為 1 到 stage,其執行了 collect() 這個操作,因為這是一個 action 操作,并且它上一步是一個 Shuffle 操作,且沒有后續操作,所以這里 collect() 這個操作被獨立成一個 stage 了。這里它把上一個 Shuffle 寫下的數據讀取進來,然后一起返回到 driver 端,所以這里可以看到他的 Shuffle Read 這里剛好讀取了上一個 stage 寫下的數據。
#--------------------------------------------------------------------附錄-----------------------------------------------------------------------------------------
哪些是action操作?
參考[20],如下:
| reduce(func) | 使用傳入的函數參數 func 對數據集中的元素進行匯聚操作 (兩兩合并). 該函數應該具有可交換與可結合的性質, 以便于能夠正確地進行并行計算. |
| collect() | 在 driver program 上將數據集中的元素作為一個數組返回. 這在執行一個 filter 或是其他返回一個足夠小的子數據集操作后十分有用. |
| count() | 返回數據集中的元素個數 |
| first() | 返回數據集中的第一個元素 (與 take(1) 類似) |
| take(n) | 返回數據集中的前 n 個元素 |
| takeSample(withReplacement, num, [seed]) | 以數組的形式返回數據集中隨機采樣的 num 個元素. |
| takeOrdered(n, [ordering]) | 以其自然序或使用自定義的比較器返回 RDD 的前 n 元素 |
| saveAsTextFile(path) | 將數據集中的元素寫入到指定目錄下的一個或多個文本文件中, 該目錄可以存在于本地文件系統, HDFS 或其他 Hadoop 支持的文件系統. Spark 將會對每個元素調用 toString 將其轉換為文件的一行文本. |
| saveAsSequenceFile(path)(Java and Scala) | 對于本地文件系統, HDFS 或其他任何 Hadoop 支持的文件系統上的一個指定路徑, 將數據集中的元素寫為一個 Hadoop SequenceFile. 僅適用于實現了 Hadoop Writable 接口的 kay-value pair 的 RDD. 在 Scala 中, 同樣適用于能夠被隱式轉換成 Writable 的類型上 (Spark 包含了對于 Int, Double, String 等基本類型的轉換). |
| saveAsObjectFile(path)(Java and Scala) | 使用 Java 序列化將數據集中的元素簡單寫為格式化數據, 可以通過 SparkContext.objectFile() 進行加載. |
| countByKey() | 僅適用于 (K, V) 類型的 RDD. 返回每個 key 的 value 數的一個 hashmap (K, int) pair. |
| foreach(func) | 對數據集中的每個元素執行函數 func. 這通常用于更新一個 Accumulator 或與外部存儲系統交互時的副作用. 注意: 修改 foreach() 外的非 Accumulator 變量可能導致未定義的行為. 更多細節請查看 Understanding closures. |
可以看出 action 的所有操作都是針對數據集中 “元素” (element) 級別的動作, action 的主要內容是 存儲 和 計算. 引用 《Learning Spark》 里的說法:
Actions are operations that return a result to the driver program or write it to storage, and kick off a computation, such as count() and first().
哪些操作會導致shuffle?(也可以通過DAG圖進行觀看細節中是否帶有shuffle),參考[16][17]
1、repartition類的操作:比如repartition、repartitionAndSortWithinPartitions、coalesce等
2、byKey類的操作:比如reduceByKey、groupByKey、sortByKey等
3、join類的操作:比如join、cogroup等
重分區: 一般會shuffle,因為需要在整個集群中,對之前所有的分區的數據進行隨機,均勻的打亂,然后把數據放入下游新的指定數量的分區內
byKey類的操作:因為你要對一個key,進行聚合操作,那么肯定要保證集群中,所有節點上的,相同的key,一定是到同一個節點上進行處理
join類的操作:兩個rdd進行join,就必須將相同join
key的數據,shuffle到同一個節點上,然后進行相同key的兩個rdd數據的笛卡爾乘積
排序
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
?
集合或者表操作
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
#----------------------------------------------------------參考文獻如下---------------------------------------------------------------------------------------------------
?Reference:
[1]https://www.ssa.gov/oact/babynames/names.zip
[18]https://stackoverflow.com/questions/31386590/when-does-shuffling-occur-in-apache-spark?rq=1
[19]https://intellipaat.com/community/7852/what-are-the-spark-transformations-that-causes-a-shuffle
[13]spark中job stage task關系
[14]『 Spark 』6. 深入研究 spark 運行原理之 job, stage, task
[15]py4j.protocol.Py4JJavaError: An error occurred while calling o90.save(官方bug,目前沒有解決,還沒寫完)
[16]spark shuffle特點和導致shuffle的算子
[17]Spark會產生shuffle的算子
[20]Spark action 操作列表
?
總結
以上是生活随笔為你收集整理的shuffle操作图解以及job-stage-task-partition区别的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: LOL打单子id怎么改 艾欧尼亚VS诺克
- 下一篇: spark的Web UI查看DAG的两种