日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

Spark详解(十四):Spark SQL的Join实现

發(fā)布時(shí)間:2025/4/16 数据库 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark详解(十四):Spark SQL的Join实现 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1. 簡(jiǎn)介

Join是SQL語(yǔ)句中的常用操作,良好的表結(jié)構(gòu)能夠?qū)?shù)據(jù)分散在不同的表中,使其符合某種范式,減少表冗余、更新容錯(cuò)等。而建立表和表之間關(guān)系的最佳方式就是Join操作。Join連接是大數(shù)據(jù)處理的重要手段,它基于表之間的共同字段將來(lái)自兩個(gè)或多個(gè)表的行結(jié)合起來(lái)。如今Spark SQL(Dataset/DataFrame)已經(jīng)成為Spark應(yīng)用程序開發(fā)的主流,作為開發(fā)者,我們有必要了解Join在Spark中是如何組織運(yùn)行的。

2. Join的基本要素

如下圖所示,Join大致包括三個(gè)要素:Join方式、Join條件以及過(guò)濾條件。其中過(guò)濾條件也可以通過(guò)AND語(yǔ)句放在Join條件中。

Spark支持所有類型的Join,包括:

  • inner join
  • left outer join
  • right outer join
  • full outer join
  • left semi join
  • left anti join
    下面分別闡述這幾種Join的實(shí)現(xiàn)。

2. Join基本實(shí)現(xiàn)流程

總體上來(lái)說(shuō),Join的基本實(shí)現(xiàn)流程如下圖所示,Spark將參與Join的兩張表抽象為流式遍歷表(streamIter)和查找表(buildIter),通常streamIter為大表,buildIter為小表,我們不用擔(dān)心哪個(gè)表為streamIter,哪個(gè)表為buildIter,這個(gè)spark會(huì)根據(jù)join語(yǔ)句自動(dòng)幫我們完成。

在實(shí)際計(jì)算時(shí),spark會(huì)基于streamIter來(lái)遍歷,每次取出streamIter中的一條記錄rowA,根據(jù)Join條件計(jì)算keyA,然后根據(jù)該keyA去buildIter中查找所有滿足Join條件(keyB==keyA)的記錄rowBs,并將rowBs中每條記錄分別與rowAjoin得到j(luò)oin后的記錄,最后根據(jù)過(guò)濾條件得到最終join的記錄。

從上述計(jì)算過(guò)程中不難發(fā)現(xiàn),對(duì)于每條來(lái)自streamIter的記錄,都要去buildIter中查找匹配的記錄,所以buildIter一定要是查找性能較優(yōu)的數(shù)據(jù)結(jié)構(gòu)。spark提供了三種join實(shí)現(xiàn):sort merge join、broadcast join以及hash join。

2.1 Hash join實(shí)現(xiàn)

spark提供了hash join實(shí)現(xiàn)方式,在shuffle read階段不對(duì)記錄排序,反正來(lái)自兩格表的具有相同key的記錄會(huì)在同一個(gè)分區(qū),只是在分區(qū)內(nèi)不排序,將來(lái)自buildIter的記錄放到hash表中,以便查找,如下圖所示。

由于Spark是一個(gè)分布式的計(jì)算引擎,可以通過(guò)分區(qū)的形式將大批量的數(shù)據(jù)劃分成n份較小的數(shù)據(jù)集進(jìn)行并行計(jì)算。這種思想應(yīng)用到Join上便是Shuffle Hash Join了。利用key相同必然分區(qū)相同的這個(gè)原理,SparkSQL將較大表的join分而治之,先將表劃分成n個(gè)分區(qū),在對(duì)buildlter查找表和streamlter表進(jìn)行Hash Join。

Shuffle Hash Join分為兩步:

  • 對(duì)兩張表分別按照join keys進(jìn)行重分區(qū),即shuffle,目的是為了讓有相同join keys值的記錄分到對(duì)應(yīng)的分區(qū)中
  • 對(duì)對(duì)應(yīng)分區(qū)中的數(shù)據(jù)進(jìn)行join,此處先將小表分區(qū)構(gòu)造為一張hash表,然后根據(jù)大表分區(qū)中記錄的join keys值拿出來(lái)進(jìn)行匹配
  • 不難發(fā)現(xiàn),要將來(lái)自buildIter的記錄放到hash表中,那么每個(gè)分區(qū)來(lái)自buildIter的記錄不能太大,否則就存不下,默認(rèn)情況下hash join的實(shí)現(xiàn)是關(guān)閉狀態(tài),如果要使用hash join,必須滿足以下四個(gè)條件:

    • buildIter總體估計(jì)大小超過(guò)spark.sql.autoBroadcastJoinThreshold設(shè)定的值,即不滿足broadcast join條件
    • 開啟嘗試使用hash join的開關(guān),spark.sql.join.preferSortMergeJoin=false
    • 每個(gè)分區(qū)的平均大小不超過(guò)spark.sql.autoBroadcastJoinThreshold設(shè)定的值,即shuffle read階段每個(gè)分區(qū)來(lái)自buildIter的記錄要能放到內(nèi)存中
    • streamIter的大小是buildIter三倍以上

    2.2 Sort Merge Join 實(shí)現(xiàn)

    上面介紹的實(shí)現(xiàn)對(duì)于一定大小的表比較適用,但當(dāng)兩個(gè)表都非常大時(shí),顯然無(wú)論適用哪種都會(huì)對(duì)計(jì)算內(nèi)存造成很大壓力。這是因?yàn)閖oin時(shí)兩者采取的都是hash join,是將一側(cè)的數(shù)據(jù)完全加載到內(nèi)存中,使用hash code取join keys值相等的記錄進(jìn)行連接。

    要讓兩條記錄能join到一起,首先需要將具有相同key的記錄在同一個(gè)分區(qū),所以通常來(lái)說(shuō),需要做一次shuffle,map階段根據(jù)join條件確定每條記錄的key,基于該key做shuffle write,將可能join到一起的記錄分到同一個(gè)分區(qū)中,這樣在shuffle read階段就可以將兩個(gè)表中具有相同key的記錄拉到同一個(gè)分區(qū)處理。前面我們也提到,對(duì)于buildIter一定要是查找性能較優(yōu)的數(shù)據(jù)結(jié)構(gòu),通常我們能想到hash表,但是對(duì)于一張較大的表來(lái)說(shuō),不可能將所有記錄全部放到hash表中,SparkSQL采用了一種全新的方案來(lái)對(duì)表進(jìn)行Join,即Sort Merge Join。這種實(shí)現(xiàn)方式不用將一側(cè)數(shù)據(jù)全部加載后再進(jìn)行hash join,但需要在join前將數(shù)據(jù)排序,如下圖所示:

    在shuffle read階段,分別對(duì)streamIter和buildIter進(jìn)行merge sort,在遍歷streamIter時(shí),對(duì)于每條記錄,都采用順序查找的方式從buildIter查找對(duì)應(yīng)的記錄,由于兩個(gè)表都是排序的,每次處理完streamIter的一條記錄后,對(duì)于streamIter的下一條記錄,只需從buildIter中上一次查找結(jié)束的位置開始查找,所以說(shuō)每次在buildIter中查找不必重頭開始,整體上來(lái)說(shuō),查找性能還是較優(yōu)的。

    2.3 Broadcast Join實(shí)現(xiàn)

    為了能具有相同key的記錄分到同一個(gè)分區(qū),我們通常是做shuffle,而shuffle在Spark中是比較耗時(shí)的操作,我們應(yīng)該盡可能的設(shè)計(jì)Spark應(yīng)用使其避免大量的shuffle。。那么如果buildIter是一個(gè)非常小的表,那么其實(shí)就沒(méi)有必要大動(dòng)干戈做shuffle了,直接將buildIter廣播到每個(gè)計(jì)算節(jié)點(diǎn),然后將buildIter放到hash表中,如下圖所示。

    在執(zhí)行上,主要可以分為以下兩步:

  • broadcast階段:將小表廣播分發(fā)到大表所在的所有主機(jī)。分發(fā)方式可以有driver分發(fā),或者采用p2p方式。
  • hash join階段:在每個(gè)executor上執(zhí)行單機(jī)版hash join,小表映射,大表試探;
  • Broadcast Join的條件有以下幾個(gè):

  • 被廣播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的值,默認(rèn)是10M (或者加了broadcast join的hint)
  • 基表不能被廣播,比如left outer join時(shí),只能廣播右表
  • 3. Spark 支持的Join類型

    3.1 inner join

    inner join是一定要找到左右表中滿足join條件的記錄,我們?cè)趯憇ql語(yǔ)句或者使用DataFrmae時(shí),可以不用關(guān)心哪個(gè)是左表,哪個(gè)是右表,在spark sql查詢優(yōu)化階段,spark會(huì)自動(dòng)將大表設(shè)為左表,即streamIter,將小表設(shè)為右表,即buildIter。這樣對(duì)小表的查找相對(duì)更優(yōu)。其基本實(shí)現(xiàn)流程如下圖所示,在查找階段,如果右表不存在滿足join條件的記錄,則跳過(guò)。

    3.2 left outer join

    left outer join是以左表為準(zhǔn),在右表中查找匹配的記錄,如果查找失敗,則返回一個(gè)所有字段都為null的記錄。我們?cè)趯憇ql語(yǔ)句或者使用DataFrmae時(shí),一般讓大表在左邊,小表在右邊。其基本實(shí)現(xiàn)流程如下圖所示。

    3.3 right outer join

    right outer join是以右表為準(zhǔn),在左表中查找匹配的記錄,如果查找失敗,則返回一個(gè)所有字段都為null的記錄。所以說(shuō),右表是streamIter,左表是buildIter,我們?cè)趯憇ql語(yǔ)句或者使用DataFrmae時(shí),一般讓大表在右邊,小表在左邊。其基本實(shí)現(xiàn)流程如下圖所示。

    3.4 full outer join

    full outer join相對(duì)來(lái)說(shuō)要復(fù)雜一點(diǎn),總體上來(lái)看既要做left outer join,又要做right outer join,但是又不能簡(jiǎn)單地先left outer join,再right outer join,最后union得到最終結(jié)果,因?yàn)檫@樣最終結(jié)果中就存在兩份inner join的結(jié)果了。因?yàn)榧热煌瓿蒷eft outer join又要完成right outer join,所以full outer join僅采用sort merge join實(shí)現(xiàn),左邊和右表既要作為streamIter,又要作為buildIter,其基本實(shí)現(xiàn)流程如下圖所示。

    由于左表和右表已經(jīng)排好序,首先分別順序取出左表和右表中的一條記錄,比較key,如果key相等,則joinrowA和rowB,并將rowA和rowB分別更新到左表和右表的下一條記錄;如果keyA<keyB,則說(shuō)明右表中沒(méi)有與左表rowA對(duì)應(yīng)的記錄,那么joinrowA與nullRow,緊接著,rowA更新到左表的下一條記錄;如果keyA>keyB,則說(shuō)明左表中沒(méi)有與右表rowB對(duì)應(yīng)的記錄,那么joinnullRow與rowB,緊接著,rowB更新到右表的下一條記錄。如此循環(huán)遍歷直到左表和右表的記錄全部處理完。

    3.5 left semi join

    left semi join是以左表為準(zhǔn),在右表中查找匹配的記錄,如果查找成功,則僅返回左邊的記錄,否則返回null,其基本實(shí)現(xiàn)流程如下圖所示。

    3.6 left anti join

    left anti join與left semi join相反,是以左表為準(zhǔn),在右表中查找匹配的記錄,如果查找成功,則返回null,否則僅返回左邊的記錄,其基本實(shí)現(xiàn)流程如下圖所示。

    4. 總結(jié)

    Join是數(shù)據(jù)庫(kù)查詢中一個(gè)非常重要的語(yǔ)法特性,在數(shù)據(jù)庫(kù)領(lǐng)域可以說(shuō)是“得join者的天下”,SparkSQL作為一種分布式數(shù)據(jù)倉(cāng)庫(kù)系統(tǒng),給我們提供了全面的join支持,并在內(nèi)部實(shí)現(xiàn)上無(wú)聲無(wú)息地做了很多優(yōu)化,了解join的實(shí)現(xiàn)將有助于我們更深刻的了解我們的應(yīng)用程序的運(yùn)行軌跡。

    總結(jié)

    以上是生活随笔為你收集整理的Spark详解(十四):Spark SQL的Join实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。