Spark SQL(二)之DataSet操作
一、創建DataSet
使用SparkSession,應用程序可以從現有的RDD,Hive表的或Spark數據源創建DataFrame 。
(1)基于JSON的內容創建一個DataFrame
//hdfs Dataset<Row> df = spark.read().json("hdfs://master:9000/test.json");//rdd RDD<String> jsonRDD = ... Dataset<Row> df = spark.read().json(jsonRDD);//dataset Dataset<String> jsonDataset = ... Dataset<Row> df = spark.read().json(dataSet);(2)基于parquet的內容創建一個DataFrame
//hdfs Dataset<Row> df = spark.read().parquet("hdfs://master:9000/test.parquet");(3)基于orc的內容創建一個DataFrame
//hdfs Dataset<Row> df = spark.read().parquet("hdfs://master:9000/test.orc");?(4)基于txt的內容創建一個DataFrame
//hdfs 創建只有value列的數據 Dataset<Row> df = spark.read().txt("hdfs://master:9000/test.txt");(5)基于cvs的內容創建一個DataFrame
//hdfs Dataset<Row> df = spark.read().cvs("hdfs://master:9000/test.cvs");?(6)基于jdbc的內容創建一個DataFrame
Dataset<Row> df1 = spark.read().format("jdbc").option("url", "jdbc:mysql://localhost:3306/man").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "man").option("user", "root").option("password","admin").load(); df1.show();Properties properties = new Properties(); properties.put("user", "root"); properties.put("password","admin"); properties.put("driver", "com.mysql.jdbc.Driver"); Dataset<Row> df2 = spark.read().jdbc("jdbc:mysql://localhost:3306/man", "man", properties); df2.show();(7)基于textFile的內容創建一個DataSet
//hdfs Dataset<String> ds = spark.read().textFile("hdfs://master:9000/test.txt");(8)rdd創建DataSet
//反射推斷StructType JavaRDD<Person> peopleRDD = ... Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);//編程方式指定StructType String schemaString = ... List<StructField> fields = new ArrayList<>(); for (String fieldName : schemaString.split(" ")) {StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field); } StructType schema = DataTypes.createStructType(fields); JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {String[] attributes = record.split(",");return RowFactory.create(attributes[0], attributes[1].trim()); }); Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);?
二、DataSet操作
(1)schema結構
df.printSchema(); StructType type = df.schema();(2)map一對一映射操作
//dataframe格式轉換 Dataset<Row> df1 = df.map(v-> v, RowEncoder.apply(df.schema())); df1.show();//dataframe格式轉換 StructField structField = new StructField("name", DataTypes.StringType, true, null); StructType structType = new StructType(new StructField[]{structField}); Dataset<Row> df2 = df.map(v-> new GenericRowWithSchema(new Object[]{v.getAs("name")}, structType), RowEncoder.apply(structType)); df2.show();//dataSet格式轉換 Dataset<String> dfs = df.map(v-> v.getAs("name"), Encoders.STRING()); dfs.show();(3)flatMap一對多映射操作
//dataSet格式轉換 Dataset<String> dfs = df.flatMap(v-> Arrays.asList((String)v.getAs("name")).iterator(), Encoders.STRING()); dfs.show();(4)filter過濾操作
Dataset<Row> df1 = df.filter(new Column("name").$eq$eq$eq("mk")); Dataset<Row> df2 = df.filter(new Column("name").notEqual("mk"));(5)withColumn加列或者覆蓋
Dataset<Row> df1 = df.withColumn("name1", functions.col("name")); df1.show(); Dataset<Row> df2 = df.withColumn("name", functions.lit("a")); df2.show(); Dataset<Row> df3 = df.withColumn("name", functions.concat(functions.col("name"), functions.lit("zzz"))); df3.show();(6)select選擇列
Dataset<Row> df1 = df.select(functions.concat(functions.col("name"), functions.lit("zzz")).as("name1")); df1.show(); Dataset<Row> df2 = df.select(functions.col("name"), functions.concat(functions.col("name"), functions.lit("zzz")).as("name1")); df2.show();(7)selectExpr表達式選擇列
Dataset<Row> df1 = df.selectExpr("name", "'a' as name1"); df1.show();(8)groupBy agg分組統計
Dataset<Row> df1 = df.groupBy(functions.col("name")).agg(functions.expr("count(1)").as("c"), functions.expr("max(desc)").as("desc")); df1.show();(9)drop刪除列
Dataset<Row> df1 = df.drop("name"); df1.show();(10)distinct去重
Dataset<Row> df1 = df.distinct(); df1.show();(11)dropDuplicates 根據字段去重
Dataset<Row> df1 = df.dropDuplicates("name"); df1.show();(12)summary統計count、mean、stddev、min、max、25%、50%、75%,支持統計類型過濾
Dataset<Row> df1 = df.summary("count"); df1.show();(13)describe統計count、mean、stddev、min、max,支持列過濾
Dataset<Row> df1 = df.describe("name"); df1.show();(14)sort 排序
Dataset<Row> df1 = df.sort(functions.col("name").asc()); df1.show();(15)limit 分頁
Dataset<Row> df1 = df.limit(1); df1.show();?
三、DataSet連接
(1)join連接
Dataset<Row> df1 = df.as("a").join(df.as("b"), functions.col("a.name").notEqual(functions.col("b.name")), "left_outer"); df1.show();Dataset<Row> df2 = df.as("a").join(df.as("b"), functions.col("a.name").notEqual(functions.col("b.name"))); df2.show();(2)crossJoin笛卡爾連接
Dataset<Row> df1 = df.as("a").crossJoin(df.as("b")); df1.show();?
四、DataSet集合運算
(1)except差集
Dataset<Row> df1 = df.except(df.filter("name='mk'")); df1.show();(2)union并集,根據列位置合并行,列數要一致
Dataset<Row> df1 = df.union(df.filter("name='mk'")); df1.show();(3)unionByName并集,根據列名合并行,不同名報錯,列數要一致
Dataset<Row> df1 = df.unionByName(df.filter("name='mk'")); df1.show();(4)intersect交集
Dataset<Row> df1 = df.intersect(df.filter("name='mk'")); df1.show();?
?
五、DataSet分區
repartition(numPartitions:Int):RDD[T]
coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]
兩個都是RDD的分區進行重新劃分,repartition只是coalesce接口中shuffle為true的簡易實現
假設RDD有N個分區,需要重新劃分成M個分區
1、N<M。一般情況下N個分區有數據分布不均勻的狀況,利用HashPartitioner函數將數據重新分區為M個,這時需要將shuffle設置為true。
2、如果N>M并且N和M相差不多,(假如N是100,M是10)那么就可以將N個分區中的若干個分區合并成一個新的分區,最終合并為M個分區,這時可以將shuff設置為false。
在shuffl為false的情況下,如果M>N時,coalesce為無效的,不進行shuffle過程,父RDD和子RDD之間是窄依賴關系。
3、如果N>M并且兩者相差懸殊,這時如果將shuffle設置為false,父子RDD是窄依賴關系,他們同處在一個Stage中,就可能造成spark程序的并行度不夠,從而影響性能。
如果在M為1的時候,為了使coalesce之前的操作有更好的并行度,可以講shuffle設置為true。
DataSet的coalesce是Repartition shuffle=false的簡寫方法
Dataset<Row> df1 = df.coalesce(1); Dataset<Row> df2 = df.repartition(1);
?
總結
以上是生活随笔為你收集整理的Spark SQL(二)之DataSet操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 妹子用手机轻松直播K歌,四种模式支持混响
- 下一篇: Spark SQL(三)之视图与执行SQ