日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Spark数据倾斜的完美解决

發(fā)布時(shí)間:2024/4/15 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark数据倾斜的完美解决 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

數(shù)據(jù)傾斜解決方案

數(shù)據(jù)傾斜的解決,跟之前講解的性能調(diào)優(yōu),有一點(diǎn)異曲同工之妙。

性能調(diào)優(yōu)中最有效最直接最簡(jiǎn)單的方式就是加資源加并行度,并注意RDD架構(gòu)(復(fù)用同一個(gè)RDD,加上cache緩存)。相對(duì)于前面,shuffle、jvm等是次要的。

6.1、原理以及現(xiàn)象分析

6.1.1、數(shù)據(jù)傾斜怎么出現(xiàn)的

在執(zhí)行shuffle操作的時(shí)候,是按照key,來(lái)進(jìn)行values的數(shù)據(jù)的輸出、拉取和聚合的。

同一個(gè)key的values,一定是分配到一個(gè)reduce task進(jìn)行處理的。

多個(gè)key對(duì)應(yīng)的values,比如一共是90萬(wàn)。可能某個(gè)key對(duì)應(yīng)了88萬(wàn)數(shù)據(jù),被分配到一個(gè)task上去面去執(zhí)行。

另外兩個(gè)task,可能各分配到了1萬(wàn)數(shù)據(jù),可能是數(shù)百個(gè)key,對(duì)應(yīng)的1萬(wàn)條數(shù)據(jù)。

這樣就會(huì)出現(xiàn)數(shù)據(jù)傾斜問題。

想象一下,出現(xiàn)數(shù)據(jù)傾斜以后的運(yùn)行的情況。很糟糕!

其中兩個(gè)task,各分配到了1萬(wàn)數(shù)據(jù),可能同時(shí)在10分鐘內(nèi)都運(yùn)行完了。另外一個(gè)task有88萬(wàn)條,88 * 10 =? 880分鐘 = 14.5個(gè)小時(shí)。

大家看,本來(lái)另外兩個(gè)task很快就運(yùn)行完畢了(10分鐘),但是由于一個(gè)拖后腿的家伙,第三個(gè)task,要14.5個(gè)小時(shí)才能運(yùn)行完,就導(dǎo)致整個(gè)spark作業(yè),也得14.5個(gè)小時(shí)才能運(yùn)行完。

數(shù)據(jù)傾斜,一旦出現(xiàn),是不是性能殺手?!

6.1.2、發(fā)生數(shù)據(jù)傾斜以后的現(xiàn)象

Spark數(shù)據(jù)傾斜,有兩種表現(xiàn):

1、你的大部分的task,都執(zhí)行的特別特別快,(你要用client模式,standalone client,yarn client,本地機(jī)器一執(zhí)行spark-submit腳本,就會(huì)開始打印log),task175 finished,剩下幾個(gè)task,執(zhí)行的特別特別慢,前面的task,一般1s可以執(zhí)行完5個(gè),最后發(fā)現(xiàn)1000個(gè)task,998,999 task,要執(zhí)行1個(gè)小時(shí),2個(gè)小時(shí)才能執(zhí)行完一個(gè)task。

出現(xiàn)以上loginfo,就表明出現(xiàn)數(shù)據(jù)傾斜了。

這樣還算好的,因?yàn)殡m然老牛拉破車一樣非常慢,但是至少還能跑。

2、另一種情況是,運(yùn)行的時(shí)候,其他task都執(zhí)行完了,也沒什么特別的問題,但是有的task,就是會(huì)突然間報(bào)了一個(gè)OOM,JVM Out Of Memory,內(nèi)存溢出了,task failed,task lost,resubmitting task。反復(fù)執(zhí)行幾次都到了某個(gè)task就是跑不通,最后就掛掉。

某個(gè)task就直接OOM,那么基本上也是因?yàn)閿?shù)據(jù)傾斜了,task分配的數(shù)量實(shí)在是太大了!所以內(nèi)存放不下,然后你的task每處理一條數(shù)據(jù),還要?jiǎng)?chuàng)建大量的對(duì)象,內(nèi)存爆掉了。

這樣也表明出現(xiàn)數(shù)據(jù)傾斜了。

這種就不太好了,因?yàn)槟愕某绦蛉绻蝗ソ鉀Q數(shù)據(jù)傾斜的問題,壓根兒就跑不出來(lái)。

作業(yè)都跑不完,還談什么性能調(diào)優(yōu)這些東西?!

6.1.3、定位數(shù)據(jù)傾斜出現(xiàn)的原因與出現(xiàn)問題的位置

根據(jù)log去定位

出現(xiàn)數(shù)據(jù)傾斜的原因,基本只可能是因?yàn)榘l(fā)生了shuffle操作,在shuffle的過程中,出現(xiàn)了數(shù)據(jù)傾斜的問題。因?yàn)槟硞€(gè)或者某些key對(duì)應(yīng)的數(shù)據(jù),遠(yuǎn)遠(yuǎn)的高于其他的key。

1、你在自己的程序里面找找,哪些地方用了會(huì)產(chǎn)生shuffle的算子,groupByKey、countByKey、reduceByKey、join

2、看log

log一般會(huì)報(bào)是在你的哪一行代碼,導(dǎo)致了OOM異常。或者看log,看看是執(zhí)行到了第幾個(gè)stage。spark代碼,是怎么劃分成一個(gè)一個(gè)的stage的。哪一個(gè)stage生成的task特別慢,就能夠自己用肉眼去對(duì)你的spark代碼進(jìn)行stage的劃分,就能夠通過stage定位到你的代碼,到底哪里發(fā)生了數(shù)據(jù)傾斜。

?1、使用Hive ETL預(yù)處理數(shù)據(jù)

方案適用場(chǎng)景:

如果導(dǎo)致數(shù)據(jù)傾斜的是Hive表。如果該Hive表中的數(shù)據(jù)本身很不均勻(比如某個(gè)key對(duì)應(yīng)了100萬(wàn)數(shù)據(jù),其他key才對(duì)應(yīng)了10條數(shù)據(jù)),而且業(yè)務(wù)場(chǎng)景需要頻繁使用Spark對(duì)Hive表執(zhí)行某個(gè)分析操作,那么比較適合使用這種技術(shù)方案。

方案實(shí)現(xiàn)思路:

此時(shí)可以評(píng)估一下,是否可以通過Hive來(lái)進(jìn)行數(shù)據(jù)預(yù)處理(即通過Hive ETL預(yù)先對(duì)數(shù)據(jù)按照key進(jìn)行聚合,或者是預(yù)先和其他表進(jìn)行join),然后在Spark作業(yè)中針對(duì)的數(shù)據(jù)源就不是原來(lái)的Hive表了,而是預(yù)處理后的Hive表。此時(shí)由于數(shù)據(jù)已經(jīng)預(yù)先進(jìn)行過聚合或join操作了,那么在Spark作業(yè)中也就不需要使用原先的shuffle類算子執(zhí)行這類操作了。

方案實(shí)現(xiàn)原理:

這種方案從根源上解決了數(shù)據(jù)傾斜,因?yàn)閺氐妆苊饬嗽赟park中執(zhí)行shuffle類算子,那么肯定就不會(huì)有數(shù)據(jù)傾斜的問題了。但是這里也要提醒一下大家,這種方式屬于治標(biāo)不治本。因?yàn)楫吘箶?shù)據(jù)本身就存在分布不均勻的問題,所以Hive ETL中進(jìn)行g(shù)roup by或者join等shuffle操作時(shí),還是會(huì)出現(xiàn)數(shù)據(jù)傾斜,導(dǎo)致Hive ETL的速度很慢。我們只是把數(shù)據(jù)傾斜的發(fā)生提前到了Hive ETL中,避免Spark程序發(fā)生數(shù)據(jù)傾斜而已。

?

2、過濾少數(shù)導(dǎo)致傾斜的key

方案適用場(chǎng)景:

如果發(fā)現(xiàn)導(dǎo)致傾斜的key就少數(shù)幾個(gè),而且對(duì)計(jì)算本身的影響并不大的話,那么很適合使用這種方案。比如99%的key就對(duì)應(yīng)10條數(shù)據(jù),但是只有一個(gè)key對(duì)應(yīng)了100萬(wàn)數(shù)據(jù),從而導(dǎo)致了數(shù)據(jù)傾斜。

方案實(shí)現(xiàn)思路:

如果我們判斷那少數(shù)幾個(gè)數(shù)據(jù)量特別多的key,對(duì)作業(yè)的執(zhí)行和計(jì)算結(jié)果不是特別重要的話,那么干脆就直接過濾掉那少數(shù)幾個(gè)key。比如,在Spark SQL中可以使用where子句過濾掉這些key或者在Spark Core中對(duì)RDD執(zhí)行filter算子過濾掉這些key。如果需要每次作業(yè)執(zhí)行時(shí),動(dòng)態(tài)判定哪些key的數(shù)據(jù)量最多然后再進(jìn)行過濾,那么可以使用sample算子對(duì)RDD進(jìn)行采樣,然后計(jì)算出每個(gè)key的數(shù)量,取數(shù)據(jù)量最多的key過濾掉即可。

方案實(shí)現(xiàn)原理:

將導(dǎo)致數(shù)據(jù)傾斜的key給過濾掉之后,這些key就不會(huì)參與計(jì)算了,自然不可能產(chǎn)生數(shù)據(jù)傾斜。

?

3、提高shuffle操作的并行度

方案實(shí)現(xiàn)思路:

在對(duì)RDD執(zhí)行shuffle算子時(shí),給shuffle算子傳入一個(gè)參數(shù),比如reduceByKey(1000),該參數(shù)就設(shè)置了這個(gè)shuffle算子執(zhí)行時(shí)shuffle read task的數(shù)量。對(duì)于Spark SQL中的shuffle類語(yǔ)句,比如group by、join等,需要設(shè)置一個(gè)參數(shù),即spark.sql.shuffle.partitions,該參數(shù)代表了shuffle read task的并行度,該值默認(rèn)是200,對(duì)于很多場(chǎng)景來(lái)說(shuō)都有點(diǎn)過小。

方案實(shí)現(xiàn)原理:

增加shuffle read task的數(shù)量,可以讓原本分配給一個(gè)task的多個(gè)key分配給多個(gè)task,從而讓每個(gè)task處理比原來(lái)更少的數(shù)據(jù)。舉例來(lái)說(shuō),如果原本有5個(gè)不同的key,每個(gè)key對(duì)應(yīng)10條數(shù)據(jù),這5個(gè)key都是分配給一個(gè)task的,那么這個(gè)task就要處理50條數(shù)據(jù)。而增加了shuffle read task以后,每個(gè)task就分配到一個(gè)key,即每個(gè)task就處理10條數(shù)據(jù),那么自然每個(gè)task的執(zhí)行時(shí)間都會(huì)變短了。

?

4、雙重聚合

方案適用場(chǎng)景:

對(duì)RDD執(zhí)行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語(yǔ)句進(jìn)行分組聚合時(shí),比較適用這種方案。

方案實(shí)現(xiàn)思路:

這個(gè)方案的核心實(shí)現(xiàn)思路就是進(jìn)行兩階段聚合。第一次是局部聚合,先給每個(gè)key都打上一個(gè)隨機(jī)數(shù),比如10以內(nèi)的隨機(jī)數(shù),此時(shí)原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會(huì)變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對(duì)打上隨機(jī)數(shù)后的數(shù)據(jù),執(zhí)行reduceByKey等聚合操作,進(jìn)行局部聚合,那么局部聚合結(jié)果,就會(huì)變成了(1_hello, 2) (2_hello, 2)。然后將各個(gè)key的前綴給去掉,就會(huì)變成(hello,2)(hello,2),再次進(jìn)行全局聚合操作,就可以得到最終結(jié)果了,比如(hello, 4)。

方案實(shí)現(xiàn)原理:

將原本相同的key通過附加隨機(jī)前綴的方式,變成多個(gè)不同的key,就可以讓原本被一個(gè)task處理的數(shù)據(jù)分散到多個(gè)task上去做局部聚合,進(jìn)而解決單個(gè)task處理數(shù)據(jù)量過多的問題。接著去除掉隨機(jī)前綴,再次進(jìn)行全局聚合,就可以得到最終的結(jié)果。如果一個(gè)RDD中有一個(gè)key導(dǎo)致數(shù)據(jù)傾斜,同時(shí)還有其他的key,那么一般先對(duì)數(shù)據(jù)集進(jìn)行抽樣,然后找出傾斜的key,再使用filter對(duì)原始的RDD進(jìn)行分離為兩個(gè)RDD,一個(gè)是由傾斜的key組成的RDD1,一個(gè)是由其他的key組成的RDD2,那么對(duì)于RDD1可以使用加隨機(jī)前綴進(jìn)行多分區(qū)多task計(jì)算,對(duì)于另一個(gè)RDD2正常聚合計(jì)算,最后將結(jié)果再合并起來(lái)。

隨機(jī)前綴加幾,ReduceByKey分幾個(gè)區(qū)。

?

5、將reduce join轉(zhuǎn)為map join(徹底避免數(shù)據(jù)傾斜)

BroadCast+filter(或者map)

方案適用場(chǎng)景:

在對(duì)RDD使用join類操作,或者是在Spark SQL中使用join語(yǔ)句時(shí),而且join操作中的一個(gè)RDD或表的數(shù)據(jù)量比較小(比如幾百M(fèi)或者一兩G),比較適用此方案。

方案實(shí)現(xiàn)思路:

不使用join算子進(jìn)行連接操作,而使用Broadcast變量與map類算子實(shí)現(xiàn)join操作,進(jìn)而完全規(guī)避掉shuffle類的操作,徹底避免數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)。將較小RDD中的數(shù)據(jù)直接通過collect算子拉取到Driver端的內(nèi)存中來(lái),然后對(duì)其創(chuàng)建一個(gè)Broadcast變量;接著對(duì)另外一個(gè)RDD執(zhí)行map類算子,在算子函數(shù)內(nèi),從Broadcast變量中獲取較小RDD的全量數(shù)據(jù),與當(dāng)前RDD的每一條數(shù)據(jù)按照連接key進(jìn)行比對(duì),如果連接key相同的話,那么就將兩個(gè)RDD的數(shù)據(jù)用你需要的方式連接起來(lái)。

方案實(shí)現(xiàn)原理:

普通的join是會(huì)走shuffle過程的,而一旦shuffle,就相當(dāng)于會(huì)將相同key的數(shù)據(jù)拉取到一個(gè)shuffle read task中再進(jìn)行join,此時(shí)就是reduce join。但是如果一個(gè)RDD是比較小的,則可以采用廣播小RDD全量數(shù)據(jù)+map算子來(lái)實(shí)現(xiàn)與join同樣的效果,也就是map join,此時(shí)就不會(huì)發(fā)生shuffle操作,也就不會(huì)發(fā)生數(shù)據(jù)傾斜。

?

6、采樣傾斜key并分拆join操作

方案適用場(chǎng)景:

兩個(gè)RDD/Hive表進(jìn)行join的時(shí)候,如果數(shù)據(jù)量都比較大,無(wú)法采用“解決方案五”,那么此時(shí)可以看一下兩個(gè)RDD/Hive表中的key分布情況。如果出現(xiàn)數(shù)據(jù)傾斜,是因?yàn)槠渲心骋粋€(gè)RDD/Hive表中的少數(shù)幾個(gè)key的數(shù)據(jù)量過大,而另一個(gè)RDD/Hive表中的所有key都分布比較均勻,那么采用這個(gè)解決方案是比較合適的。

方案實(shí)現(xiàn)思路:

對(duì)包含少數(shù)幾個(gè)數(shù)據(jù)量過大的key的那個(gè)RDD,通過sample算子采樣出一份樣本來(lái),然后統(tǒng)計(jì)一下每個(gè)key的數(shù)量,計(jì)算出來(lái)數(shù)據(jù)量最大的是哪幾個(gè)key。然后將這幾個(gè)key對(duì)應(yīng)的數(shù)據(jù)從原來(lái)的RDD中拆分出來(lái),形成一個(gè)單獨(dú)的RDD,并給每個(gè)key都打上n以內(nèi)的隨機(jī)數(shù)作為前綴,而不會(huì)導(dǎo)致傾斜的大部分key形成另外一個(gè)RDD。接著將需要join的另一個(gè)RDD,也過濾出來(lái)那幾個(gè)傾斜key對(duì)應(yīng)的數(shù)據(jù)并形成一個(gè)單獨(dú)的RDD,將每條數(shù)據(jù)膨脹成n條數(shù)據(jù),這n條數(shù)據(jù)都按順序附加一個(gè)0~n的前綴,不會(huì)導(dǎo)致傾斜的大部分key也形成另外一個(gè)RDD。再將附加了隨機(jī)前綴的獨(dú)立RDD與另一個(gè)膨脹n倍的獨(dú)立RDD進(jìn)行join,此時(shí)就可以將原先相同的key打散成n份,分散到多個(gè)task中去進(jìn)行join了。而另外兩個(gè)普通的RDD就照常join即可。最后將兩次join的結(jié)果使用union算子合并起來(lái)即可,就是最終的join結(jié)果 。

?

7、使用隨機(jī)前綴和擴(kuò)容RDD進(jìn)行join

?

方案適用場(chǎng)景:

如果在進(jìn)行join操作時(shí),RDD中有大量的key導(dǎo)致數(shù)據(jù)傾斜,那么進(jìn)行分拆key也沒什么意義,此時(shí)就只能使用最后一種方案來(lái)解決問題了。

方案實(shí)現(xiàn)思路:

該方案的實(shí)現(xiàn)思路基本和“解決方案六”類似,首先查看RDD/Hive表中的數(shù)據(jù)分布情況,找到那個(gè)造成數(shù)據(jù)傾斜的RDD/Hive表,比如有多個(gè)key都對(duì)應(yīng)了超過1萬(wàn)條數(shù)據(jù)。然后將該RDD的每條數(shù)據(jù)都打上一個(gè)n以內(nèi)的隨機(jī)前綴。同時(shí)對(duì)另外一個(gè)正常的RDD進(jìn)行擴(kuò)容,將每條數(shù)據(jù)都擴(kuò)容成n條數(shù)據(jù),擴(kuò)容出來(lái)的每條數(shù)據(jù)都依次打上一個(gè)0~n的前綴。最后將兩個(gè)處理后的RDD進(jìn)行join即可。

?

總結(jié)

以上是生活随笔為你收集整理的Spark数据倾斜的完美解决的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。