日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

SparkSQL 之 Shuffle Join 内核原理及应用深度剖析-Spark商业源码实战

發(fā)布時間:2023/12/9 数据库 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SparkSQL 之 Shuffle Join 内核原理及应用深度剖析-Spark商业源码实战 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

本套技術(shù)專欄是作者(秦凱新)平時工作的總結(jié)和升華,通過從真實商業(yè)環(huán)境抽取案例進行總結(jié)和分享,并給出商業(yè)應(yīng)用的調(diào)優(yōu)建議和集群環(huán)境容量規(guī)劃等內(nèi)容,請持續(xù)關(guān)注本套博客。版權(quán)聲明:禁止轉(zhuǎn)載,歡迎學(xué)習(xí)。QQ郵箱地址:1120746959@qq.com,如有任何商業(yè)交流,可隨時聯(lián)系。

1 Spark SQL 堅實后盾DataFrame

  • DataFrame是一個分布式數(shù)據(jù)容器,更像傳統(tǒng)數(shù)據(jù)庫的二維表格,除了數(shù)據(jù)以外,還掌握數(shù)據(jù)的結(jié)構(gòu)信息,即schema。同時,與Hive類似,DataFrame也支持嵌套數(shù)據(jù)類型(struct、array和map)。
  • JSON schema自動推導(dǎo)
  • Hive風(fēng)格分區(qū)表自動識別
  • 充分利用RCFile、ORC、Parquet等列式存儲格式的優(yōu)勢,僅掃描查詢真正涉及的列,忽略其余列的數(shù)據(jù)。
  • 聚合統(tǒng)計函數(shù)支持

2 Spark SQL 源碼包結(jié)構(gòu)(溯本逐源)

主要分為4類:

  • core模塊:處理數(shù)據(jù)的輸入輸出,比如:把不同數(shù)據(jù)源(RDD,json,Parquet等)獲取到數(shù)據(jù),并將查詢結(jié)果輸出到DataFrame。
  • catalyst模塊:處理SQL語句的整個過程,包括解析,綁定,優(yōu)化,物理計劃等查詢優(yōu)化。
  • hive模塊:對hive數(shù)據(jù)進行處理。
  • hive-ThriftServer:提供CLI以及JDBC和ODBC接口。

3 Spark SQL catalyst模塊設(shè)計思路

(詳細請參看我的SparkSQL源碼解析內(nèi)容)

catalyst主要組件有

  • sqlParse => sql語句的語法解析
  • Analyzer => 將不同來源的Unresolved Logical Plan和元數(shù)據(jù)(如hive metastore、Schema catalog)進行綁定,生成resolved Logical Plan
  • optimizer => 根據(jù)OptimizationRules,對resolvedLogicalPlan進行合并、列裁剪、過濾器下推等優(yōu)化作業(yè)而轉(zhuǎn)換成optimized Logical Plan
  • Planner => LogicalPlan轉(zhuǎn)換成PhysicalPlan
  • CostModel => 根據(jù)過去的性能統(tǒng)計數(shù)據(jù),選擇最佳的物理執(zhí)行計劃

4 Hash Join的衍生(劍走偏鋒)

4.1 Hash join 設(shè)計思路剖析(總領(lǐng)全局)

  • 第一步:一般情況下,streamIter為大表,buildIter為小表,不用關(guān)心哪個表為streamIter,哪個表為buildIter,這個spark會根據(jù)join語句自動幫我們完成。
  • 第二步:根據(jù)buildIter Table的join key構(gòu)建Hash Table,把每一行記錄都存進HashTable,位于內(nèi)存中。
  • 第三步:掃描streamIter Table 每一行數(shù)據(jù),使用相同的hash函數(shù)匹配 Hash Table中的記錄,匹配成功之后再檢查join key 是否相等,最后join在一起
  • 總結(jié) : hash join 只掃描兩表一次,可以認為運算復(fù)雜度為o(a+b),效率非常高。笛卡爾集運算復(fù)雜度為a*b。另外,構(gòu)建的Hash Table最好能全部加載在內(nèi)存,效率最高,這就決定了hash join算法只適合至少一個小表的join場景,對于兩個大表的join場景并不適用。

4.2 broadcast Hash join 設(shè)計思路剖析(大表join極小表)

  • 第一步:一般情況下,streamIter為大表,buildIter為小表,不用關(guān)心哪個表為streamIter,哪個表為buildIter,這個spark會根據(jù)join語句自動幫我們完成。

  • 第二步: 先把小表廣播到所有大表分區(qū)所在節(jié)點,然后根據(jù)buildIter Table的join key構(gòu)建Hash Table,把每一行記錄都存進HashTable

  • 第三步:掃描streamIter Table 每一行數(shù)據(jù),使用相同的hash函數(shù)匹配 Hash Table中的記錄,匹配成功之后再檢查join key 是否相等,最后join在一起

  • 總結(jié) : hash join 只掃描兩表一次,可以認為運算復(fù)雜度為o(a+b)。

  • 調(diào)優(yōu)

    1 buildIter總體估計大小超過spark.sql.autoBroadcastJoinThreshold設(shè)定的值,即不滿足broadcast join條件2 開啟嘗試使用hash join的開關(guān),spark.sql.join.preferSortMergeJoin=false3 每個分區(qū)的平均大小不超過spark.sql.autoBroadcastJoinThreshold設(shè)定的值,即shuffle read階段每個分區(qū)來自buildIter的記錄要能放到內(nèi)存中4 streamIter的大小是buildIter三倍以上 復(fù)制代碼

4.2 shuffle Hash join 設(shè)計思路剖析(大表join小表)

  • 第一步:一般情況下,streamIter為大表,buildIter為小表,不用關(guān)心哪個表為streamIter,哪個表為buildIter,這個spark會根據(jù)join語句自動幫我們完成。
  • 第二步: 將具有相同性質(zhì)的(如Hash值相同)join key 進行Shuffle到同一個分區(qū)。
  • 第三步:先把小表廣播到所有大表分區(qū)所在節(jié)點,然后根據(jù)buildIter Table的join key構(gòu)建Hash Table,把每一行記錄都存進HashTable
  • 第四步:掃描streamIter Table 每一行數(shù)據(jù),使用相同的hash函數(shù)匹配 Hash Table中的記錄,匹配成功之后再檢查join key 是否相等,最后join在一起

5 Sort Merge join (橫行無敵)(大表join大表)

  • 第一步:一般情況下,streamIter為大表,buildIter為小表,不用關(guān)心哪個表為streamIter,哪個表為buildIter,這個spark會根據(jù)join語句自動幫我們完成。
  • 第二步: 將具有相同性質(zhì)的(如Hash值相同)join key 進行Shuffle到同一個分區(qū)。
  • 第三步: 對streamIter 和 buildIter在shuffle read過程中先排序,join匹配時按順序查找,匹配結(jié)束后不必重頭開始,利用shuffle sort特性,查找性能解決了大表對大表的情形。

6 Spark Join 類型詳解

6.0 準備數(shù)據(jù)集( Justin => 左表有,Rose =>右表有)

學(xué)習(xí) Python中單引號,雙引號,3個單引號及3個雙引號的區(qū)別請參考:https://blog.csdn.net/woainishifu/article/details/76105667from pyspark.sql.types import * >>> rdd1 = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(4,'Justin', 21),(5,'Cindy', 20)] park.createDataFrame(rdd, schema) df.show()>>> schema = StructType([ StructField("id", IntegerType(), True), StructField("name", StringType(), True), StructField("age", IntegerType(), True) ]) >>> df = spark.createDataFrame(rdd, schema) >>> df.show()+---+------+---+ | id| name|age| +---+------+---+ | 1| Alice| 18| | 2| Andy| 19| | 3| Bob| 17| | 4|Justin| 21| | 5| Cindy| 20| +---+------+---+>>> rdd2 = sc.parallelize([('Alice', 160),('Andy', 159),('Bob', 170),('Cindy', 165),('Rose', 160)]) show()>>> schema2 = StructType([ StructField("name", StringType(), True), StructField("height", IntegerType(), True) ]) >>> df2 = spark.createDataFrame(rdd2, schema2) >>> df2.show() +-----+------+ | name|height| +-----+------+ |Alice| 160| | Andy| 159| | Bob| 170| |Cindy| 165| | Rose| 160| +-----+------+ 復(fù)制代碼

6.1 inner join

  • inner join是一定要找到左右表中滿足join key 條件的記錄,join key都存在的情形。

    df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()df.join(df3, ["id", "name"], "inner").select(df.id, df.name,"age", "height").orderBy(df.id).show()df.join(df3, ["id", "name"], "inner").select(df.id, df['name'],"age", "height").orderBy(df.id).show()>>> df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()+---+-----+---+------+| id| name|age|height|+---+-----+---+------+| 1|Alice| 18| 160|| 2| Andy| 19| 159|| 3| Bob| 17| 170|| 5|Cindy| 20| 165|+---+-----+---+------+ 復(fù)制代碼

6.2 left outer join

  • left outer join是以左表為準,在右表中查找匹配的記錄,如果查找失敗,左表行Row不變,右表一行Row中所有字段都為null的記錄。

  • 要求:左表是streamIter,右表是buildIter

    df.join(df2, "name", "left").select("id", df.name, "age", "height").orderBy("id").show()>>> df.join(df2, "name", "left").select("id", "name", "age", "height").orderBy("id").show()+---+------+---+------+| id| name|age|height|+---+------+---+------+| 1| Alice| 18| 160|| 2| Andy| 19| 159|| 3| Bob| 17| 170|| 4|Justin| 21| null|| 5| Cindy| 20| 165|+---+------+---+------+ 復(fù)制代碼

6.3 right outer join

  • right outer join是以右表為準,在左表中查找匹配的記錄,如果查找失敗,右表行Row不變,左表一行Row中所有字段都為null的記錄。

  • 要求:右表是streamIter,左表是buildIter

    df.join(df2, "name", "right").select("id", df2.name, "age", "height").orderBy("id").show()>>> df.join(df2, "name", "right").select("id", "name", "age", "height").orderBy("id").show()+----+-----+----+------+| id| name| age|height|+----+-----+----+------+|null| Rose|null| 160|| 1|Alice| 18| 160|| 2| Andy| 19| 159|| 3| Bob| 17| 170|| 5|Cindy| 20| 165|+----+-----+----+------+ 復(fù)制代碼

6.4 full outer join

  • full outer join僅采用sort merge join實現(xiàn),左邊和右表既要作為streamIter,又要作為buildIter

  • 左表和右表已經(jīng)排好序,首先分別順序取出左表和右表中的一條記錄,比較key,如果key相等,則joinrowA和rowB,并將rowA和rowB分別更新到左表和右表的下一條記錄。

  • 如果keyA<keyB,說明右表中沒有與左表rowA對應(yīng)的記錄,那么joinrowA與nullRow。

  • 將rowA更新到左表的下一條記錄;如果keyA>keyB,則說明左表中沒有與右表rowB對應(yīng)的記錄,那么joinnullRow與rowB。

  • 將rowB更新到右表的下一條記錄。如此循環(huán)遍歷直到左表和右表的記錄全部處理完。

    >>> df.join(df2, "name", "outer").select("id", "name", "age", "height").orderBy("id").show()+----+------+----+------+| id| name| age|height|+----+------+----+------+|null| Rose|null| 160|| 1| Alice| 18| 160|| 2| Andy| 19| 159|| 3| Bob| 17| 170|| 4|Justin| 21| null|| 5| Cindy| 20| 165|+----+------+----+------+ 復(fù)制代碼

6.5 left semi join

left semi join是以左表為準,在右表中查找匹配的記錄,如果查找成功,則僅返回左表Row的記錄,否則返回null。

6.6 left anti join

left anti join與left semi join相反,是以左表為準,在右表中查找匹配的記錄,如果查找成功,則返回null,否則僅返回左邊的記錄

6.6 row_number().over()

from pyspark.sql.types import * from pyspark.sql import Window from pyspark.sql.functions import * rdd = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(1,'Justin', 21),(1,'Cindy', 20)]) schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("age", IntegerType(), True) ])df = spark.createDataFrame(rdd, schema) df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).show()+---+------+---+---+| id| name|age| rn|+---+------+---+---+| 1| Alice| 18| 1|| 1| Cindy| 20| 2|| 1|Justin| 21| 3|| 3| Bob| 17| 1|| 2| Andy| 19| 1|+---+------+---+---+df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).orderBy("age").show()+---+------+---+---+| id| name|age| rn|+---+------+---+---+| 3| Bob| 17| 1|| 1| Alice| 18| 1|| 2| Andy| 19| 1|| 1| Cindy| 20| 2|| 1|Justin| 21| 3|+---+------+---+---+ 復(fù)制代碼

7 結(jié)語

一直想深入挖掘一下SparkSQL內(nèi)部join原理,終于有時間詳細的理一下 Shuffle Join 。作者還準備進一步研究Spark SQL 內(nèi)核原理,敬請期待我的Spark SQL源碼剖析系列。大數(shù)據(jù)商業(yè)實戰(zhàn)社區(qū)微信公眾號即將開啟,敬請關(guān)注,謝謝!

秦凱新 于深圳 201811200130

總結(jié)

以上是生活随笔為你收集整理的SparkSQL 之 Shuffle Join 内核原理及应用深度剖析-Spark商业源码实战的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。