Spark _22_ Ways to Create a DataFrame (Part 1)
Ways to create a DataFrame

- Create a DataFrame by reading a JSON file
Note:

- The JSON data in the file must not contain nested JSON.
- A DataFrame is backed by an RDD of Row; get it with df.rdd() / df.javaRDD().
- A JSON file can be read in either of two ways.
- df.show() displays the first 20 rows by default.
- The DataFrame's native API can be used to operate on it (less convenient than SQL).
- When registered as a temporary table, the columns are displayed in ASCII order by default.

JSON file:
{"name":"george","age":"22"} {"name":"lucy"} {"name":"honey","age":"20"} {"name":"KK","age":"20"}javaAPI:
package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

/**
 * @author George
 * @description
 * Ways to create a DataFrame
 * 1. Create a DataFrame by reading a JSON file
 */
public class DFDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("DF");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        // create the SQLContext
        SQLContext sqlContext = new SQLContext(sc);

        /*
         * The DataFrame is backed by an RDD whose elements are of type Row.
         * Either of the following two calls reads a JSON file.
         */
        // Dataset<Row> df = sqlContext.read().format("json").load("./data/json");
        Dataset<Row> df = sqlContext.read().json("./data/json");
        df.show();
        /*
         * +----+------+
         * | age|  name|
         * +----+------+
         * |  22|george|
         * |null|  lucy|
         * |  20| honey|
         * |  20|    KK|
         * +----+------+
         */

        // convert the DataFrame into an RDD
        RDD<Row> rdd = df.rdd();

        /*
         * show() displays the first 20 rows by default; to display more, pass a row count: show(n).
         * Note: when there are multiple columns, they are displayed in ASCII order of the column names.
         */

        /*
         * Print the schema information as a tree
         */
        df.printSchema();
        /*
         * root
         *  |-- age: string (nullable = true)
         *  |-- name: string (nullable = true)
         */

        /*
         * Operate on the DataFrame with its built-in API
         */
        // select name from table
        df.select("name").show();
        /*
         * +------+
         * |  name|
         * +------+
         * |george|
         * |  lucy|
         * | honey|
         * |    KK|
         * +------+
         */

        // select name, age+10 as addAge from table
        df.select(df.col("name"), df.col("age").plus(10).alias("addAge")).show();
        /*
         * +------+------+
         * |  name|addAge|
         * +------+------+
         * |george|  32.0|
         * |  lucy|  null|
         * | honey|  30.0|
         * |    KK|  30.0|
         * +------+------+
         */

        // select name, age from table where age > 19
        df.select(df.col("name"), df.col("age")).where(df.col("age").gt(19)).show();
        /*
         * +------+---+
         * |  name|age|
         * +------+---+
         * |george| 22|
         * | honey| 20|
         * |    KK| 20|
         * +------+---+
         */

        // select age, count(*) from table group by age
        df.groupBy(df.col("age")).count().show();
        /*
         * +----+-----+
         * | age|count|
         * +----+-----+
         * |  22|    1|
         * |null|    1|
         * |  20|    2|
         * +----+-----+
         */

        /*
         * Register the DataFrame as a temporary table. The table is registered in memory only;
         * it is a logical table and is not materialized to disk.
         */
        df.registerTempTable("t1");
        Dataset<Row> sql1 = sqlContext.sql("select age,count(*) from t1 group by age");
        sql1.show();
        /*
         * +----+--------+
         * | age|count(1)|
         * +----+--------+
         * |  22|       1|
         * |null|       1|
         * |  20|       2|
         * +----+--------+
         */
        Dataset<Row> sql2 = sqlContext.sql("select * from t1");
        sql2.show();
        /*
         * +----+------+
         * | age|  name|
         * +----+------+
         * |  22|george|
         * |null|  lucy|
         * |  20| honey|
         * |  20|    KK|
         * +----+------+
         */
        sc.stop();
    }
}

Scala API:
[Friendly note] Where did DataFrame go in Spark 2.3.0, and what is Dataset? https://georgedage.blog.csdn.net/article/details/103072515
[Friendly note] SQLContext is deprecated; SparkSession takes its place: https://georgedage.blog.csdn.net/article/details/102850878
package SparkSql

import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Ways to create a DataFrame
 * 1. Create a DataFrame by reading a JSON file
 */
object DFScalaDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder.master("local").appName("www").getOrCreate()
    // val frame: DataFrame = sparkSession.read.format("json").load("./data/json")
    val frame = sparkSession.read.json("./data/json")
    frame.show()
    /*
     * +----+------+
     * | age|  name|
     * +----+------+
     * |  22|george|
     * |null|  lucy|
     * |  20| honey|
     * |  20|    KK|
     * +----+------+
     */
    frame.printSchema()
    /*
     * root
     *  |-- age: string (nullable = true)
     *  |-- name: string (nullable = true)
     */

    // select name from table
    frame.select(frame.col("name")).show()
    // select name, age from table where age > 19
    frame.select(frame.col("name"), frame.col("age")).where(frame.col("age").gt(19)).show()
    // select age, count(*) from table group by age
    frame.groupBy(frame.col("age")).count().show()
    /*
     * +------+
     * |  name|
     * +------+
     * |george|
     * |  lucy|
     * | honey|
     * |    KK|
     * +------+
     * +------+---+
     * |  name|age|
     * +------+---+
     * |george| 22|
     * | honey| 20|
     * |    KK| 20|
     * +------+---+
     * +----+-----+
     * | age|count|
     * +----+-----+
     * |  22|    1|
     * |null|    1|
     * |  20|    2|
     * +----+-----+
     */

    /*
     * Register a temporary view
     */
    // [Note] registerTempTable has been @deprecated since 1.6.0: "Use createOrReplaceTempView(viewName) instead."
    frame.createOrReplaceTempView("t1")
    sparkSession.sql("select * from t1").show()
    /*
     * +----+------+
     * | age|  name|
     * +----+------+
     * |  22|george|
     * |null|  lucy|
     * |  20| honey|
     * |  20|    KK|
     * +----+------+
     */
    sparkSession.stop()
  }
}

The same read written against the older SQLContext API:

val conf = new SparkConf()
conf.setMaster("local").setAppName("jsonfile")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("sparksql/json")
// val df1 = sqlContext.read.format("json").load("sparksql/json")
df.show()
df.printSchema()
// select name from table
df.select(df.col("name")).show()
// select name, age from table where age > 19
df.select(df.col("name"), df.col("age")).where(df.col("age").gt(19)).show()
// select age, count(*) from table group by age
df.groupBy(df.col("age")).count().show()

/*
 * Register a temporary table
 */
df.registerTempTable("jtable")
val result = sqlContext.sql("select * from jtable")
result.show()
sc.stop()
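The Java examples in this post all go through SQLContext, which, as the notes above point out, is superseded by SparkSession in Spark 2.x. A minimal sketch of the same JSON read using SparkSession from the Java API (the class name DFSessionDemo is made up for illustration; the data path is the same sample file):

package SparkSql;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DFSessionDemo {
    public static void main(String[] args) {
        // SparkSession wraps SparkContext and SQLContext in Spark 2.x
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("DFSessionDemo")
                .getOrCreate();
        // In the Java API there is no DataFrame class; a DataFrame is simply Dataset<Row>
        Dataset<Row> df = spark.read().json("./data/json");
        df.show();
        df.printSchema();
        // createOrReplaceTempView replaces the deprecated registerTempTable
        df.createOrReplaceTempView("t1");
        spark.sql("select age, count(*) from t1 group by age").show();
        spark.stop();
    }
}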
- Create a DataFrame from an RDD of JSON-formatted strings

Java API:
package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.Arrays;

/**
 * @author George
 * @description
 * 2. Create a DataFrame from an RDD of JSON-formatted strings
 */
public class DFDemo2 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("www");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);

        // parallelize distributes a local collection to form an RDD
        JavaRDD<String> nameRDD = sc.parallelize(Arrays.asList(
                "{\"name\":\"zhangsan\",\"age\":\"18\"}",
                "{\"name\":\"lisi\",\"age\":\"19\"}",
                "{\"name\":\"wangwu\",\"age\":\"20\"}"));
        JavaRDD<String> scoreRDD = sc.parallelize(Arrays.asList(
                "{\"name\":\"zhangsan\",\"score\":\"100\"}",
                "{\"name\":\"lisi\",\"score\":\"200\"}",
                "{\"name\":\"wangwu\",\"score\":\"300\"}"));

        Dataset<Row> nameds = sqlContext.read().json(nameRDD);
        Dataset<Row> scoreds = sqlContext.read().json(scoreRDD);
        nameds.registerTempTable("name");
        scoreds.registerTempTable("score");

        Dataset<Row> res = sqlContext.sql("select * from name,score where name.name = score.name");
        res.show();
        /*
         * +---+--------+--------+-----+
         * |age|    name|    name|score|
         * +---+--------+--------+-----+
         * | 20|  wangwu|  wangwu|  300|
         * | 18|zhangsan|zhangsan|  100|
         * | 19|    lisi|    lisi|  200|
         * +---+--------+--------+-----+
         */
        sc.stop();
    }
}

Scala API:
package SparkSql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SparkSession}

object DFScalaDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("jsonrdd")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val nameRDD = sc.makeRDD(Array(
      "{\"name\":\"zhangsan\",\"age\":18}",
      "{\"name\":\"lisi\",\"age\":19}",
      "{\"name\":\"wangwu\",\"age\":20}"))
    val scoreRDD = sc.makeRDD(Array(
      "{\"name\":\"zhangsan\",\"score\":100}",
      "{\"name\":\"lisi\",\"score\":200}",
      "{\"name\":\"wangwu\",\"score\":300}"))

    val nameDF = sqlContext.read.json(nameRDD)
    val scoreDF = sqlContext.read.json(scoreRDD)
    nameDF.registerTempTable("name")
    scoreDF.registerTempTable("score")

    val result = sqlContext.sql("select name.name,name.age,score.score from name,score where name.name = score.name")
    result.show()
    /*
     * +--------+---+-----+
     * |    name|age|score|
     * +--------+---+-----+
     * |  wangwu| 20|  300|
     * |zhangsan| 18|  100|
     * |    lisi| 19|  200|
     * +--------+---+-----+
     */
    sc.stop()
  }
}
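Note that the read().json(JavaRDD<String>) / read.json(RDD[String]) overloads used above are deprecated in newer Spark releases in favor of the Dataset<String> overload. A hedged sketch of the same name/score join built that way in Java (the class and session names are illustrative):

package SparkSql;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DFJsonDatasetDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local").appName("jsonDataset").getOrCreate();

        List<String> nameJson = Arrays.asList(
                "{\"name\":\"zhangsan\",\"age\":18}",
                "{\"name\":\"lisi\",\"age\":19}",
                "{\"name\":\"wangwu\",\"age\":20}");
        List<String> scoreJson = Arrays.asList(
                "{\"name\":\"zhangsan\",\"score\":100}",
                "{\"name\":\"lisi\",\"score\":200}",
                "{\"name\":\"wangwu\",\"score\":300}");

        // Wrap the JSON strings in a Dataset<String>, the non-deprecated input for read().json()
        Dataset<String> nameDS = spark.createDataset(nameJson, Encoders.STRING());
        Dataset<String> scoreDS = spark.createDataset(scoreJson, Encoders.STRING());

        Dataset<Row> nameDF = spark.read().json(nameDS);
        Dataset<Row> scoreDF = spark.read().json(scoreDS);

        nameDF.createOrReplaceTempView("name");
        scoreDF.createOrReplaceTempView("score");
        spark.sql("select name.name, name.age, score.score "
                + "from name join score on name.name = score.name").show();

        spark.stop();
    }
}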
- Create a DataFrame from a non-JSON RDD
- Convert a non-JSON RDD into a DataFrame via reflection (not recommended)

Person class:
package SparkSql;

import java.io.Serializable;

/**
 * @author George
 * @description
 */
public class Person implements Serializable {
    private int id;
    private String name;
    private int age;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

Java API:
package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.SQLContext;

/**
 * @author George
 * @description
 * 3. Create a DataFrame from a non-JSON RDD
 */
public class DFDemo3 {
    public static void main(String[] args) {
        /*
         * Note:
         * 1. The custom class must be serializable.
         * 2. The custom class must be public.
         * 3. When the RDD is converted to a DataFrame, the fields of the custom class are sorted by ASCII code.
         */
        SparkConf conf = new SparkConf();
        conf.setAppName("df");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> textFile = sc.textFile("./data/person.txt");
        JavaRDD<Person> map = textFile.map(new Function<String, Person>() {
            @Override
            public Person call(String s) throws Exception {
                Person p = new Person();
                p.setId(Integer.parseInt(s.split(",")[0]));
                p.setName(s.split(",")[1]);
                p.setAge(Integer.parseInt(s.split(",")[2]));
                return p;
            }
        });
        map.foreach(new VoidFunction<Person>() {
            @Override
            public void call(Person person) throws Exception {
                System.out.println(person);
            }
        });
        /*
         * Person{id=1, name='zhangsan', age=18}
         * Person{id=2, name='lisi', age=19}
         * Person{id=3, name='wangwu', age=20}
         */
        sc.stop();
    }
}
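The Java code above stops after printing the Person RDD and never actually builds the DataFrame. A minimal sketch of the complete reflection-based conversion, assuming the same Person bean and ./data/person.txt input (the class name DFReflectDemo is made up for illustration):

package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class DFReflectDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("reflect");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);

        // Parse each "id,name,age" line into a Person bean
        JavaRDD<Person> personRDD = sc.textFile("./data/person.txt").map(line -> {
            String[] parts = line.split(",");
            Person p = new Person();
            p.setId(Integer.parseInt(parts[0]));
            p.setName(parts[1]);
            p.setAge(Integer.parseInt(parts[2]));
            return p;
        });

        // Reflection-based conversion: column names and types are inferred from the Person getters,
        // and the columns end up ordered by ASCII code of the field names (age, id, name)
        Dataset<Row> personDF = sqlContext.createDataFrame(personRDD, Person.class);
        personDF.show();

        personDF.registerTempTable("person");
        sqlContext.sql("select id, name, age from person where age > 18").show();

        sc.stop();
    }
}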
Scala API:

package SparkSql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object DFScalaDemo3 {

  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ww")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")
    val sqlContext: SQLContext = new SQLContext(sc)
    val tf = sc.textFile("./data/person.txt")

    /*
     * Implicitly convert the RDD into a DataFrame
     */
    import sqlContext.implicits._
    val map: RDD[Person] = tf.map { x => {
      val person = Person(Integer.parseInt(x.split(",")(0)), x.split(",")(1), Integer.valueOf(x.split(",")(2)))
      person
    }}
    val frame = map.toDF()
    frame.show()
    /*
     * +---+--------+---+
     * | id|    name|age|
     * +---+--------+---+
     * |  1|zhangsan| 18|
     * |  2|    lisi| 19|
     * |  3|  wangwu| 20|
     * +---+--------+---+
     */

    /*
     * Convert the DataFrame back into an RDD of Person
     */
    val rdd = frame.rdd
    val result = rdd.map(x => {
      Person(x.getAs("id"), x.getAs("name"), x.getAs("age"))
    })
    result.foreach(println)
    /*
     * Person(1,zhangsan,18)
     * Person(2,lisi,19)
     * Person(3,wangwu,20)
     */
    sc.stop()
  }
}

- Dynamically create a schema to convert a non-JSON RDD into a DataFrame
Java API:
package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

/**
 * @author George
 * @description
 * 2) Dynamically create a schema to convert a non-JSON RDD into a DataFrame
 */
public class DFDemo4 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("ww");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> tf = sc.textFile("./data/person.txt");

        /*
         * Convert to an RDD of Row
         */
        JavaRDD<Row> map = tf.map(new Function<String, Row>() {
            @Override
            public Row call(String v1) throws Exception {
                return RowFactory.create(
                        Integer.parseInt(v1.split(",")[0]),
                        String.valueOf(v1.split(",")[1]),
                        Integer.valueOf(v1.split(",")[2]));
            }
        });

        /*
         * Dynamically build the DataFrame's metadata. In general the fields here
         * can come from a string, or from an external database.
         */
        List<StructField> asList = Arrays.asList(
                DataTypes.createStructField("id", DataTypes.IntegerType, true),
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(asList);

        Dataset<Row> dataFrame = sqlContext.createDataFrame(map, schema);
        dataFrame.show();
        /*
         * +---+--------+---+
         * | id|    name|age|
         * +---+--------+---+
         * |  1|zhangsan| 18|
         * |  2|    lisi| 19|
         * |  3|  wangwu| 20|
         * +---+--------+---+
         */
        sc.stop();
    }
}
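The comment in the code above says the schema fields can also come from a string or an external database, but the example hard-codes them. A hedged sketch of building the StructType from a simple "name:type" spec string (the spec format and helper class are illustrative, not a Spark API):

package SparkSql;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SchemaFromString {
    // Build a StructType from a spec such as "id:int,name:string,age:int"
    public static StructType fromSpec(String spec) {
        List<StructField> fields = new ArrayList<>();
        for (String column : spec.split(",")) {
            String[] parts = column.split(":");
            DataType type = "int".equals(parts[1]) ? DataTypes.IntegerType : DataTypes.StringType;
            fields.add(DataTypes.createStructField(parts[0], type, true));
        }
        return DataTypes.createStructType(fields);
    }

    public static void main(String[] args) {
        StructType schema = fromSpec("id:int,name:string,age:int");
        System.out.println(schema.treeString());
        // The result can be passed to sqlContext.createDataFrame(rowRDD, schema) exactly as above
    }
}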
Scala API:

package SparkSql

import org.apache.spark.sql.{RowFactory, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object DFScalaDemo4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sww").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")
    val sqlContext = new SQLContext(sc)
    val tf = sc.textFile("./data/person.txt")

    // Convert each line to a Row
    val map = tf.map(x => {
      val strings = x.split(",")
      RowFactory.create(Integer.valueOf(strings(0)), strings(1), Integer.valueOf(strings(2)))
    })

    // Build the schema dynamically
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    val frame = sqlContext.createDataFrame(map, schema)
    frame.show()
    frame.printSchema()
    /*
     * +---+--------+---+
     * | id|    name|age|
     * +---+--------+---+
     * |  1|zhangsan| 18|
     * |  2|    lisi| 19|
     * |  3|  wangwu| 20|
     * +---+--------+---+
     *
     * root
     *  |-- id: integer (nullable = true)
     *  |-- name: string (nullable = true)
     *  |-- age: integer (nullable = true)
     */
    sc.stop()
  }
}

There is more to come, so stay tuned. Haha.