Spark SQL: Converting an RDD to a DataFrame
Inferring the Schema via Reflection
In Spark SQL there are two ways to convert between an RDD and a DataFrame (both are sketched in miniature right after this list, and shown as complete programs below):

- Use reflection to infer the schema of an RDD that contains objects of a known type. This approach is suitable when the RDD's schema is known ahead of time.
- Interact with the RDD through a programmatic interface: build a schema, apply it to the RDD, and create the DataFrame dynamically. This approach lets you decide the columns and their types at runtime.
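In miniature, the two routes look like this (a sketch only, not runnable on its own: `personRDD` is assumed to be an RDD of case class instances, and `rowRDD` an `RDD[Row]` with a matching `schema`; the complete programs follow below):

```scala
import spark.implicits._                                   // required for .toDF()

val byReflection  = personRDD.toDF()                       // schema inferred from the case class
val byProgramming = spark.createDataFrame(rowRDD, schema)  // schema built and supplied explicitly
```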
1. Create a Maven project and add the dependencies
```xml
<properties>
  <scala.version>2.11.8</scala.version>
  <hadoop.version>2.7.4</hadoop.version>
  <spark.version>2.0.2</spark.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
</dependencies>
<build>
  <sourceDirectory>src/main/scala</sourceDirectory>
  <testSourceDirectory>src/test/scala</testSourceDirectory>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.2</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
          <configuration>
            <args>
              <arg>-dependencyfile</arg>
              <arg>${project.build.directory}/.scala_dependencies</arg>
            </args>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass></mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```

2. Code implementation
Scala supports converting an RDD of case class instances to a DataFrame: the case class defines the schema, and its parameter names are read via reflection and become the table's column names. Such an RDD can be efficiently converted to a DataFrame and registered as a table.
```scala
package cn.cheng.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * RDD to DataFrame: via reflection
  */
//todo: define a case class Person
case class Person(id: Int, name: String, age: Int) extends Serializable

object InferringSchema {
  def main(args: Array[String]): Unit = {
    //todo: 1. build a SparkSession, specifying the appName and the master address
    val spark: SparkSession = SparkSession.builder()
      .appName("InferringSchema")
      .master("local[2]")
      .getOrCreate()

    //todo: 2. get the SparkContext from the SparkSession
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN") // set the log output level

    //todo: 3. load the data
    val dataRDD: RDD[String] = sc.textFile("D:\\person.txt")

    //todo: 4. split each line
    val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))

    //todo: 5. map the RDD onto the Person case class
    val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

    //todo: 6. create the DataFrame; the implicit conversions must be imported
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF()

    //todo: ------------------- DSL style: start --------------
    // 1. show the DataFrame's data, 20 rows by default
    personDF.show()
    // 2. print the DataFrame's schema
    personDF.printSchema()
    // 3. print the number of rows
    println(personDF.count())
    // 4. print all column names
    personDF.columns.foreach(println)
    // 5. print the first row
    println(personDF.head())
    // 6. show all values of the name column
    personDF.select("name").show()
    // 7. filter the rows where age is greater than 30
    personDF.filter($"age" > 30).show()
    // 8. count the rows where age is greater than 30
    println(personDF.filter($"age" > 30).count())
    // 9. group by age and count each group
    personDF.groupBy("age").count().show()
    //todo: ------------------- DSL style: end -------------

    //todo: -------------------- SQL style: start -----------
    //todo: register the DataFrame as a temporary view
    personDF.createOrReplaceTempView("t_person")
    //todo: run SQL statements against the view
    spark.sql("select * from t_person").show()
    spark.sql("select * from t_person where name='zhangsan'").show()
    spark.sql("select * from t_person order by age desc").show()
    //todo: -------------------- SQL style: end -------------

    sc.stop()
  }
}
```
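The original article does not show `D:\person.txt`; a minimal sample that the parsing above expects (one space-separated `id name age` triple per line, with made-up rows, including the `zhangsan` the SQL query filters on) might look like:

```
1 zhangsan 32
2 lisi 28
3 wangwu 45
```

With input like that, `personDF.printSchema()` should print roughly the following, since the schema is inferred from the `Person` case class (primitive `Int` fields come out non-nullable, `String` nullable):

```
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
```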
Specifying the Schema Directly with StructType
1. When a case class cannot be defined ahead of time, a DataFrame can be created in the following three steps:

- 1. Convert the original RDD into an RDD of Row objects.
- 2. Create a schema of type StructType that matches the structure of the Row RDD from step 1.
- 3. Apply the schema to the Row RDD from step 1 via the SparkSession's createDataFrame method to create the DataFrame.
2. Code implementation
The Maven dependencies are the same as in the first example above.
```scala
package cn.cheng.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
  * RDD to DataFrame: by specifying the schema explicitly
  */
object SparkSqlSchema {
  def main(args: Array[String]): Unit = {
    //todo: 1. create a SparkSession, specifying the appName and master
    val spark: SparkSession = SparkSession.builder()
      .appName("SparkSqlSchema")
      .master("local[2]")
      .getOrCreate()

    //todo: 2. get the SparkContext
    val sc: SparkContext = spark.sparkContext

    //todo: 3. load the data
    val dataRDD: RDD[String] = sc.textFile("d:\\person.txt")

    //todo: 4. split each line
    val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))

    //todo: 5. load the data into Row objects
    val personRDD: RDD[Row] = dataArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))

    //todo: 6. create the schema
    val schema: StructType = StructType(Seq(
      StructField("id", IntegerType, false),
      StructField("name", StringType, false),
      StructField("age", IntegerType, false)
    ))

    //todo: 7. create the DataFrame from personRDD and the schema
    val personDF: DataFrame = spark.createDataFrame(personRDD, schema)

    //todo: 8. show the DataFrame's data via the DSL
    personDF.show()

    //todo: 9. register the DataFrame as a temporary view
    personDF.createOrReplaceTempView("t_person")

    //todo: 10. run SQL statements against the view
    spark.sql("select * from t_person").show()
    spark.sql("select count(*) from t_person").show()

    sc.stop()
  }
}
```
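The payoff of the programmatic route is that nothing about the columns has to be known at compile time. A minimal sketch of that idea (the `columnSpec` value here is hypothetical, standing in for column names and types discovered at runtime, e.g. parsed from a header line or a config file):

```scala
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}

// Hypothetical runtime description of the columns
val columnSpec: Seq[(String, DataType)] =
  Seq("id" -> IntegerType, "name" -> StringType, "age" -> IntegerType)

// Build the StructType from the runtime description; the result can be passed
// to spark.createDataFrame(rowRDD, dynamicSchema) exactly as in step 7 above
val dynamicSchema: StructType = StructType(
  columnSpec.map { case (name, dataType) => StructField(name, dataType, nullable = false) }
)
```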
Summary

Both approaches produce the same kind of DataFrame. The reflection route (toDF on an RDD of case class instances) is the more concise choice when the schema is known ahead of time, while the StructType route (createDataFrame) is the one to reach for when the columns and their types are only decided at runtime.