日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark创建DataFrame的三种方法

發(fā)布時間:2024/1/17 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark创建DataFrame的三种方法 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

跟關(guān)系數(shù)據(jù)庫的表(Table)一樣,DataFrame是Spark中對帶模式(schema)行列數(shù)據(jù)的抽象。DateFrame廣泛應(yīng)用于使用SQL處理大數(shù)據(jù)的各種場景。創(chuàng)建DataFrame有很多種方法,比如從本地List創(chuàng)建、從RDD創(chuàng)建或者從源數(shù)據(jù)創(chuàng)建,下面簡要介紹創(chuàng)建DataFrame的三種方法。

方法一,Spark中使用toDF函數(shù)創(chuàng)建DataFrame

通過導入(importing)Spark sql implicits, 就可以將本地序列(seq), 數(shù)組或者RDD轉(zhuǎn)為DataFrame。只要這些數(shù)據(jù)的內(nèi)容能指定數(shù)據(jù)類型即可。

本地seq + toDF創(chuàng)建DataFrame示例:

import sqlContext.implicits._ val df = Seq((1, "First Value", java.sql.Date.valueOf("2010-01-01")),(2, "Second Value", java.sql.Date.valueOf("2010-02-01")) ).toDF("int_column", "string_column", "date_column")

注意:如果直接用toDF()而不指定列名字,那么默認列名為"_1", "_2", ...

通過case class + toDF創(chuàng)建DataFrame的示例

// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._// Define the schema using a case class. // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, // you can use custom classes that implement the Product interface. case class Person(name: String, age: Int)// Create an RDD of Person objects and register it as a table. val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() people.registerTempTable("people")// 使用 sqlContext 執(zhí)行 sql 語句. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")// 注:sql()函數(shù)的執(zhí)行結(jié)果也是DataFrame,支持各種常用的RDD操作. // The columns of a row in the result can be accessed by ordinal. teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

方法二,Spark中使用createDataFrame函數(shù)創(chuàng)建DataFrame

在SqlContext中使用createDataFrame也可以創(chuàng)建DataFrame。跟toDF一樣,這里創(chuàng)建DataFrame的數(shù)據(jù)形態(tài)也可以是本地數(shù)組或者RDD。

通過row+schema創(chuàng)建示例

import org.apache.spark.sql.types._ val schema = StructType(List(StructField("integer_column", IntegerType, nullable = false),StructField("string_column", StringType, nullable = true),StructField("date_column", DateType, nullable = true) ))val rdd = sc.parallelize(Seq(Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01")) )) val df = sqlContext.createDataFrame(rdd, schema)

方法三,通過文件直接創(chuàng)建DataFrame

使用parquet文件創(chuàng)建

val df = sqlContext.read.parquet("hdfs:/path/to/file")

使用json文件創(chuàng)建

val df = spark.read.json("examples/src/main/resources/people.json")// Displays the content of the DataFrame to stdout df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+

使用csv文件,spark2.0+之后的版本可用

//首先初始化一個SparkSession對象 val spark = org.apache.spark.sql.SparkSession.builder.master("local").appName("Spark CSV Reader").getOrCreate;//然后使用SparkSessions對象加載CSV成為DataFrame val df = spark.read.format("com.databricks.spark.csv").option("header", "true") //reading the headers.option("mode", "DROPMALFORMED").load("csv/file/path"); //.csv("csv/file/path") //spark 2.0 apidf.show()

補充:spark數(shù)據(jù)集的演變:

總結(jié)

以上是生活随笔為你收集整理的Spark创建DataFrame的三种方法的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。