日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Spark SQL(二)之DataSet操作

發布時間:2023/12/3 数据库 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark SQL(二)之DataSet操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、創建DataSet

使用SparkSession,應用程序可以從現有的RDD,Hive表的或Spark數據源創建DataFrame 。

(1)基于JSON的內容創建一個DataFrame

//hdfs Dataset<Row> df = spark.read().json("hdfs://master:9000/test.json");//rdd RDD<String> jsonRDD = ... Dataset<Row> df = spark.read().json(jsonRDD);//dataset Dataset<String> jsonDataset = ... Dataset<Row> df = spark.read().json(dataSet);

(2)基于parquet的內容創建一個DataFrame

//hdfs Dataset<Row> df = spark.read().parquet("hdfs://master:9000/test.parquet");

(3)基于orc的內容創建一個DataFrame

//hdfs Dataset<Row> df = spark.read().parquet("hdfs://master:9000/test.orc");

?(4)基于txt的內容創建一個DataFrame

//hdfs 創建只有value列的數據 Dataset<Row> df = spark.read().txt("hdfs://master:9000/test.txt");

(5)基于cvs的內容創建一個DataFrame

//hdfs Dataset<Row> df = spark.read().cvs("hdfs://master:9000/test.cvs");

?(6)基于jdbc的內容創建一個DataFrame

Dataset<Row> df1 = spark.read().format("jdbc").option("url", "jdbc:mysql://localhost:3306/man").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "man").option("user", "root").option("password","admin").load(); df1.show();Properties properties = new Properties(); properties.put("user", "root"); properties.put("password","admin"); properties.put("driver", "com.mysql.jdbc.Driver"); Dataset<Row> df2 = spark.read().jdbc("jdbc:mysql://localhost:3306/man", "man", properties); df2.show();

(7)基于textFile的內容創建一個DataSet

//hdfs Dataset<String> ds = spark.read().textFile("hdfs://master:9000/test.txt");

(8)rdd創建DataSet

//反射推斷StructType JavaRDD<Person> peopleRDD = ... Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);//編程方式指定StructType String schemaString = ... List<StructField> fields = new ArrayList<>(); for (String fieldName : schemaString.split(" ")) {StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field); } StructType schema = DataTypes.createStructType(fields); JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {String[] attributes = record.split(",");return RowFactory.create(attributes[0], attributes[1].trim()); }); Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

?

二、DataSet操作

(1)schema結構

df.printSchema(); StructType type = df.schema();

(2)map一對一映射操作

//dataframe格式轉換 Dataset<Row> df1 = df.map(v-> v, RowEncoder.apply(df.schema())); df1.show();//dataframe格式轉換 StructField structField = new StructField("name", DataTypes.StringType, true, null); StructType structType = new StructType(new StructField[]{structField}); Dataset<Row> df2 = df.map(v-> new GenericRowWithSchema(new Object[]{v.getAs("name")}, structType), RowEncoder.apply(structType)); df2.show();//dataSet格式轉換 Dataset<String> dfs = df.map(v-> v.getAs("name"), Encoders.STRING()); dfs.show();

(3)flatMap一對多映射操作

//dataSet格式轉換 Dataset<String> dfs = df.flatMap(v-> Arrays.asList((String)v.getAs("name")).iterator(), Encoders.STRING()); dfs.show();

(4)filter過濾操作

Dataset<Row> df1 = df.filter(new Column("name").$eq$eq$eq("mk")); Dataset<Row> df2 = df.filter(new Column("name").notEqual("mk"));

(5)withColumn加列或者覆蓋

Dataset<Row> df1 = df.withColumn("name1", functions.col("name")); df1.show(); Dataset<Row> df2 = df.withColumn("name", functions.lit("a")); df2.show(); Dataset<Row> df3 = df.withColumn("name", functions.concat(functions.col("name"), functions.lit("zzz"))); df3.show();

(6)select選擇列

Dataset<Row> df1 = df.select(functions.concat(functions.col("name"), functions.lit("zzz")).as("name1")); df1.show(); Dataset<Row> df2 = df.select(functions.col("name"), functions.concat(functions.col("name"), functions.lit("zzz")).as("name1")); df2.show();

(7)selectExpr表達式選擇列

Dataset<Row> df1 = df.selectExpr("name", "'a' as name1"); df1.show();

(8)groupBy agg分組統計

Dataset<Row> df1 = df.groupBy(functions.col("name")).agg(functions.expr("count(1)").as("c"), functions.expr("max(desc)").as("desc")); df1.show();

(9)drop刪除列

Dataset<Row> df1 = df.drop("name"); df1.show();

(10)distinct去重

Dataset<Row> df1 = df.distinct(); df1.show();

(11)dropDuplicates 根據字段去重

Dataset<Row> df1 = df.dropDuplicates("name"); df1.show();

(12)summary統計count、mean、stddev、min、max、25%、50%、75%,支持統計類型過濾

Dataset<Row> df1 = df.summary("count"); df1.show();

(13)describe統計count、mean、stddev、min、max,支持列過濾

Dataset<Row> df1 = df.describe("name"); df1.show();

(14)sort 排序

Dataset<Row> df1 = df.sort(functions.col("name").asc()); df1.show();

(15)limit 分頁

Dataset<Row> df1 = df.limit(1); df1.show();

?

三、DataSet連接

(1)join連接

Dataset<Row> df1 = df.as("a").join(df.as("b"), functions.col("a.name").notEqual(functions.col("b.name")), "left_outer"); df1.show();Dataset<Row> df2 = df.as("a").join(df.as("b"), functions.col("a.name").notEqual(functions.col("b.name"))); df2.show();

(2)crossJoin笛卡爾連接

Dataset<Row> df1 = df.as("a").crossJoin(df.as("b")); df1.show();

?

四、DataSet集合運算

(1)except差集

Dataset<Row> df1 = df.except(df.filter("name='mk'")); df1.show();

(2)union并集,根據列位置合并行,列數要一致

Dataset<Row> df1 = df.union(df.filter("name='mk'")); df1.show();

(3)unionByName并集,根據列名合并行,不同名報錯,列數要一致

Dataset<Row> df1 = df.unionByName(df.filter("name='mk'")); df1.show();

(4)intersect交集

Dataset<Row> df1 = df.intersect(df.filter("name='mk'")); df1.show();

?

?

五、DataSet分區

repartition(numPartitions:Int):RDD[T]

coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]

兩個都是RDD的分區進行重新劃分,repartition只是coalesce接口中shuffle為true的簡易實現

假設RDD有N個分區,需要重新劃分成M個分區

1、N<M。一般情況下N個分區有數據分布不均勻的狀況,利用HashPartitioner函數將數據重新分區為M個,這時需要將shuffle設置為true。

2、如果N>M并且N和M相差不多,(假如N是100,M是10)那么就可以將N個分區中的若干個分區合并成一個新的分區,最終合并為M個分區,這時可以將shuff設置為false。

在shuffl為false的情況下,如果M>N時,coalesce為無效的,不進行shuffle過程,父RDD和子RDD之間是窄依賴關系。

3、如果N>M并且兩者相差懸殊,這時如果將shuffle設置為false,父子RDD是窄依賴關系,他們同處在一個Stage中,就可能造成spark程序的并行度不夠,從而影響性能。

如果在M為1的時候,為了使coalesce之前的操作有更好的并行度,可以講shuffle設置為true。

DataSet的coalesce是Repartition shuffle=false的簡寫方法

Dataset<Row> df1 = df.coalesce(1); Dataset<Row> df2 = df.repartition(1);


?

總結

以上是生活随笔為你收集整理的Spark SQL(二)之DataSet操作的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。