SparkSQL DataFrame: Advanced
1. Creating a SparkSession [2.0] and a SQLContext instance [1.x]

1. Creating a SparkSession [2.0]

    // Since Spark 2.0, SparkSession replaces the separate creation of SparkContext and SQLContext
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("SparkSQLTest").getOrCreate()
    val numbers = spark.range(1, 10, 2)
    numbers.show()

2. Creating a SQLContext instance [1.x]

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val conf = new SparkConf().setAppName("SQL_Advanced_case").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
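The remaining examples use the 1.x sqlContext together with the $ column syntax enabled by the implicits import. For reference, a minimal sketch of the equivalent 2.0-style setup; the names sample and sampleDF are just for this sketch, and the data mirrors the customerData list defined below:

    // Spark 2.0 style: the session provides both the implicits and createDataFrame
    import spark.implicits._

    val sample = List(("Alex", "浙江", 39, 230.00), ("Bob", "北京", 18, 170.00))
    val sampleDF = spark.createDataFrame(sample)   // columns default to _1, _2, ...
    sampleDF.printSchema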
2. Creating a DataFrame

Method 1: without creating an RDD

Use the createDataFrame method to build a DataFrame directly from a List. The drawback is that the resulting DataFrame has no meaningful column names (they default to _1, _2, ...).
    val customerData = List(
      ("Alex", "浙江", 39, 230.00),
      ("Bob", "北京", 18, 170.00),
      ("Chris", "江蘇", 45, 529.95),
      ("Dave", "北京", 25, 99.99),
      ("Ellie", "浙江", 23, 1299.95),
      ("Fred", "北京", 21, 1099.00))
    val customerDF1 = sqlContext.createDataFrame(customerData)
    customerDF1.printSchema

Method 2: without creating an RDD
Use the createDataFrame method on the List directly. Even when a case class has been defined, a DataFrame created this way still has no column names, because the case class is never applied to the tuples.

    val customerData = List(
      ("Alex", "浙江", 39, 230.00),
      ("Bob", "北京", 18, 170.00),
      ("Chris", "江蘇", 45, 529.95),
      ("Dave", "北京", 25, 99.99),
      ("Ellie", "浙江", 23, 1299.95),
      ("Fred", "北京", 21, 1099.00))
    case class CustomerInfo(customer: String, province: String, age: Int, total: Double)
    // The case class is defined but unused here, so the columns are still _1 .. _4
    val customerDF1 = sqlContext.createDataFrame(customerData)
    customerDF1.printSchema
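To get named columns without building an RDD, a common alternative is toDF with explicit column names. A small sketch, assuming the implicits import from section 1 is in scope (toDF on a local Seq is available from Spark 1.6 on):

    // Name the columns up front instead of renaming them afterwards
    val namedDF = customerData.toDF("customer", "province", "age", "total")
    namedDF.printSchema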
Method 3: the traditional way

First create an RDD and a case class, then call toDF to create the DataFrame. The resulting DataFrame includes the column information.

    val customerData = Array(
      "Alex,浙江,39,230.00",
      "Bob,北京,18,170.00",
      "Chris,江蘇,45,529.95",
      "Dave,北京,25,99.99",
      "Ellie,浙江,23,1299.95",
      "Fred,北京,21,1099.00")
    case class CustomerInfo(customer: String, province: String, age: Int, total: Double)
    val customerRDD = sc.makeRDD(customerData)
    val customerDF1 = customerRDD.map(_.split(","))
      .map(x => CustomerInfo(x(0), x(1), x(2).toInt, x(3).toDouble))
      .toDF
    customerDF1.printSchema

Other operations
Renaming columns

    val customerDF = customerDF1
      .withColumnRenamed("_1", "customer")
      .withColumnRenamed("_2", "province")
      .withColumnRenamed("_3", "age")
      .withColumnRenamed("_4", "total")

Inspecting the schema
    customerDF.printSchema

Basic statistics for numeric columns
    customerDF.describe().show
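describe also accepts column names, which is handy on wide tables. A quick sketch: describe(cols: String*) reports count, mean, stddev, min, and max for the named numeric columns.

    // Restrict the summary to the numeric columns of interest
    customerDF.describe("age", "total").show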
3. DataFrame methods

select
    // col, when, lit, desc, sum, min, ... used in the examples below
    import org.apache.spark.sql.functions._

    customerDF.select(customerDF.col("customer")).show
    customerDF.select(customerDF("customer")).show
    customerDF.select("customer", "province").show
    customerDF.select($"customer", $"province").show
    customerDF.select(col("customer"), col("province")).show
    customerDF.select(customerDF("customer"), col("province")).show
    customerDF.select("customer", $"province").show   // error: plain strings and $ columns cannot be mixed
    customerDF.select(col("customer"), $"province").show   // valid: both arguments are Columns

Using expressions
(these are methods on the Column object)
    // Evaluates the relational expression $"province" === "浙江" as a boolean column
    customerDF.select($"customer", ($"age" * 2) + 10, $"province" === "浙江").show
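The same projection can also be written as SQL expression strings using selectExpr, a standard DataFrame method; a brief sketch:

    // SQL expression strings instead of Column arithmetic
    customerDF.selectExpr("customer", "(age * 2) + 10 as newAge", "province = '浙江' as isZJ").show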
Renaming columns with as / alias

    customerDF.select(
      $"customer" as "name",
      ($"age" * 2) + 10 alias "newAge",
      $"province" === "浙江" as "isZJ").show

Adding columns with lit
lit is a method from org.apache.spark.sql.functions.

    val cdf1 = customerDF.select(
      $"customer",
      $"age",
      when($"age" < 20, 1).when($"age" < 30, 2).otherwise(3) as "ageGroup",
      lit(false) as "trusted")
    cdf1.show
    cdf1.printSchema

drop
    val cdf2 = cdf1.drop("trusted")
    cdf2.show
    cdf2.printSchema

distinct
    customerDF.select($"province").distinct.show

filter
    customerDF.filter($"age" > 30).show
    customerDF.filter("age > 30").show
    customerDF.filter($"age" <= 30 and $"province" === "浙江").show
    customerDF.filter("age <= 30 and province = '浙江'").show
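where is an alias for filter and accepts the same Column and SQL-string forms (the string overload is available from Spark 1.5 on); a quick sketch:

    // Identical in effect to the filter calls above
    customerDF.where($"age" > 30).show
    customerDF.where("age > 30 and province = '北京'").show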
Aggregation operations

withColumn
Adds a new column to an existing DataFrame without removing the existing columns.
    val customerAgeGroupDF = customerDF.withColumn("agegroup",
      when($"age" < 20, 1).when($"age" < 30, 2).otherwise(3))
    customerAgeGroupDF.show

groupBy
This operation returns a GroupedData object [RelationalGroupedDataset in 2.0].
    // GroupedData encapsulates a large number of aggregation methods
    customerAgeGroupDF.groupBy("agegroup").max().show()
    customerAgeGroupDF.groupBy("agegroup", "province").count().show()
    customerAgeGroupDF.groupBy("agegroup").min("age", "total").show()

agg
    customerAgeGroupDF.groupBy("agegroup").agg(sum($"total"), min($"total")).show()
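agg also has a (columnName -> functionName) string form, which avoids importing the individual aggregate functions; a sketch of the equivalent call:

    // String form: column name -> aggregate function name
    customerAgeGroupDF.groupBy("agegroup").agg("total" -> "sum", "total" -> "min").show()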
pivot

    customerAgeGroupDF.groupBy("province").pivot("agegroup").sum("total").show()
    customerAgeGroupDF.groupBy("province").pivot("agegroup", Seq(1, 2)).sum("total").show()
    customerAgeGroupDF.groupBy("province").pivot("agegroup", Seq(2, 3))
      .agg(sum($"total"), min($"total"))
      .filter($"province" =!= "北京")
      .show

sort
    customerDF.orderBy("age").show
    customerDF.orderBy($"age").show
    customerDF.orderBy(desc("age")).show()

    /* orderBy must come after all aggregate functions: groupBy returns a GroupedData object,
       whose aggregation methods in turn return a DataFrame. orderBy is a DataFrame method, so
       calling it directly on the result of groupBy fails with:
       value orderBy is not a member of GroupedData. */
    customerAgeGroupDF.groupBy("agegroup", "province").count().orderBy($"agegroup".desc).show()
    customerDF.sort($"age".asc).show(5)