日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Spark SQL之RDD转DataFrame

發(fā)布時(shí)間:2024/9/16 数据库 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark SQL之RDD转DataFrame 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

準(zhǔn)備文件

首先準(zhǔn)備好測(cè)試文件info.txt,內(nèi)容如下:

1,vincent,20 2,sarah,19 3,sofia,29 4,monica,26

將RDD轉(zhuǎn)成DataFrame

方式一:反射

可以使用反射來推斷包含了特定數(shù)據(jù)類型的RDD的元數(shù)據(jù)
代碼如下:

package cn.ac.iie.sparkimport org.apache.spark.sql.SparkSession/*** DataFrame和RDD的互操作*/ object DataFrameRDDApp {def main(args: Array[String]): Unit = {val sparkSessionApp = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()// 將RDD轉(zhuǎn)成DataFrameval rdd = sparkSessionApp.sparkContext.textFile("file:///E:/test/infos.txt")// 注意需要導(dǎo)入隱式轉(zhuǎn)換import sparkSessionApp.implicits._val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()infoDF.show()sparkSessionApp.close()}case class Info(id:Int, name:String, age:Int){} }


當(dāng)?shù)玫紻ataFrame之后就可以進(jìn)行其他的相應(yīng)操作了,例如進(jìn)行過濾:infoDF.filter(infoDF.col("age") > 25).show():輸出如下:

隨后可以將DataFrame轉(zhuǎn)成一張表。
我們可以通過infoDF.createOrReplaceTempView("infos")注冊(cè)成一張表,好處就是可以直接通過SQL的方式進(jìn)行處理。

infoDF.createOrReplaceTempView("infos")sparkSessionApp.sql("select * from infos where age > 25").show()

方式二:編程方式

當(dāng)我們的Schema并不能提前定義時(shí),就需要這種方式來實(shí)現(xiàn)了。這種方式必須要遵從如下三個(gè)步驟:

  • 創(chuàng)建一個(gè)Rows的RDD
  • 定義一個(gè)Schema(使用StructType)
  • 使用createDataFrame將schema作用于Rows
  • 代碼試下如下:

    package cn.ac.iie.sparkimport org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession}/*** DataFrame和RDD的互操作*/ object DataFrameRDDApp {def main(args: Array[String]): Unit = {val sparkSessionApp = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()// infoReflection(sparkSessionApp)program(sparkSessionApp)sparkSessionApp.close()}private def program(sparkSessionApp: SparkSession) = {val rdd = sparkSessionApp.sparkContext.textFile("file:///E:/test/infos.txt")val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))val structType = StructType(Array(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("age", IntegerType, true)))val infoDF = sparkSessionApp.createDataFrame(infoRDD, structType)infoDF.printSchema()infoDF.show()}private def infoReflection(sparkSessionApp: SparkSession) = {// 將RDD轉(zhuǎn)成DataFrameval rdd = sparkSessionApp.sparkContext.textFile("file:///E:/test/infos.txt")// 注意需要導(dǎo)入隱式轉(zhuǎn)換import sparkSessionApp.implicits._val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()infoDF.show()infoDF.filter(infoDF.col("age") > 25).show()infoDF.createOrReplaceTempView("infos")sparkSessionApp.sql("select * from infos where age > 25").show()}case class Info(id:Int, name:String, age:Int){} }

    這種方式拿到DataFrame之后,依然可以進(jìn)行其他的相關(guān)API操作。

    兩種方式的優(yōu)缺點(diǎn)

    DataFrame和RDD互操作的兩種方式:

    反射:case class。

    這種方式事先需要知道你的字段、字段類型

    編程方式:Row

    如果第一種情況不能滿足要求,無法事先知道字段與類型
    優(yōu)先考慮第一種方式。因?yàn)閷?shí)現(xiàn)較為簡(jiǎn)單。

    總結(jié):DataFrame = RDD + Schema

    RDD僅僅知道里面裝的是什么對(duì)象(user),但是無法知道這個(gè)user里有哪些屬性,以及屬性的字段是什么類型的。所以我們直接處理RDD是有一定的困難,因此需要自己執(zhí)行Schema表結(jié)構(gòu),將Schema作用于RDD中,就可以看做是一個(gè)表了。接下來就可以方便的進(jìn)行操作了。
    同時(shí)DataFrame優(yōu)勢(shì):DataFrame底層使用了Catalyst進(jìn)行優(yōu)化。
    DataFrame還支持text、json、parquet以及其他外部數(shù)據(jù)源格式。將外部數(shù)據(jù)源的數(shù)據(jù)注冊(cè)到sparksql中,成為DataFrame,然后就可以使用DataFrame自身提供的API進(jìn)行操作了。或者可以注冊(cè)成一張表執(zhí)行sql語句。執(zhí)行自己的API或sql,最終形成的邏輯執(zhí)行計(jì)劃都是一樣的。

    總結(jié)

    以上是生活随笔為你收集整理的Spark SQL之RDD转DataFrame的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。