

Spark Dataset Operations


Selecting columns: select

Let's start with an example and walk through it:

```scala
scala> val df = spark.createDataset(Seq(
  ("aaa", 1, 2), ("bbb", 3, 4), ("ccc", 3, 5), ("bbb", 4, 6))
).toDF("key1","key2","key3")
df: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)

scala> df.collect
res34: Array[org.apache.spark.sql.Row] = Array([aaa,1,2], [bbb,3,4], [ccc,3,5], [bbb,4,6])
```


The code above creates a DataFrame with three columns named key1, key2, and key3, of types string, integer, and integer, and populates it with the four rows shown above.

Next, let's look at several ways to select a column:

```scala
scala> df.select("key1").collect
res49: Array[org.apache.spark.sql.Row] = Array([aaa], [bbb], [ccc], [bbb])

scala> df.select($"key1").collect
res50: Array[org.apache.spark.sql.Row] = Array([aaa], [bbb], [ccc], [bbb])

scala> df.select(df.col("key1")).collect
res51: Array[org.apache.spark.sql.Row] = Array([aaa], [bbb], [ccc], [bbb])
```


When select is given a plain string, that string can only be the name of an existing column in the DataFrame; you cannot put a SQL-style select expression in it. The $"key1" form, however, resolves to the Column-typed overload of select, which does accept SQL-like expressions:


```scala
scala> df.select(upper($"key1")).collect
res58: Array[org.apache.spark.sql.Row] = Array([AAA], [BBB], [CCC], [BBB])

scala> df.select(upper("key1")).collect
<console>:27: error: type mismatch;
 found   : String("key1")
 required: org.apache.spark.sql.Column
       df.select(upper("key1")).collect
```


In the first call, select applies the upper function to column key1 to convert it to upper case. Note the $ in front of "key1": it is a piece of Scala syntactic sugar that turns the string into a Column, and it is well worth getting used to. The second call, without the $, fails: the compiler complains that a Column is required but a String was supplied. A few equivalent column references are sketched below.
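As a side note, here is a small sketch of equivalent ways to refer to the same column. It assumes the spark-shell environment, where spark.implicits._ is already imported (a standalone application would need to import it explicitly); col and expr come from org.apache.spark.sql.functions:

```scala
import org.apache.spark.sql.functions.{col, expr, upper}
// In spark-shell, spark.implicits._ is pre-imported; in an application you
// would need: import spark.implicits._

// All of these refer to the same column and can be wrapped in expressions:
df.select($"key1").show()               // $-prefix sugar -> Column
df.select(col("key1")).show()           // functions.col
df.select(df("key1")).show()            // apply on the DataFrame itself
df.select(upper(col("key1"))).show()    // expression over a Column
df.select(expr("upper(key1)")).show()   // parse a SQL expression into a Column
```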

In this situation, select can also be replaced by selectExpr. For example:

```scala
scala> df.selectExpr("upper(key1)", "key2 as haha2").show
+-----------+-----+
|upper(key1)|haha2|
+-----------+-----+
|        AAA|    1|
|        BBB|    3|
|        CCC|    3|
|        BBB|    4|
+-----------+-----+
```


Column key1 goes through the upper-casing function and key2 is renamed to haha2. Everything works as expected.
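Since selectExpr parses each argument as a SQL expression, it is not limited to function calls and aliases. A small sketch, assuming the same df as above (the alias names are just illustrative):

```scala
// Arithmetic, casts and aliases can be combined freely in selectExpr:
df.selectExpr("key1", "key2 + key3 as total", "cast(key2 as double) as key2_d").show()
```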

The WHERE part of a query can be written with either the filter function or the where function. They are used in exactly the same way; the official documentation describes where as an alias of filter.

The data is still the dataset created earlier:

```scala
scala> val df = spark.createDataset(Seq(("aaa",1,2),("bbb",3,4),("ccc",3,5),("bbb",4,6))).toDF("key1","key2","key3")
df: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df.show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+
```

The filter function

According to the official Spark documentation, filter comes in the following forms:

```scala
def filter(func: (T) => Boolean): Dataset[T]
def filter(conditionExpr: String): Dataset[T]
def filter(condition: Column): Dataset[T]
```

So all of the following forms work:

```scala
scala> df.filter($"key1" > "aaa").show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+

scala> df.filter($"key1" === "aaa").show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
+----+----+----+

scala> df.filter("key1='aaa'").show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
+----+----+----+

scala> df.filter("key2=1").show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
+----+----+----+

scala> df.filter($"key2" === 3).show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| bbb|   3|   4|
| ccc|   3|   5|
+----+----+----+

scala> df.filter($"key2" === $"key3" - 1).show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
+----+----+----+
```

Here, === is a method defined on the Column class; the corresponding not-equal operator is =!=. The $"column name" form is syntactic sugar that returns a Column object.
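If the symbolic operators look cryptic, they map onto named methods of Column; a quick sketch of the equivalents (method names taken from the Column API):

```scala
// === and =!= are defined on Column; equalTo and notEqual are their named forms.
df.filter($"key1".equalTo("aaa")).show()   // same as $"key1" === "aaa"
df.filter($"key1".notEqual("aaa")).show()  // same as $"key1" =!= "aaa"
```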

The where function

```scala
scala> df.where("key1 = 'bbb'").show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| bbb|   3|   4|
| bbb|   4|   6|
+----+----+----+

scala> df.where($"key2" =!= 3).show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   4|   6|
+----+----+----+

scala> df.where($"key3" > col("key2")).show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+

scala> df.where($"key3" > col("key2") + 1).show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+
```

Grouping, aggregation, and sorting

```scala
scala> val df = spark.createDataset(Seq(("aaa",1,2),("bbb",3,4),("ccc",3,5),("bbb",4,6))).toDF("key1","key2","key3")
df: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)

scala> df.show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+
```

Let's start with the simplest case, a grouped count:

```scala
/* Equivalent SQL: select key1, count(*) from table group by key1 */
scala> df.groupBy("key1").count.show
+----+-----+
|key1|count|
+----+-----+
| ccc|    1|
| aaa|    1|
| bbb|    2|
+----+-----+
```

Note that the count above is not the total number of records; it is the per-group count produced by the groupBy aggregation. If what you want is the number of groups, the code looks like this; for this sample dataset the answer should be 3:

```scala
/* Equivalent SQL: select distinct key1 from table */
scala> df.select("key1").distinct.show
+----+
|key1|
+----+
| ccc|
| aaa|
| bbb|
+----+

/* Equivalent SQL: select count(distinct key1) from table */
scala> df.select("key1").distinct.count
res3: Long = 3
```
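A hedged alternative for the same number is to ask for the distinct count as a single aggregate, using countDistinct from org.apache.spark.sql.functions:

```scala
import org.apache.spark.sql.functions.countDistinct

// count(distinct key1) as one aggregate, without materializing the distinct rows first.
// For this sample dataset the single returned value is 3.
df.agg(countDistinct("key1")).show()
```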

In the output above, if you are as obsessive as I am, you will have noticed that key1 is not displayed in sorted order, which is hard to tolerate. Let's fix it:

```scala
/* Equivalent SQL: select key1, count(*) from table group by key1 order by key1 */
scala> df.groupBy("key1").count.sort("key1").show
+----+-----+
|key1|count|
+----+-----+
| aaa|    1|
| bbb|    2|
| ccc|    1|
+----+-----+

/* Equivalent SQL: select key1, count(*) from table group by key1 order by key1 desc */
scala> df.groupBy("key1").count.sort($"key1".desc).show
+----+-----+
|key1|count|
+----+-----+
| ccc|    1|
| bbb|    2|
| aaa|    1|
+----+-----+
```

Note that the first query sorts in ascending order and the second in descending order, matching "select key1, count(*) from table group by key1 order by key1 desc". When desc is specified, key1 carries the $ prefix; as explained earlier, this is syntactic sugar for col(column_name). The $ prefix will not be explained again from here on.

Let's refine this a bit further. So far the result has been ordered by the group key. What if we want to sort by the group count in descending order? From the earlier show output we can see that the count column is simply named count, so the following comes naturally:

```scala
/* Equivalent SQL: select key1, count(*) from table group by key1 order by count(*) desc */
scala> df.groupBy("key1").count.sort($"count".desc).show
+----+-----+
|key1|count|
+----+-----+
| bbb|    2|
| ccc|    1|
| aaa|    1|
+----+-----+
```

Alternatively, use withColumnRenamed to rename the column first:

```scala
/* Equivalent SQL: select key1, count(*) as cnt from table group by key1 order by cnt desc */
scala> df.groupBy("key1").count.withColumnRenamed("count", "cnt").sort($"cnt".desc).show
+----+---+
|key1|cnt|
+----+---+
| bbb|  2|
| aaa|  1|
| ccc|  1|
+----+---+
```

The more common approach is to give count(*) an alias directly:

```scala
/* Equivalent SQL: select key1, count(*) as cnt from table group by key1 */
scala> df.groupBy("key1").agg(count("key1").as("cnt")).show
+----+---+
|key1|cnt|
+----+---+
| ccc|  1|
| aaa|  1|
| bbb|  2|
+----+---+
```
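If you also want the descending order on the aliased count, one option (just a sketch: orderBy is an alias of sort, and desc comes from org.apache.spark.sql.functions) is:

```scala
import org.apache.spark.sql.functions.{count, desc}

// Alias the aggregate and order by it in one chain; orderBy is an alias of sort.
df.groupBy("key1")
  .agg(count("key1").as("cnt"))
  .orderBy(desc("cnt"))
  .show()
```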

Here we have introduced the aggregation function agg. It is normally used together with groupBy and is quite flexible. A few examples follow; note the difference between the Column-typed and String-typed parameters:

```scala
def agg(expr: Column, exprs: Column*): DataFrame
def agg(exprs: Map[String, String]): DataFrame
def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame

/* Equivalent SQL: select key1, count(key1), max(key2), avg(key3) from table group by key1 */
scala> df.groupBy("key1").agg(count("key1"), max("key2"), avg("key3")).show
+----+-----------+---------+---------+
|key1|count(key1)|max(key2)|avg(key3)|
+----+-----------+---------+---------+
| ccc|          1|        3|      5.0|
| aaa|          1|        1|      2.0|
| bbb|          2|        4|      5.0|
+----+-----------+---------+---------+

scala> df.groupBy("key1").agg("key1"->"count", "key2"->"max", "key3"->"avg").show
+----+-----------+---------+---------+
|key1|count(key1)|max(key2)|avg(key3)|
+----+-----------+---------+---------+
| ccc|          1|        3|      5.0|
| aaa|          1|        1|      2.0|
| bbb|          2|        4|      5.0|
+----+-----------+---------+---------+

scala> df.groupBy("key1").agg(Map(("key1","count"), ("key2","max"), ("key3","avg"))).show
+----+-----------+---------+---------+
|key1|count(key1)|max(key2)|avg(key3)|
+----+-----------+---------+---------+
| ccc|          1|        3|      5.0|
| aaa|          1|        1|      2.0|
| bbb|          2|        4|      5.0|
+----+-----------+---------+---------+

scala> df.groupBy("key1").agg(("key1","count"), ("key2","max"), ("key3","avg")).show
+----+-----------+---------+---------+
|key1|count(key1)|max(key2)|avg(key3)|
+----+-----------+---------+---------+
| ccc|          1|        3|      5.0|
| aaa|          1|        1|      2.0|
| bbb|          2|        4|      5.0|
+----+-----------+---------+---------+

/* Equivalent SQL:
   select key1, count(key1) cnt, max(key2) max_key2, avg(key3) avg_key3
   from table group by key1 order by cnt, max_key2 desc */
scala> df.groupBy("key1").agg(count("key1").as("cnt"), max("key2").as("max_key2"), avg("key3").as("avg_key3")).sort($"cnt", $"max_key2".desc).show
+----+---+--------+--------+
|key1|cnt|max_key2|avg_key3|
+----+---+--------+--------+
| ccc|  1|       3|     5.0|
| aaa|  1|       1|     2.0|
| bbb|  2|       4|     5.0|
+----+---+--------+--------+
```

Other single-table operations

There are a few miscellaneous operations not covered yet, such as adding columns, dropping columns, and handling NA values. Let's go through them briefly here.

The dataset is still the same one as before:

```scala
scala> val df = spark.createDataset(Seq(("aaa",1,2),("bbb",3,4),("ccc",3,5),("bbb",4,6))).toDF("key1","key2","key3")
df: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)

scala> df.show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+
```

Now let's add a column. It can be of string or integer type, a constant, or a transformation of an existing column:

```scala
/* Add a string column key4, initialized to "new_str_col" for every row; note the lit() function.
   Someone asked about lit(), so a short note: lit() is a built-in Spark function from
   org.apache.spark.sql.functions.

   def lit(literal: Any): Column
   Creates a Column of literal value. The passed in object is returned directly if it is
   already a Column. If the object is a Scala Symbol, it is converted into a Column also.
   Otherwise, a new Column is created to represent the literal value.
   Since 1.3.0 */
scala> val df_1 = df.withColumn("key4", lit("new_str_col"))
df_1: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 2 more fields]

scala> df_1.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)
 |-- key4: string (nullable = false)

scala> df_1.show
+----+----+----+-----------+
|key1|key2|key3|       key4|
+----+----+----+-----------+
| aaa|   1|   2|new_str_col|
| bbb|   3|   4|new_str_col|
| ccc|   3|   5|new_str_col|
| bbb|   4|   6|new_str_col|
+----+----+----+-----------+

/* Likewise, add an Int column key5, initialized to 1024 */
scala> val df_2 = df_1.withColumn("key5", lit(1024))
df_2: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 3 more fields]

scala> df_2.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)
 |-- key4: string (nullable = false)
 |-- key5: integer (nullable = false)

scala> df_2.show
+----+----+----+-----------+----+
|key1|key2|key3|       key4|key5|
+----+----+----+-----------+----+
| aaa|   1|   2|new_str_col|1024|
| bbb|   3|   4|new_str_col|1024|
| ccc|   3|   5|new_str_col|1024|
| bbb|   4|   6|new_str_col|1024|
+----+----+----+-----------+----+

/* Now a non-constant column: key6 = key5 * 2 */
scala> val df_3 = df_2.withColumn("key6", $"key5" * 2)
df_3: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df_3.show
+----+----+----+-----------+----+----+
|key1|key2|key3|       key4|key5|key6|
+----+----+----+-----------+----+----+
| aaa|   1|   2|new_str_col|1024|2048|
| bbb|   3|   4|new_str_col|1024|2048|
| ccc|   3|   5|new_str_col|1024|2048|
| bbb|   4|   6|new_str_col|1024|2048|
+----+----+----+-----------+----+----+

/* This time using the expr() function */
scala> val df_4 = df_2.withColumn("key6", expr("key5 * 4"))
df_4: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df_4.show
+----+----+----+-----------+----+----+
|key1|key2|key3|       key4|key5|key6|
+----+----+----+-----------+----+----+
| aaa|   1|   2|new_str_col|1024|4096|
| bbb|   3|   4|new_str_col|1024|4096|
| ccc|   3|   5|new_str_col|1024|4096|
| bbb|   4|   6|new_str_col|1024|4096|
+----+----+----+-----------+----+----+
```
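The derived column does not have to be a constant or simple arithmetic; any Column expression works. A small sketch using when/otherwise from org.apache.spark.sql.functions (the column name key7 and the labels are just illustrative):

```scala
import org.apache.spark.sql.functions.when

// Derive a flag column from an existing one; key7 is an illustrative name.
val df_flagged = df_2.withColumn("key7", when($"key2" > 2, "big").otherwise("small"))
df_flagged.show()
```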

Dropping a column is simpler: just give the column name.

```scala
/* Drop column key5 */
scala> val df_5 = df_4.drop("key5")
df_5: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 3 more fields]

scala> df_4.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)
 |-- key4: string (nullable = false)
 |-- key5: integer (nullable = false)
 |-- key6: integer (nullable = false)

scala> df_5.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)
 |-- key4: string (nullable = false)
 |-- key6: integer (nullable = false)

scala> df_5.show
+----+----+----+-----------+----+
|key1|key2|key3|       key4|key6|
+----+----+----+-----------+----+
| aaa|   1|   2|new_str_col|4096|
| bbb|   3|   4|new_str_col|4096|
| ccc|   3|   5|new_str_col|4096|
| bbb|   4|   6|new_str_col|4096|
+----+----+----+-----------+----+

/* Multiple columns (key4 and key6) can be dropped at once */
scala> val df_6 = df_5.drop("key4", "key6")
df_6: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

/* The columns method returns all column names as an array */
scala> df_6.columns
res23: Array[String] = Array(key1, key2, key3)

scala> df_6.show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+
```

Finally, a few ways of handling null and other invalid values. This needs a different dataset; the table with nulls is loaded from a CSV file:

```scala
/* Contents of the CSV file:
   key1,key2,key3,key4,key5
   aaa,1,2,t1,4
   bbb,5,3,t2,8
   ccc,2,2,,7
   ,7,3,t1,
   bbb,1,5,t3,0
   ,4,,t1,8
*/
scala> val df = spark.read.option("header","true").csv("natest.csv")
df: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2|null|   7|
|null|   7|   3|  t1|null|
| bbb|   1|   5|  t3|   0|
|null|   4|null|  t1|   8|
+----+----+----+----+----+

/* Replace every null in column key1 with 'xxx' */
scala> val df_2 = df.na.fill("xxx", Seq("key1"))
df_2: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df_2.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2|null|   7|
| xxx|   7|   3|  t1|null|
| bbb|   1|   5|  t3|   0|
| xxx|   4|null|  t1|   8|
+----+----+----+----+----+

/* Fill multiple columns of the same type at once:
   here every null in key3 and key5 becomes 1024.
   CSV import defaults to string; for integer columns the call looks the same,
   there are overloads for each type. */
scala> val df_3 = df.na.fill("1024", Seq("key3","key5"))
df_3: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df_3.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2|null|   7|
|null|   7|   3|  t1|1024|
| bbb|   1|   5|  t3|   0|
|null|   4|1024|  t1|   8|
+----+----+----+----+----+

/* Fill multiple columns of different types at once. */
scala> val df_3 = df.na.fill(Map(("key1"->"yyy"),("key3","1024"),("key4","t88"),("key5","4096")))
df_3: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df_3.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2| t88|   7|
| yyy|   7|   3|  t1|4096|
| bbb|   1|   5|  t3|   0|
| yyy|   4|1024|  t1|   8|
+----+----+----+----+----+

/* Don't modify anything, just drop the rows containing nulls.
   Here, rows where key3 or key5 is null are dropped. */
scala> val df_4 = df.na.drop(Seq("key3","key5"))
df_4: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df_4.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2|null|   7|
| bbb|   1|   5|  t3|   0|
+----+----+----+----+----+

/* Drop rows that have fewer than n non-null values among the specified columns.
   Here, rows where fewer than 2 of key1, key2, key3 are non-null are dropped;
   in the last row, 2 of those 3 columns are null, so it is removed. */
scala> val df_5 = df.na.drop(2, Seq("key1","key2","key3"))
df_5: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2|null|   7|
|null|   7|   3|  t1|null|
| bbb|   1|   5|  t3|   0|
|null|   4|null|  t1|   8|
+----+----+----+----+----+

scala> df_5.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2|null|   7|
|null|   7|   3|  t1|null|
| bbb|   1|   5|  t3|   0|
+----+----+----+----+----+

/* Same as above; if no column list is given, all columns are considered */
scala> val df_6 = df.na.drop(4)
df_6: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df_6.show
+----+----+----+----+----+
|key1|key2|key3|key4|key5|
+----+----+----+----+----+
| aaa|   1|   2|  t1|   4|
| bbb|   5|   3|  t2|   8|
| ccc|   2|   2|null|   7|
| bbb|   1|   5|  t3|   0|
+----+----+----+----+----+
```
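Closely related to na.fill is na.replace, which swaps specific non-null values rather than filling nulls; just a sketch here, with illustrative column and replacement values based on the CSV dataset above:

```scala
// Replace the value "t1" in column key4 with "type1"; nulls are left untouched.
val df_7 = df.na.replace("key4", Map("t1" -> "type1"))
df_7.show()
```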

Multi-table operations: join


First, the definitions of the two source tables:

```scala
scala> val df1 = spark.createDataset(Seq(("aaa", 1, 2), ("bbb", 3, 4), ("ccc", 3, 5), ("bbb", 4, 6))).toDF("key1","key2","key3")
df1: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> val df2 = spark.createDataset(Seq(("aaa", 2, 2), ("bbb", 3, 5), ("ddd", 3, 5), ("bbb", 4, 6), ("eee", 1, 2), ("aaa", 1, 5), ("fff", 5, 6))).toDF("key1","key2","key4")
df2: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df1.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)

scala> df2.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key4: integer (nullable = false)

scala> df1.show()
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+

scala> df2.show()
+----+----+----+
|key1|key2|key4|
+----+----+----+
| aaa|   2|   2|
| bbb|   3|   5|
| ddd|   3|   5|
| bbb|   4|   6|
| eee|   1|   2|
| aaa|   1|   5|
| fff|   5|   6|
+----+----+----+
```

Spark's join support is rich: equi-joins, conditional joins, and natural-style joins are all available. Supported join types include inner, outer, left outer, right outer, left semi, and cartesian joins.
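For reference, a sketch of the join signature that most of the examples below use, together with the joinType strings they exercise (Spark accepts a few additional aliases as well):

```scala
// def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
//
// joinType values used below: "inner" (the default), "outer",
// "left_outer", "right_outer", "leftsemi"; the cartesian product
// has its own method, crossJoin.
val inner = df1.join(df2, df1("key1") === df2("key1"), "inner")
```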

Let's walk through them one by one, starting with the inner join:

```scala
/* Inner join:
   select * from df1 join df2 on df1.key1 = df2.key1 */
scala> val df3 = df1.join(df2, "key1")
df3: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 3 more fields]

scala> df3.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)
 |-- key2: integer (nullable = false)
 |-- key4: integer (nullable = false)

scala> df3.show
+----+----+----+----+----+
|key1|key2|key3|key2|key4|
+----+----+----+----+----+
| aaa|   1|   2|   1|   5|
| aaa|   1|   2|   2|   2|
| bbb|   3|   4|   4|   6|
| bbb|   3|   4|   3|   5|
| bbb|   4|   6|   4|   6|
| bbb|   4|   6|   3|   5|
+----+----+----+----+----+

/* Still an inner join, this time with joinWith. The difference from join is that the
   resulting Dataset has a different schema; compare it with the one above. */
scala> val df4 = df1.joinWith(df2, df1("key1") === df2("key1"))
df4: org.apache.spark.sql.Dataset[(org.apache.spark.sql.Row, org.apache.spark.sql.Row)] = [_1: struct<key1: string, key2: int ... 1 more field>, _2: struct<key1: string, key2: int ... 1 more field>]

scala> df4.printSchema
root
 |-- _1: struct (nullable = false)
 |    |-- key1: string (nullable = true)
 |    |-- key2: integer (nullable = false)
 |    |-- key3: integer (nullable = false)
 |-- _2: struct (nullable = false)
 |    |-- key1: string (nullable = true)
 |    |-- key2: integer (nullable = false)
 |    |-- key4: integer (nullable = false)

scala> df4.show
+---------+---------+
|       _1|       _2|
+---------+---------+
|[aaa,1,2]|[aaa,1,5]|
|[aaa,1,2]|[aaa,2,2]|
|[bbb,3,4]|[bbb,4,6]|
|[bbb,3,4]|[bbb,3,5]|
|[bbb,4,6]|[bbb,4,6]|
|[bbb,4,6]|[bbb,3,5]|
+---------+---------+
```

    然后是外連接:

```scala
/* select * from df1 outer join df2 on df1.key1 = df2.key1 */
scala> val df5 = df1.join(df2, df1("key1") === df2("key1"), "outer")
df5: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df5.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
|null|null|null| ddd|   3|   5|
| ccc|   3|   5|null|null|null|
| aaa|   1|   2| aaa|   2|   2|
| aaa|   1|   2| aaa|   1|   5|
| bbb|   3|   4| bbb|   3|   5|
| bbb|   3|   4| bbb|   4|   6|
| bbb|   4|   6| bbb|   3|   5|
| bbb|   4|   6| bbb|   4|   6|
|null|null|null| fff|   5|   6|
|null|null|null| eee|   1|   2|
+----+----+----+----+----+----+
```

Next, the left outer join, right outer join, and left semi join:

```scala
/* Left outer join */
scala> val df6 = df1.join(df2, df1("key1") === df2("key1"), "left_outer")
df6: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df6.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| aaa|   1|   2| aaa|   1|   5|
| aaa|   1|   2| aaa|   2|   2|
| bbb|   3|   4| bbb|   4|   6|
| bbb|   3|   4| bbb|   3|   5|
| ccc|   3|   5|null|null|null|
| bbb|   4|   6| bbb|   4|   6|
| bbb|   4|   6| bbb|   3|   5|
+----+----+----+----+----+----+

/* Right outer join */
scala> val df7 = df1.join(df2, df1("key1") === df2("key1"), "right_outer")
df7: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df7.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| aaa|   1|   2| aaa|   2|   2|
| bbb|   4|   6| bbb|   3|   5|
| bbb|   3|   4| bbb|   3|   5|
|null|null|null| ddd|   3|   5|
| bbb|   4|   6| bbb|   4|   6|
| bbb|   3|   4| bbb|   4|   6|
|null|null|null| eee|   1|   2|
| aaa|   1|   2| aaa|   1|   5|
|null|null|null| fff|   5|   6|
+----+----+----+----+----+----+

/* Left semi join */
scala> val df8 = df1.join(df2, df1("key1") === df2("key1"), "leftsemi")
df8: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df8.show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| bbb|   4|   6|
+----+----+----+
```

Cartesian joins are rarely used; the tables handled with Spark these days tend to be huge, and the cost of a full cross product is prohibitive.

```scala
/* Cartesian (cross) join */
scala> val df9 = df1.crossJoin(df2)
df9: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df9.count
res17: Long = 28

/* Only show the first 10 rows */
scala> df9.show(10)
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| aaa|   1|   2| aaa|   2|   2|
| aaa|   1|   2| bbb|   3|   5|
| aaa|   1|   2| ddd|   3|   5|
| aaa|   1|   2| bbb|   4|   6|
| aaa|   1|   2| eee|   1|   2|
| aaa|   1|   2| aaa|   1|   5|
| aaa|   1|   2| fff|   5|   6|
| bbb|   3|   4| aaa|   2|   2|
| bbb|   3|   4| bbb|   3|   5|
| bbb|   3|   4| ddd|   3|   5|
+----+----+----+----+----+----+
only showing top 10 rows
```

The next example is also an equi-join; the difference from the earlier ones is that the duplicated join columns of the two tables are removed from the result, much like a natural join:

```scala
/* Equi-join on the two common columns key1 and key2 */
scala> val df10 = df1.join(df2, Seq("key1","key2"))
df10: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 2 more fields]

scala> df10.show
+----+----+----+----+
|key1|key2|key3|key4|
+----+----+----+----+
| aaa|   1|   2|   5|
| bbb|   3|   4|   5|
| bbb|   4|   6|   6|
+----+----+----+----+
```

Conditional (non-equi) joins apparently were not supported in older Spark versions, but they work fine now:

```scala
/* select df1.*, df2.* from df1 join df2 on df1.key1 = df2.key1 and df1.key2 > df2.key2 */
scala> val df11 = df1.join(df2, df1("key1") === df2("key1") && df1("key2") > df2("key2"))
df11: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df11.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| bbb|   4|   6| bbb|   3|   5|
+----+----+----+----+----+----+
```


This article is from 野男孩's CSDN blog; the full original is available at: https://blog.csdn.net/coding_hello/article/details/74853504?utm_source=copy
