Spark SQL之RDD转DataFrame
準(zhǔn)備文件
首先準(zhǔn)備好測(cè)試文件info.txt,內(nèi)容如下:
1,vincent,20 2,sarah,19 3,sofia,29 4,monica,26將RDD轉(zhuǎn)成DataFrame
方式一:反射
可以使用反射來推斷包含了特定數(shù)據(jù)類型的RDD的元數(shù)據(jù)
代碼如下:
當(dāng)?shù)玫紻ataFrame之后就可以進(jìn)行其他的相應(yīng)操作了,例如進(jìn)行過濾:infoDF.filter(infoDF.col("age") > 25).show():輸出如下:
隨后可以將DataFrame轉(zhuǎn)成一張表。
我們可以通過infoDF.createOrReplaceTempView("infos")注冊(cè)成一張表,好處就是可以直接通過SQL的方式進(jìn)行處理。
方式二:編程方式
當(dāng)我們的Schema并不能提前定義時(shí),就需要這種方式來實(shí)現(xiàn)了。這種方式必須要遵從如下三個(gè)步驟:
代碼試下如下:
package cn.ac.iie.sparkimport org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession}/*** DataFrame和RDD的互操作*/ object DataFrameRDDApp {def main(args: Array[String]): Unit = {val sparkSessionApp = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()// infoReflection(sparkSessionApp)program(sparkSessionApp)sparkSessionApp.close()}private def program(sparkSessionApp: SparkSession) = {val rdd = sparkSessionApp.sparkContext.textFile("file:///E:/test/infos.txt")val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))val structType = StructType(Array(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("age", IntegerType, true)))val infoDF = sparkSessionApp.createDataFrame(infoRDD, structType)infoDF.printSchema()infoDF.show()}private def infoReflection(sparkSessionApp: SparkSession) = {// 將RDD轉(zhuǎn)成DataFrameval rdd = sparkSessionApp.sparkContext.textFile("file:///E:/test/infos.txt")// 注意需要導(dǎo)入隱式轉(zhuǎn)換import sparkSessionApp.implicits._val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()infoDF.show()infoDF.filter(infoDF.col("age") > 25).show()infoDF.createOrReplaceTempView("infos")sparkSessionApp.sql("select * from infos where age > 25").show()}case class Info(id:Int, name:String, age:Int){} }這種方式拿到DataFrame之后,依然可以進(jìn)行其他的相關(guān)API操作。
兩種方式的優(yōu)缺點(diǎn)
DataFrame和RDD互操作的兩種方式:
反射:case class。
這種方式事先需要知道你的字段、字段類型
編程方式:Row
如果第一種情況不能滿足要求,無法事先知道字段與類型
優(yōu)先考慮第一種方式。因?yàn)閷?shí)現(xiàn)較為簡(jiǎn)單。
總結(jié):DataFrame = RDD + Schema
RDD僅僅知道里面裝的是什么對(duì)象(user),但是無法知道這個(gè)user里有哪些屬性,以及屬性的字段是什么類型的。所以我們直接處理RDD是有一定的困難,因此需要自己執(zhí)行Schema表結(jié)構(gòu),將Schema作用于RDD中,就可以看做是一個(gè)表了。接下來就可以方便的進(jìn)行操作了。
同時(shí)DataFrame優(yōu)勢(shì):DataFrame底層使用了Catalyst進(jìn)行優(yōu)化。
DataFrame還支持text、json、parquet以及其他外部數(shù)據(jù)源格式。將外部數(shù)據(jù)源的數(shù)據(jù)注冊(cè)到sparksql中,成為DataFrame,然后就可以使用DataFrame自身提供的API進(jìn)行操作了。或者可以注冊(cè)成一張表執(zhí)行sql語句。執(zhí)行自己的API或sql,最終形成的邏輯執(zhí)行計(jì)劃都是一樣的。
總結(jié)
以上是生活随笔為你收集整理的Spark SQL之RDD转DataFrame的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SparkSQL之DataFrame A
- 下一篇: linux cmake编译源码,linu