日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

SparkSql官方文档中文翻译(java版本)

發布時間:2024/4/14 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SparkSql官方文档中文翻译(java版本) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
  • 1 概述(Overview)
  • 2 DataFrames
    • 2.1 入口:SQLContext(Starting Point: SQLContext)
    • 2.2 創建DataFrames(Creating DataFrames)
    • 2.3 DataFrame操作(DataFrame Operations)
    • 2.4 運行SQL查詢程序(Running SQL Queries Programmatically)
    • 2.5 DataFrames與RDDs的相互轉換(Interoperating with RDDs)
      • 2.5.1 使用反射獲取Schema(Inferring the Schema Using Reflection)
    • 2.5.2 通過編程接口指定Schema(Programmatically Specifying the Schema)
  • 3 數據源(Data Source)
    • 3.1 一般Load/Save方法
      • 3.1.1 手動指定選項(Manually Specifying Options)
      • 3.1.2 存儲模式(Save Modes)
      • 3.1.3 持久化到表(Saving to Persistent Tables)
    • 3.2 Parquet文件
      • 3.2.1 讀取Parquet文件(Loading Data Programmatically)
      • 3.2.2 解析分區信息(Partition Discovery)
      • 3.2.3 Schema合并(Schema Merging)
      • 3.2.4 Hive metastore Parquet表轉換(Hive metastore Parquet table conversion)
        • 3.2.4.1 Hive/Parquet Schema反射(Hive/Parquet Schema Reconciliation)
        • 3.2.4.2 元數據刷新(Metadata Refreshing)
      • 3.2.5 配置(Configuration)
    • 3.3 JSON數據集
    • 3.4 Hive表
      • 3.4.1 訪問不同版本的Hive Metastore(Interacting with Different Versions of Hive Metastore)
    • 3.5 JDBC To Other Databases
    • 3.6 故障排除(Troubleshooting)
  • 4 性能調優
    • 4.1 緩存數據至內存(Caching Data In Memory)
    • 4.2 調優參數(Other Configuration Options)
  • 5 分布式SQL引擎
    • 5.1 運行Thrift JDBC/ODBC服務
    • 5.2 運行Spark SQL CLI
  • 6 Migration Guide
    • 6.1 與Hive的兼容(Compatibility with Apache Hive
      • 6.1.1 在Hive warehouse中部署Spark SQL
      • 6.1.2 Spark SQL支持的Hive特性
      • 6.1.3 不支持的Hive功能
  • 7 Reference
    • 7.1 Data Types
    • 7.2 NaN 語義

?

1 概述(Overview)

Spark SQL是Spark的一個組件,用于結構化數據的計算。Spark SQL提供了一個稱為DataFrames的編程抽象,DataFrames可以充當分布式SQL查詢引擎。

?

2 DataFrames

DataFrame是一個分布式的數據集合,該數據集合以命名列的方式進行整合。DataFrame可以理解為關系數據庫中的一張表,也可以理解為R/Python中的一個data frame。DataFrames可以通過多種數據構造,例如:結構化的數據文件、hive中的表、外部數據庫、Spark計算過程中生成的RDD等。
DataFrame的API支持4種語言:Scala、Java、Python、R。

?

2.1 入口:SQLContext(Starting Point: SQLContext)

Spark SQL程序的主入口是SQLContext類或它的子類。創建一個基本的SQLContext,你只需要SparkContext,創建代碼示例如下:

  • Scala
val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  • Java
JavaSparkContext sc = ...; // An existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

除了基本的SQLContext,也可以創建HiveContext。SQLContext和HiveContext區別與聯系為:

  • SQLContext現在只支持SQL語法解析器(SQL-92語法)
  • HiveContext現在支持SQL語法解析器和HiveSQL語法解析器,默認為HiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器,來運行HiveSQL不支持的語法。
  • 使用HiveContext可以使用Hive的UDF,讀寫Hive表數據等Hive操作。SQLContext不可以對Hive進行操作。
  • Spark SQL未來的版本會不斷豐富SQLContext的功能,做到SQLContext和HiveContext的功能容和,最終可能兩者會統一成一個Context

HiveContext包裝了Hive的依賴包,把HiveContext單獨拿出來,可以在部署基本的Spark的時候就不需要Hive的依賴包,需要使用HiveContext時再把Hive的各種依賴包加進來。

SQL的解析器可以通過配置spark.sql.dialect參數進行配置。在SQLContext中只能使用Spark SQL提供的”sql“解析器。在HiveContext中默認解析器為”hiveql“,也支持”sql“解析器。

?

2.2 創建DataFrames(Creating DataFrames)

使用SQLContext,spark應用程序(Application)可以通過RDD、Hive表、JSON格式數據等數據源創建DataFrames。下面是基于JSON文件創建DataFrame的示例:

  • Scala
val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show()
  • Java
JavaSparkContext sc = ...; // An existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Displays the content of the DataFrame to stdout df.show();

?

2.3 DataFrame操作(DataFrame Operations)

DataFrames支持Scala、Java和Python的操作接口。下面是Scala和Java的幾個操作示例:

  • Scala
val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create the DataFrame val df = sqlContext.read.json("examples/src/main/resources/people.json") // Show the content of the DataFrame df.show() // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df("name"), df("age") + 1).show() // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df("age") > 21).show() // age name // 30 Andy // Count people by age df.groupBy("age").count().show() // age count // null 1 // 19 1 // 30 1
  • Java
JavaSparkContext sc // An existing SparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create the DataFrame DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Show the content of the DataFrame df.show(); // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show(); // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df.col("name"), df.col("age").plus(1)).show(); // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df.col("age").gt(21)).show(); // age name // 30 Andy // Count people by age df.groupBy("age").count().show(); // age count // null 1 // 19 1 // 30 1

詳細的DataFrame API請參考?API Documentation。

除了簡單列引用和表達式,DataFrames還有豐富的library,功能包括string操作、date操作、常見數學操作等。詳細內容請參考?DataFrame Function Reference。

?

2.4 運行SQL查詢程序(Running SQL Queries Programmatically)

Spark Application可以使用SQLContext的sql()方法執行SQL查詢操作,sql()方法返回的查詢結果為DataFrame格式。代碼如下:

  • Scala
val sqlContext = ... // An existing SQLContext val df = sqlContext.sql("SELECT * FROM table")
  • Java
SQLContext sqlContext = ... // An existing SQLContext DataFrame df = sqlContext.sql("SELECT * FROM table")

?

2.5 DataFrames與RDDs的相互轉換(Interoperating with RDDs)

Spark SQL支持兩種RDDs轉換為DataFrames的方式:

  • 使用反射獲取RDD內的Schema
    • 當已知類的Schema的時候,使用這種基于反射的方法會讓代碼更加簡潔而且效果也很好。
  • 通過編程接口指定Schema
    • 通過Spark SQL的接口創建RDD的Schema,這種方式會讓代碼比較冗長。
    • 這種方法的好處是,在運行時才知道數據的列以及列的類型的情況下,可以動態生成Schema

?

2.5.1 使用反射獲取Schema(Inferring the Schema Using Reflection)

Spark SQL支持將JavaBean的RDD自動轉換成DataFrame。通過反射獲取Bean的基本信息,依據Bean的信息定義Schema。當前Spark SQL版本(Spark 1.5.2)不支持嵌套的JavaBeans和復雜數據類型(如:List、Array)。創建一個實現Serializable接口包含所有屬性getters和setters的類來創建一個JavaBean。通過調用createDataFrame并提供JavaBean的Class object,指定一個Schema給一個RDD。示例如下:

public static class Person implements Serializable { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } } // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // Load a text file and convert each line to a JavaBean. JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map( new Function<String, Person>() { public Person call(String line) throws Exception { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].trim())); return person; } }); // Apply a schema to an RDD of JavaBeans and register it as a table. DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class); schemaPeople.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect();

?

2.5.2 通過編程接口指定Schema(Programmatically Specifying the Schema)

當JavaBean不能被預先定義的時候,編程創建DataFrame分為三步:

  • 從原來的RDD創建一個Row格式的RDD
  • 創建與RDD中Rows結構匹配的StructType,通過該StructType創建表示RDD的Schema
  • 通過SQLContext提供的createDataFrame方法創建DataFrame,方法參數為RDD的Schema

示例如下:

import org.apache.spark.api.java.function.Function; // Import factory methods provided by DataTypes. import org.apache.spark.sql.types.DataTypes; // Import StructType and StructField import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructField; // Import Row. import org.apache.spark.sql.Row; // Import RowFactory. import org.apache.spark.sql.RowFactory; // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // Load a text file and convert each line to a JavaBean. JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt"); // The schema is encoded in a string String schemaString = "name age"; // Generate the schema based on the string of schema List<StructField> fields = new ArrayList<StructField>(); for (String fieldName: schemaString.split(" ")) { fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true)); } StructType schema = DataTypes.createStructType(fields); // Convert records of the RDD (people) to Rows. JavaRDD<Row> rowRDD = people.map( new Function<String, Row>() { public Row call(String record) throws Exception { String[] fields = record.split(","); return RowFactory.create(fields[0], fields[1].trim()); } }); // Apply the schema to the RDD. DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema); // Register the DataFrame as a table. peopleDataFrame.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. DataFrame results = sqlContext.sql("SELECT name FROM people"); // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> names = results.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect();

?

3 數據源(Data Source)

Spark SQL的DataFrame接口支持多種數據源的操作。一個DataFrame可以進行RDDs方式的操作,也可以被注冊為臨時表。把DataFrame注冊為臨時表之后,就可以對該DataFrame執行SQL查詢。Data Sources這部分首先描述了對Spark的數據源執行加載和保存的常用方法,然后對內置數據源進行深入介紹。

?

3.1 一般Load/Save方法

Spark SQL的默認數據源為Parquet格式。數據源為Parquet文件時,Spark SQL可以方便的執行所有的操作。修改配置項spark.sql.sources.default,可修改默認數據源格式。讀取Parquet文件示例如下:

  • Scala
val df = sqlContext.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
  • Java
DataFrame df = sqlContext.read().load("examples/src/main/resources/users.parquet"); df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

?

3.1.1 手動指定選項(Manually Specifying Options)

當數據源格式不是parquet格式文件時,需要手動指定數據源的格式。數據源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果數據源格式為內置格式,則只需要指定簡稱(json,parquet,jdbc)。通過指定的數據源格式名,可以對DataFrames進行類型轉換操作。示例如下:

  • Scala
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json") df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
  • Java
DataFrame df = sqlContext.read().format("json").load("examples/src/main/resources/people.json"); df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

?

3.1.2 存儲模式(Save Modes)

可以采用SaveMode執行存儲操作,SaveMode定義了對數據的處理模式。需要注意的是,這些保存模式不使用任何鎖定,不是原子操作。此外,當使用Overwrite方式執行時,在輸出新數據之前原數據就已經被刪除。SaveMode詳細介紹如下表:

?

3.1.3 持久化到表(Saving to Persistent Tables)

當使用HiveContext時,可以通過saveAsTable方法將DataFrames存儲到表中。與registerTempTable方法不同的是,saveAsTable將DataFrame中的內容持久化到表中,并在HiveMetastore中存儲元數據。存儲一個DataFrame,可以使用SQLContext的table方法。table先創建一個表,方法參數為要創建的表的表名,然后將DataFrame持久化到這個表中。

默認的saveAsTable方法將創建一個“managed table”,表示數據的位置可以通過metastore獲得。當存儲數據的表被刪除時,managed table也將自動刪除。

?

3.2 Parquet文件

Parquet是一種支持多種數據處理系統的柱狀的數據格式,Parquet文件中保留了原始數據的模式。Spark SQL提供了Parquet文件的讀寫功能。

?

3.2.1 讀取Parquet文件(Loading Data Programmatically)

讀取Parquet文件示例如下:

  • Scala
// sqlContext from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ val people: RDD[Person] = ... // An RDD of case class objects, from the previous example. // The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet. people.write.parquet("people.parquet") // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a Parquet file is also a DataFrame. val parquetFile = sqlContext.read.parquet("people.parquet") //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerTempTable("parquetFile") val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
  • Java
// sqlContext from the previous example is used in this example.DataFrame schemaPeople = ... // The DataFrame from the previous example. // DataFrames can be saved as Parquet files, maintaining the schema information. schemaPeople.write().parquet("people.parquet"); // Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a parquet file is also a DataFrame. DataFrame parquetFile = sqlContext.read().parquet("people.parquet"); // Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerTempTable("parquetFile"); DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect();

?

3.2.2 解析分區信息(Partition Discovery)

對表進行分區是對數據進行優化的方式之一。在分區的表內,數據通過分區列將數據存儲在不同的目錄下。Parquet數據源現在能夠自動發現并解析分區信息。例如,對人口數據進行分區存儲,分區列為gender和country,使用下面的目錄結構:

path └── to└── table├── gender=male│ ├── ...│ ││ ├── country=US│ │ └── data.parquet│ ├── country=CN│ │ └── data.parquet│ └── ...└── gender=female├── ...│├── country=US│ └── data.parquet├── country=CN│ └── data.parquet└── ...

通過傳遞path/to/table給 SQLContext.read.parquet或SQLContext.read.load,Spark SQL將自動解析分區信息。返回的DataFrame的Schema如下:

root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true)

需要注意的是,數據的分區列的數據類型是自動解析的。當前,支持數值類型和字符串類型。自動解析分區類型的參數為:spark.sql.sources.partitionColumnTypeInference.enabled,默認值為true。如果想關閉該功能,直接將該參數設置為disabled。此時,分區列數據格式將被默認設置為string類型,不再進行類型解析。

?

3.2.3 Schema合并(Schema Merging)

像ProtocolBuffer、Avro和Thrift那樣,Parquet也支持Schema evolution(Schema演變)。用戶可以先定義一個簡單的Schema,然后逐漸的向Schema中增加列描述。通過這種方式,用戶可以獲取多個有不同Schema但相互兼容的Parquet文件。現在Parquet數據源能自動檢測這種情況,并合并這些文件的schemas。

因為Schema合并是一個高消耗的操作,在大多數情況下并不需要,所以Spark SQL從1.5.0開始默認關閉了該功能。可以通過下面兩種方式開啟該功能:

  • 當數據源為Parquet文件時,將數據源選項mergeSchema設置為true
  • 設置全局SQL選項spark.sql.parquet.mergeSchema為true

示例如下:

  • Scala
// sqlContext from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ // Create a simple DataFrame, stored into a partition directory val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") df1.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple") df2.write.parquet("data/test_table/key=2") // Read the partitioned table val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths. // root // |-- single: int (nullable = true) // |-- double: int (nullable = true) // |-- triple: int (nullable = true) // |-- key : int (nullable = true)

?

3.2.4 Hive metastore Parquet表轉換(Hive metastore Parquet table conversion)

當向Hive metastore中讀寫Parquet表時,Spark SQL將使用Spark SQL自帶的Parquet SerDe(SerDe:Serialize/Deserilize的簡稱,目的是用于序列化和反序列化),而不是用Hive的SerDe,Spark SQL自帶的SerDe擁有更好的性能。這個優化的配置參數為spark.sql.hive.convertMetastoreParquet,默認值為開啟。

?

3.2.4.1 Hive/Parquet Schema反射(Hive/Parquet Schema Reconciliation)

從表Schema處理的角度對比Hive和Parquet,有兩個區別:

  • Hive區分大小寫,Parquet不區分大小寫
  • hive允許所有的列為空,而Parquet不允許所有的列全為空

由于這兩個區別,當將Hive metastore Parquet表轉換為Spark SQL Parquet表時,需要將Hive metastore schema和Parquet schema進行一致化。一致化規則如下:

  • 這兩個schema中的同名字段必須具有相同的數據類型。一致化后的字段必須為Parquet的字段類型。這個規則同時也解決了空值的問題。
  • 一致化后的schema只包含Hive metastore中出現的字段。
    • 忽略只出現在Parquet schema中的字段
    • 只在Hive metastore schema中出現的字段設為nullable字段,并加到一致化后的schema中

?

3.2.4.2 元數據刷新(Metadata Refreshing)

Spark SQL緩存了Parquet元數據以達到良好的性能。當Hive metastore Parquet表轉換為enabled時,表修改后緩存的元數據并不能刷新。所以,當表被Hive或其它工具修改時,則必須手動刷新元數據,以保證元數據的一致性。示例如下:

  • Scala
// sqlContext is an existing HiveContext sqlContext.refreshTable("my_table")
  • Java
// sqlContext is an existing HiveContext sqlContext.refreshTable("my_table")

?

3.2.5 配置(Configuration)

配置Parquet可以使用SQLContext的setConf方法或使用SQL執行SET key=value命令。詳細參數說明如下:

?

3.3 JSON數據集

Spark SQL能自動解析JSON數據集的Schema,讀取JSON數據集為DataFrame格式。讀取JSON數據集方法為SQLContext.read().json()。該方法將String格式的RDD或JSON文件轉換為DataFrame。

需要注意的是,這里的JSON文件不是常規的JSON格式。JSON文件每一行必須包含一個獨立的、自滿足有效的JSON對象。如果用多行描述一個JSON對象,會導致讀取出錯。讀取JSON數據集示例如下:

  • Scala
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path) // The inferred schema can be visualized using the printSchema() method. people.printSchema() // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // Register this DataFrame as a table. people.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. val anotherPeopleRDD = sc.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
  • Java
// sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. DataFrame people = sqlContext.read().json("examples/src/main/resources/people.json"); // The inferred schema can be visualized using the printSchema() method. people.printSchema(); // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // Register this DataFrame as a table. people.registerTempTable("people"); // SQL statements can be run by using the sql methods provided by sqlContext. DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. List<String> jsonData = Arrays.asList( "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData); DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);

?

3.4 Hive表

Spark SQL支持對Hive的讀寫操作。需要注意的是,Hive所依賴的包,沒有包含在Spark assembly包中。增加Hive時,需要在Spark的build中添加 -Phive 和 -Phivethriftserver配置。這兩個配置將build一個新的assembly包,這個assembly包含了Hive的依賴包。注意,必須上這個心的assembly包到所有的worker節點上。因為worker節點在訪問Hive中數據時,會調用Hive的 serialization and deserialization libraries(SerDes),此時將用到Hive的依賴包。

Hive的配置文件為conf/目錄下的hive-site.xml文件。在YARN上執行查詢命令之前,lib_managed/jars目錄下的datanucleus包和conf/目錄下的hive-site.xml必須可以被driverhe和所有的executors所訪問。確保被訪問,最方便的方式就是在spark-submit命令中通過--jars選項和--file選項指定。

操作Hive時,必須創建一個HiveContext對象,HiveContext繼承了SQLContext,并增加了對MetaStore和HiveQL的支持。除了sql方法,HiveContext還提供了一個hql方法,hql方法可以執行HiveQL語法的查詢語句。示例如下:

  • Scala
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
  • Java
// sc is an existing JavaSparkContext. HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc); sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL. Row[] results = sqlContext.sql("FROM src SELECT key, value").collect();

?

3.4.1 訪問不同版本的Hive Metastore(Interacting with Different Versions of Hive Metastore)

Spark SQL經常需要訪問Hive metastore,Spark SQL可以通過Hive metastore獲取Hive表的元數據。從Spark 1.4.0開始,Spark SQL只需簡單的配置,就支持各版本Hive metastore的訪問。注意,涉及到metastore時Spar SQL忽略了Hive的版本。Spark SQL內部將Hive反編譯至Hive 1.2.1版本,Spark SQL的內部操作(serdes, UDFs, UDAFs, etc)都調用Hive 1.2.1版本的class。版本配置項見下面表格:

?

3.5 JDBC To Other Databases

Spark SQL支持使用JDBC訪問其他數據庫。當時用JDBC訪問其它數據庫時,最好使用JdbcRDD。使用JdbcRDD時,Spark SQL操作返回的DataFrame會很方便,也會很方便的添加其他數據源數據。JDBC數據源因為不需要用戶提供ClassTag,所以很適合使用Java或Python進行操作。
使用JDBC訪問數據源,需要在spark classpath添加JDBC driver配置。例如,從Spark Shell連接postgres的配置為:

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

遠程數據庫的表,可用DataFrame或Spark SQL臨時表的方式調用數據源API。支持的參數有:

代碼示例如下:

  • Scala
val jdbcDF = sqlContext.read.format("jdbc").options( Map("url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")).load()
  • Java
Map<String, String> options = new HashMap<String, String>(); options.put("url", "jdbc:postgresql:dbserver"); options.put("dbtable", "schema.tablename"); DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load();

?

3.6 故障排除(Troubleshooting)

  • 在客戶端session和所有的executors上,JDBC driver必須對啟動類加載器(primordial class loader)設置為visible。因為當創建一個connection時,Java的DriverManager類會執行安全驗證,安全驗證將忽略所有對啟動類加載器為非visible的driver。一個很方便的解決方法是,修改所有worker節點上的compute_classpath.sh腳本,將driver JARs添加至腳本。
  • 有些數據庫(例:H2)將所有的名字轉換為大寫,所以在這些數據庫中,Spark SQL也需要將名字全部大寫。

?

4 性能調優

?

4.1 緩存數據至內存(Caching Data In Memory)

Spark SQL可以通過調用sqlContext.cacheTable("tableName") 或者dataFrame.cache(),將表用一種柱狀格式( an in-memory columnar format)緩存至內存中。然后Spark SQL在執行查詢任務時,只需掃描必需的列,從而以減少掃描數據量、提高性能。通過緩存數據,Spark SQL還可以自動調節壓縮,從而達到最小化內存使用率和降低GC壓力的目的。調用sqlContext.uncacheTable("tableName")可將緩存的數據移出內存。

可通過兩種配置方式開啟緩存數據功能:

  • 使用SQLContext的setConf方法
  • 執行SQL命令 SET key=value

?

4.2 調優參數(Other Configuration Options)

可以通過配置下表中的參數調節Spark SQL的性能。在后續的Spark版本中將逐漸增強自動調優功能,下表中的參數在后續的版本中或許將不再需要配置。

?

5 分布式SQL引擎

使用Spark SQL的JDBC/ODBC或者CLI,可以將Spark SQL作為一個分布式查詢引擎。終端用戶或應用不需要編寫額外的代碼,可以直接使用Spark SQL執行SQL查詢。

?

5.1 運行Thrift JDBC/ODBC服務

這里運行的Thrift JDBC/ODBC服務與Hive 1.2.1中的HiveServer2一致。可以在Spark目錄下執行如下命令來啟動JDBC/ODBC服務:

./sbin/start-thriftserver.sh

這個命令接收所有?bin/spark-submit?命令行參數,添加一個?--hiveconf?參數來指定Hive的屬性。詳細的參數說明請執行命令?./sbin/start-thriftserver.sh --help?。
服務默認監聽端口為localhost:10000。有兩種方式修改默認監聽端口:

  • 修改環境變量:
export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ...
  • 修改系統屬性
./sbin/start-thriftserver.sh \--hiveconf hive.server2.thrift.port=<listening-port> \--hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...

使用?beeline?來測試Thrift JDBC/ODBC服務:

./bin/beeline

連接到Thrift JDBC/ODBC服務

beeline> !connect jdbc:hive2://localhost:10000

在非安全模式下,只需要輸入機器上的一個用戶名即可,無需密碼。在安全模式下,beeline會要求輸入用戶名和密碼。安全模式下的詳細要求,請閱讀beeline documentation的說明。

配置Hive需要替換?conf/?目錄下的?hive-site.xml。

Thrift JDBC服務也支持通過HTTP傳輸發送thrift RPC messages。開啟HTTP模式需要將下面的配參數配置到系統屬性或?conf/:?下的?hive-site.xml中

hive.server2.transport.mode - Set this to value: http hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001 hive.server2.http.endpoint - HTTP endpoint; default is cliservice

測試http模式,可以使用beeline鏈接JDBC/ODBC服務:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

?

5.2 運行Spark SQL CLI

Spark SQL CLI可以很方便的在本地運行Hive元數據服務以及從命令行執行查詢任務。需要注意的是,Spark SQL CLI不能與Thrift JDBC服務交互。
在Spark目錄下執行如下命令啟動Spark SQL CLI:

./bin/spark-sql

配置Hive需要替換?conf/?下的?hive-site.xml?。執行?./bin/spark-sql --help?可查看詳細的參數說明 。

?

6 Migration Guide

?

6.1 與Hive的兼容(Compatibility with Apache Hive)

Spark SQL與Hive Metastore、SerDes、UDFs相兼容。Spark SQL兼容Hive Metastore從0.12到1.2.1的所有版本。Spark SQL也與Hive SerDes和UDFs相兼容,當前SerDes和UDFs是基于Hive 1.2.1。

?

6.1.1 在Hive warehouse中部署Spark SQL

Spark SQL Thrift JDBC服務與Hive相兼容,在已存在的Hive上部署Spark SQL Thrift服務不需要對已存在的Hive Metastore做任何修改,也不需要對數據做任何改動。

?

6.1.2 Spark SQL支持的Hive特性

Spark SQL支持多部分的Hive特性,例如:

  • Hive查詢語句,包括:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 所有Hive運算符,包括
    • 比較操作符(=, ?, ==, <>, <, >, >=, <=, etc)
    • 算術運算符(+, -, *, /, %, etc)
    • 邏輯運算符(AND, &&, OR, ||, etc)
    • 復雜類型構造器
    • 數學函數(sign,ln,cos,etc)
    • 字符串函數(instr,length,printf,etc)
  • 用戶自定義函數(UDF)
  • 用戶自定義聚合函數(UDAF)
  • 用戶自定義序列化格式器(SerDes)
  • 窗口函數
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • 子查詢
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • Sampling
  • Explain
  • 表分區,包括動態分區插入
  • 視圖
  • 所有的Hive DDL函數,包括:
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • 大部分的Hive數據類型,包括:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

?

6.1.3 不支持的Hive功能

下面是當前不支持的Hive特性,其中大部分特性在實際的Hive使用中很少用到。

Major Hive Features

  • Tables with buckets:bucket是在一個Hive表分區內進行hash分區。Spark SQL當前不支持。

Esoteric Hive Features

  • UNION type
  • Unique join
  • Column statistics collecting:當期Spark SQL不智齒列信息統計,只支持填充Hive Metastore的sizeInBytes列。

Hive Input/Output Formats

  • File format for CLI: 這個功能用于在CLI顯示返回結果,Spark SQL只支持TextOutputFormat
  • Hadoop archive

Hive優化
部分Hive優化還沒有添加到Spark中。沒有添加的Hive優化(比如索引)對Spark SQL這種in-memory計算模型來說不是特別重要。下列Hive優化將在后續Spark SQL版本中慢慢添加。

  • 塊級別位圖索引和虛擬列(用于建立索引)
  • 自動檢測joins和groupbys的reducer數量:當前Spark SQL中需要使用“?SET spark.sql.shuffle.partitions=[num_tasks];?”控制post-shuffle的并行度,不能自動檢測。
  • 僅元數據查詢:對于可以通過僅使用元數據就能完成的查詢,當前Spark SQL還是需要啟動任務來計算結果。
  • 數據傾斜標記:當前Spark SQL不遵循Hive中的數據傾斜標記
  • jion中STREAMTABLE提示:當前Spark SQL不遵循STREAMTABLE提示
  • 查詢結果為多個小文件時合并小文件:如果查詢結果包含多個小文件,Hive能合并小文件為幾個大文件,避免HDFS metadata溢出。當前Spark SQL不支持這個功能。

?

7 Reference

?

7.1 Data Types

Spark SQL和DataFrames支持的數據格式如下:

  • 數值類型
    • ByteType: 代表1字節有符號整數. 數值范圍: -128 到 127.
    • ShortType: 代表2字節有符號整數. 數值范圍: -32768 到 32767.
    • IntegerType: 代表4字節有符號整數. 數值范圍: -2147483648 t到 2147483647.
    • LongType: 代表8字節有符號整數. 數值范圍: -9223372036854775808 到 9223372036854775807.
    • FloatType: 代表4字節單精度浮點數。
    • DoubleType: 代表8字節雙精度浮點數。
    • DecimalType: 表示任意精度的有符號十進制數。內部使用java.math.BigDecimal.A實現。
    • BigDecimal由一個任意精度的整數非標度值和一個32位的整數組成。
  • String類型
    • StringType: 表示字符串值。
  • Binary類型
    • BinaryType: 代表字節序列值。
  • Boolean類型
    • BooleanType: 代表布爾值。
  • Datetime類型
    • TimestampType: 代表包含的年、月、日、時、分和秒的時間值
    • DateType: 代表包含的年、月、日的日期值
  • 復雜類型
    • ArrayType(elementType, containsNull): 代表包含一系列類型為elementType的元素。如果在一個將ArrayType值的元素可以為空值,containsNull指示是否允許為空。
    • MapType(keyType, valueType, valueContainsNull): 代表一系列鍵值對的集合。key不允許為空,valueContainsNull指示value是否允許為空
    • StructType(fields): 代表帶有一個StructFields(列)描述結構數據。
      • StructField(name, dataType, nullable): 表示StructType中的一個字段。name表示列名、dataType表示數據類型、nullable指示是否允許為空。

Spark SQL所有的數據類型在?org.apache.spark.sql.types?包內。不同語言訪問或創建數據類型方法不一樣:

  • Scala
    代碼中添加?import org.apache.spark.sql.types._,再進行數據類型訪問或創建操作。

  • Java
    可以使用?org.apache.spark.sql.types.DataTypes?中的工廠方法,如下表:

?

7.2 NaN 語義

當處理float或double類型時,如果類型不符合標準的浮點語義,則使用專門的處理方式NaN。需要注意的是:

    • NaN = NaN 返回 true
    • 可以對NaN值進行聚合操作
    • 在join操作中,key為NaN時,NaN值與普通的數值處理邏輯相同
    • NaN值大于所有的數值型數據,在升序排序中排在最后
    • 轉自:http://www.cnblogs.com/BYRans/

總結

以上是生活随笔為你收集整理的SparkSql官方文档中文翻译(java版本)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。