日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

再谈RDD、DataFrame、DataSet关系以及相互转换(JAVA API)

發布時間:2024/1/17 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 再谈RDD、DataFrame、DataSet关系以及相互转换(JAVA API) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spark提供了三種主要的與數據相關的API:

  • RDD
  • DataFrame
  • DataSet

三者圖示

?

下面詳細介紹下各自的特點:

RDD

主要描述:RDD是Spark提供的最主要的一個抽象概念(Resilient Distributed Dataset),它是一個element的collection,分區化的位于集群的節點中,支持并行處理。

  • RDD的特性

    • 分布式:
      RDD使用MapReduce算子來廣泛的適應在集群中并行分布式的大數據集的處理和產生。并且方便用戶使用高級別的算子在并行計算中。
    • 不可變:
      RDD是由一個records的collection組成,而且是分區的。分區是RDD并行化的基礎單元,而且每個分區就是對數據的邏輯分割,它是不可變的,它是通過已經存在的分區的某些transformations創建得到。這種不可變性方便在計算中做到數據一致性。
    • 錯誤容忍:
      在實際中如果我們丟失了RDD的部分分區,可以通過對丟失分區關聯性的transformation重新計算得到。而不是在眾多節點中做數據的復制等操作。這個特性是RDD的最大優點,它節省了大量的數據管理、復制等操作,使得計算速度更快。
    • 惰性執行:
      所有的transformation都是惰性的,他們并不是立刻計算出結果,而是只是記住了各個transformation對數據集的依賴關系。當driver程序需要一個action結果時才開始執行。
    • 功能支持:
      RDD支持兩種類型的算子:transformation是指從已經存在的數據集中計算得到新的數據集;action是指通過對通過對數據集的計算得到一個結果返回給driver。
    • 數據格式:
      輕松且有效支持各種數據,包括結構化的和非結構化的。
    • 編程語言:
      RDD的API支持Scala、Java、Python和R
  • RDD的限制

    • 沒有內置的優化引擎
      當對結構化的數據進行處理時,RDD沒有使用Spark的高級優化器,比如catalyst優化器和Tungsten執行引擎。
    • 處理結構化的數據
      不像Dataframe或者Dataset,RDD不會主動推測出數據的schema,而是需要用戶在代碼里指示。

DataFrame

Spark從1.3版本開始引入Dataframe,它克服了RDD的最主要的挑戰。

主要描述:Dataframe是一個分布式的數據collection,而且將數據按照列名進行組織。在概念上它與關系型的數據庫的表或者R/Python語言中的DataFrame類似。與之一起提供的還有,Spark引入了catalyst優化器,它可以優化查詢。

  • DataFrame的特性

    • 分布式的Row對象的Collection:
      分布式、列名組織的數據、后臺優化。
      具體到代碼里面,Dataframe就是Dataset<Row>
    • 數據處理:
      處理支持結構或者非結構化的格式(比如Avro, CSV, elastic search, 以及Cassandra)以及不同的文件系統(HDFS, HIVE tables, MySQL, etc)。它支持非常多的數據源
    • 使用catalyst優化器優化:
      它對SQL查詢以及DataFrame API都提供優化支持。Dataframe使用catalyst樹transformation框架有四個步驟:
      1、Analyzing a logical plan to resolve references
      2、Logical plan optimization
      3、Physical planning
      4、Code generation to compile parts of the query to Java bytecode.
    • Hive兼容性:
      使用Spark的SQL可以無修改的支持Hive查詢在已經存在的Hive warehouses。它重用了Hive的前端、MetaStore并且對已經存在的Hive數據、查詢和UDF提供完整的兼容性。
    • Tungsten:
      Tungsten提供了一個物理執行后端,管理內存動態產生expression evaluation的字節碼
    • 編程語言:
      Dataframe API支持Scala、Java、Python和R
  • DataFrame的限制

    • 沒有編譯階段的類型檢查:
      不能在編譯時刻對安全性做出檢查,而且限制了用戶對于未知結構的數據進行操作。比如下面代碼在編譯時沒有錯誤,但是在執行時會出現異常:
    case class Person(name : String , age : Int) val dataframe = sqlContect.read.json("people.json") dataframe.filter("salary > 10000").show => throws Exception : cannot resolve 'salary' given input age , name
    • 不能保留類對象的結構:
      一旦把一個類結構的對象轉成了Dataframe,就不能轉回去了。下面這個栗子就是指出了:
    case class Person(name : String , age : Int) val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20))) val personDF = sqlContect.createDataframe(personRDD) personDF.rdd // returns RDD[Row] , does not returns RDD[Person]

DataSet

主要描述:Dataset API是對DataFrame的一個擴展,使得可以支持類型安全的檢查,并且對類結構的對象支持程序接口。它是強類型的,不可變collection,并映射成一個相關的schema。
Dataset API的核心是一個被稱為Encoder的概念。它是負責對JVM的對象以及表格化的表達(tabular representation)之間的相互轉化。
表格化的表達在存儲時使用了Spark內置的Tungsten二進制形式,允許對序列化數據操作并改進了內存使用。在Spark 1.6版本之后,支持自動化生成Encoder,可以對廣泛的primitive類型(比如String,Integer,Long等)、Scala的case class以及Java Bean自動生成對應的Encoder。

  • DataSet的特性

    • 支持RDD和Dataframe的優點:
      包括RDD的類型安全檢查,Dataframe的關系型模型,查詢優化,Tungsten執行,排序和shuffling。
    • Encoder:
      通過使用Encoder,用戶可以輕松轉換JVM對象到一個Dataset,允許用戶在結構化和非結構化的數據操作。
    • 編程語言:
      Scala和Java
    • 類型安全檢查:
      提供編譯階段的安全類型檢查。比如下面這個栗子:
    case class Person(name : String , age : Int) val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20))) val personDF = sqlContect.createDataframe(personRDD) val ds:Dataset[Person] = personDF.as[Person] ds.filter(p => p.age > 25) ds.filter(p => p.salary > 25)// error : value salary is not a member of person ds.rdd // returns RDD[Person]
    • 相互轉換:
      Dataset可以讓用戶輕松從RDD和Dataframe轉換到Dataset不需要額外太多代碼。
  • DataSet的限制

    • 需要把類型轉成String:
      Querying the data from datasets currently requires us to specify the fields in the class as a string. Once we have queried the data, we are forced to cast column to the required data type. On the other hand, if we use map operation on Datasets, it will not use Catalyst optimizer.
      比如:
    ds.select(col("name").as[String], $"age".as[Int]).collect()

Java API中三種數據格式的相互轉換

首先構造一個數據集,是由Person類的結構組成的,然后在此之上看這三個API實例的構造以及相互轉換

  • 數據創建

Person類的定義

數據創建

  • 直接構建出 JavaRDD<Person>

    JavaRDD<Person> personJavaRDD = jsc.parallelize(personList); System.out.println("1. 直接構建出 JavaRDD<Person>"); personJavaRDD.foreach(element -> System.out.println(element.toString()));

    Print結果:

    直接構建出 JavaRDD<Person>
    Person: name = Andy, age = 32
    Person: name = Michael, age = 23
    Person: name = Justin, age = 19

  • 直接構建出 Dataset<Person>

    Encoder<Person> personEncoder = Encoders.bean(Person.class);Dataset<Person> personDS = spark.createDataset(personList, personEncoder);System.out.println("2. 直接構建出 Dataset<Person>");personDS.show();personDS.printSchema();

    Print結果:

  • 直接構建出 Dataset<Person>
    +---+-------+
    |age| name|
    +---+-------+
    | 32| Andy|
    | 23|Michael|
    | 19| Justin|
    +---+-------+
    root
    |-- age: integer (nullable = false)
    |-- name: string (nullable = true)
  • 直接構建出 Dataset<Row>

    Dataset<Row> personDF = spark.createDataFrame(personList, Person.class);System.out.println("3. 直接構建出 Dataset<Row>");personDF.show();personDF.printSchema();

    Print結果:

  • 直接構建出 Dataset<Row>
    +---+-------+
    |age| name|
    +---+-------+
    | 32| Andy|
    | 23|Michael|
    | 19| Justin|
    +---+-------+
    root
    |-- age: integer (nullable = false)
    |-- name: string (nullable = true)
  • JavaRDD<Person> -> Dataset<Person>

    personDS = spark.createDataset(personJavaRDD.rdd(), personEncoder);System.out.println("1->2 JavaRDD<Person> -> Dataset<Person>");personDS.show();personDS.printSchema();

    Print結果:

    1->2 JavaRDD<Person> -> Dataset<Person>
    +---+-------+
    |age| name|
    +---+-------+
    | 32| Andy|
    | 23|Michael|
    | 19| Justin|
    +---+-------+
    root
    |-- age: integer (nullable = true)
    |-- name: string (nullable = true)

  • JavaRDD<Person> -> Dataset<Row>

    personDF = spark.createDataFrame(personJavaRDD, Person.class);System.out.println("1->3 JavaRDD<Person> -> Dataset<Row>");personDF.show();personDF.printSchema();

    Print結果:

    1->3 JavaRDD<Person> -> Dataset<Row>
    +---+-------+
    |age| name|
    +---+-------+
    | 32| Andy|
    | 23|Michael|
    | 19| Justin|
    +---+-------+
    root
    |-- age: integer (nullable = false)
    |-- name: string (nullable = true)

  • 補充從JavaRDD<Row>到Dataset<Row>

    JavaRDD<Row> personRowRdd = personJavaRDD.map(person -> RowFactory.create(person.age, person.name));List<StructField> fieldList = new ArrayList<>();fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, false));StructType rowAgeNameSchema = DataTypes.createStructType(fieldList);personDF = spark.createDataFrame(personRowRdd, rowAgeNameSchema);System.out.println("\n\n\n補充,由JavaRDD<Row> -> Dataset<Row>");personDF.show();personDF.printSchema();

    主要就是使用RowFactory把Row中的每一項寫好后,通過spark的createDataFrame來創建。其中對于Row的解讀包含在了自建的StructType中。

  • Dataset<Person> -> JavaRDD<Person>

    personJavaRDD = personDS.toJavaRDD();System.out.println("2->1 Dataset<Person> -> JavaRDD<Person>");personJavaRDD.foreach(element -> System.out.println(element.toString()));

    Print結果:

    2->1 Dataset<Person> -> JavaRDD<Person>
    Person: name = Justin, age = 19
    Person: name = Andy, age = 32
    Person: name = Michael, age = 23

  • Dataset<Row> -> JavaRDD<Person>

    personJavaRDD = personDF.toJavaRDD().map(row -> {String name = row.getAs("name");int age = row.getAs("age");return new Person(name, age);});System.out.println("3->1 Dataset<Row> -> JavaRDD<Person>");personJavaRDD.foreach(element -> System.out.println(element.toString()));

    Print結果:

    3->1 Dataset<Row> -> JavaRDD<Person>
    Person: name = Justin, age = 19
    Person: name = Michael, age = 23
    Person: name = Andy, age = 32

  • Dataset<Person> -> Dataset<Row>

    List<StructField> fieldList = new ArrayList<>();fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, false));fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));StructType rowSchema = DataTypes.createStructType(fieldList);ExpressionEncoder<Row> rowEncoder = RowEncoder.apply(rowSchema);Dataset<Row> personDF_fromDS = personDS.map((MapFunction<Person, Row>) person -> {List<Object> objectList = new ArrayList<>();objectList.add(person.name);objectList.add(person.age);return RowFactory.create(objectList.toArray());},rowEncoder);System.out.println("2->3 Dataset<Person> -> Dataset<Row>");personDF_fromDS.show();personDF_fromDS.printSchema();

    Print結果:

    2->3 Dataset<Person> -> Dataset<Row>
    +---+-------+
    |age| name|
    +---+-------+
    | 32| Andy|
    | 23|Michael
    | 19| Justin|
    +---+-------+
    root
    |-- age: integer (nullable = false)
    |-- name: string (nullable = true)

  • Dataset<Row> -> Dataset<Person>

    personDS = personDF.map(new MapFunction<Row, Person>() {@Overridepublic Person call(Row value) throws Exception {return new Person(value.getAs("name"), value.getAs("age"));}}, personEncoder);System.out.println("3->2 Dataset<Row> -> Dataset<Person>");personDS.show();personDS.printSchema();

    Print結果:

    3->2 Dataset<Row> -> Dataset<Person>
    +---+-------+
    |age| name|
    +---+-------+
    | 32| Andy|
    | 23|Michael|
    | 19| Justin|
    +---+-------+
    root
    |-- age: integer (nullable = true)
    |-- name: string (nullable = true)

總結:
其實RDD的Map和Dataset的Map只有一點不同,就是Dataset的Map要指定一個Encoder的參數。

需要用Encoder類給出



作者:shohokuooo
鏈接:https://www.jianshu.com/p/71003b152a84
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權并注明出處。

總結

以上是生活随笔為你收集整理的再谈RDD、DataFrame、DataSet关系以及相互转换(JAVA API)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。