spark sql定义RDD、DataFrame与DataSet
RDD
優(yōu)點:
- 編譯時類型安全
- 編譯時就能檢查出類型錯誤
- 面向?qū)ο蟮木幊田L(fēng)格
- 直接通過類名點的方式來操作數(shù)據(jù)
缺點:
- 序列化和反序列化的性能開銷
- 無論是集群間的通信, 還是IO操作都需要對對象的結(jié)構(gòu)和數(shù)據(jù)進行序列化和反序列化.
- GC的性能開銷
- 頻繁的創(chuàng)建和銷毀對象, 勢必會增加GC
DataFrame
DataFrame引入了schema和off-heap
-
schema : RDD每一行的數(shù)據(jù), 結(jié)構(gòu)都是一樣的. 這個結(jié)構(gòu)就存儲在schema中. Spark通過schame就能夠讀懂數(shù)據(jù), 因此在通信和IO時就只需要序列化和反序列化數(shù)據(jù), 而結(jié)構(gòu)的部分就可以省略了.
-
off-heap : 意味著JVM堆以外的內(nèi)存, 這些內(nèi)存直接受操作系統(tǒng)管理(而不是JVM)。Spark能夠以二進制的形式序列化數(shù)據(jù)(不包括結(jié)構(gòu))到off-heap中, 當要操作數(shù)據(jù)時, 就直接操作off-heap內(nèi)存. 由于Spark理解schema, 所以知道該如何操作.
off-heap就像地盤, schema就像地圖, Spark有地圖又有自己地盤了, 就可以自己說了算了, 不再受JVM的限制, 也就不再收GC的困擾了.
上圖直觀地體現(xiàn)了DataFrame和RDD的區(qū)別。左側(cè)的RDD[Person]雖然以Person為類型參數(shù),但Spark框架本身不了解Person類的內(nèi)部結(jié)構(gòu)。而右側(cè)的DataFrame卻提供了詳細的結(jié)構(gòu)信息,使得Spark SQL可以清楚地知道該數(shù)據(jù)集中包含哪些列,每列的名稱和類型各是什么。DataFrame多了數(shù)據(jù)的結(jié)構(gòu)信息,即schema。RDD是分布式的Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執(zhí)行效率、減少數(shù)據(jù)讀取以及執(zhí)行計劃的優(yōu)化,比如filter下推、裁剪等。
提升執(zhí)行效率
RDD API是函數(shù)式的,強調(diào)不變性,在大部分場景下傾向于創(chuàng)建新對象而不是修改老對象。這一特點雖然帶來了干凈整潔的API,卻也使得Spark應(yīng)用程序在運行期傾向于創(chuàng)建大量臨時對象,對GC造成壓力。在現(xiàn)有RDD API的基礎(chǔ)之上,我們固然可以利用mapPartitions方法來重載RDD單個分片內(nèi)的數(shù)據(jù)創(chuàng)建方式,用復(fù)用可變對象的方式來減小對象分配和GC的開銷,但這犧牲了代碼的可讀性,而且要求開發(fā)者對Spark運行時機制有一定的了解,門檻較高。另一方面,Spark SQL在框架內(nèi)部已經(jīng)在各種可能的情況下盡量重用對象,這樣做雖然在內(nèi)部會打破了不變性,但在將數(shù)據(jù)返回給用戶時,還會重新轉(zhuǎn)為不可變數(shù)據(jù)。利用 DataFrame API進行開發(fā),可以免費地享受到這些優(yōu)化效果。
減少數(shù)據(jù)讀取
分析大數(shù)據(jù),最快的方法就是 ——忽略它。這里的“忽略”并不是熟視無睹,而是根據(jù)查詢條件進行恰當?shù)募糁Α?/p>
上文討論分區(qū)表時提到的分區(qū)剪 枝便是其中一種——當查詢的過濾條件中涉及到分區(qū)列時,我們可以根據(jù)查詢條件剪掉肯定不包含目標數(shù)據(jù)的分區(qū)目錄,從而減少IO。
對于一些“智能”數(shù)據(jù)格 式,Spark SQL還可以根據(jù)數(shù)據(jù)文件中附帶的統(tǒng)計信息來進行剪枝。簡單來說,在這類數(shù)據(jù)格式中,數(shù)據(jù)是分段保存的,每段數(shù)據(jù)都帶有最大值、最小值、null值數(shù)量等 一些基本的統(tǒng)計信息。當統(tǒng)計信息表名某一數(shù)據(jù)段肯定不包括符合查詢條件的目標數(shù)據(jù)時,該數(shù)據(jù)段就可以直接跳過(例如某整數(shù)列a某段的最大值為100,而查詢條件要求a > 200)。
此外,Spark SQL也可以充分利用RCFile、ORC、Parquet等列式存儲格式的優(yōu)勢,僅掃描查詢真正涉及的列,忽略其余列的數(shù)據(jù)。
執(zhí)行優(yōu)化
為了說明查詢優(yōu)化,我們來看上圖展示的人口數(shù)據(jù)分析的示例。圖中構(gòu)造了兩個DataFrame,將它們join之后又做了一次filter操作。如果原封不動地執(zhí)行這個執(zhí)行計劃,最終的執(zhí)行效率是不高的。因為join是一個代價較大的操作,也可能會產(chǎn)生一個較大的數(shù)據(jù)集。如果我們能將filter下推到 join下方,先對DataFrame進行過濾,再join過濾后的較小的結(jié)果集,便可以有效縮短執(zhí)行時間。而Spark SQL的查詢優(yōu)化器正是這樣做的。簡而言之,邏輯查詢計劃優(yōu)化就是一個利用基于關(guān)系代數(shù)的等價變換,將高成本的操作替換為低成本操作的過程。
得到的優(yōu)化執(zhí)行計劃在轉(zhuǎn)換成物 理執(zhí)行計劃的過程中,還可以根據(jù)具體的數(shù)據(jù)源的特性將過濾條件下推至數(shù)據(jù)源內(nèi)。最右側(cè)的物理執(zhí)行計劃中Filter之所以消失不見,就是因為溶入了用于執(zhí)行最終的讀取操作的表掃描節(jié)點內(nèi)。
對于普通開發(fā)者而言,查詢優(yōu)化 器的意義在于,即便是經(jīng)驗并不豐富的程序員寫出的次優(yōu)的查詢,也可以被盡量轉(zhuǎn)換為高效的形式予以執(zhí)行。
此外,通過schema和off-heap, DataFrame解決了RDD的缺點, 但是卻丟了RDD的優(yōu)點. DataFrame不是類型安全的, API也不是面向?qū)ο箫L(fēng)格的.
import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext}object Run {def main(args: Array[String]) {val conf = new SparkConf().setAppName("test").setMaster("local")val sc = new SparkContext(conf)sc.setLogLevel("WARN")val sqlContext = new SQLContext(sc)/*** id age* 1 30* 2 29* 3 21*/val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)// API不是面向?qū)ο蟮?/span>idAgeDF.filter(idAgeDF.col("age") > 25) // 不會報錯, DataFrame不是編譯時類型安全的idAgeDF.filter(idAgeDF.col("age") > "") } }DataSet
DataSet結(jié)合了RDD和DataFrame的優(yōu)點, 并帶來的一個新的概念Encoder
當序列化數(shù)據(jù)時, Encoder產(chǎn)生字節(jié)碼與off-heap進行交互, 能夠達到按需訪問數(shù)據(jù)的效果, 而不用反序列化整個對象. Spark還沒有提供自定義Encoder的API, 但是未來會加入.
下面看DataFrame和DataSet在2.0.0-preview中的實現(xiàn)
下面這段代碼, 在1.6.x中創(chuàng)建的是DataFrame:
// 上文DataFrame示例中提取出來的 val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)- 1
但是同樣的代碼在2.0.0-preview中, 創(chuàng)建的雖然還叫DataFrame:
// sqlContext.createDataFrame(idAgeRDDRow, schema) 方法的實現(xiàn), 返回值依然是DataFrame def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = { sparkSession.createDataFrame(rowRDD, schema) }但是其實卻是DataSet, 因為DataFrame被聲明為Dataset[Row]:
package object sql {// ...省略了不相關(guān)的代碼type DataFrame = Dataset[Row] }因此當我們從1.6.x遷移到2.0.0的時候, 無需任何修改就直接用上了DataSet.
總結(jié)
以上是生活随笔為你收集整理的spark sql定义RDD、DataFrame与DataSet的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深入理解Spark 2.1 Core (
- 下一篇: Scala入门到精通—— 第二节Scal