日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Spark SQL 编程API入门系列之SparkSQL数据源

發布時間:2023/12/18 数据库 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark SQL 编程API入门系列之SparkSQL数据源 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

?

  不多說,直接上干貨!

?

?

?

?

SparkSQL數據源:從各種數據源創建DataFrame

  因為 spark sql,dataframe,datasets 都是共用 spark sql 這個庫的,三者共享同樣的代碼優化,生成以及執行流程,所以 sql,dataframe,datasets 的入口都是 sqlContext。

  可用于創建 spark dataframe 的數據源有很多:

?

?

?

?

?

SparkSQL數據源:RDD

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._// Define the schema using a case class.case class Person(name: String, age: Int)// Create an RDD of Person objects and register it as a table.val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)) sqlContext.createDataFrame(people)

?

?

?

?

?

SparkSQL數據源:Hive

  當從Hive 中讀取數據時,Spark SQL 支持任何Hive 支持的存儲格式(SerDe),包括文件、RCFiles、ORC、Parquet、Avro,以及Protocol Buffer(當然Spark SQL也可以直接讀取這些文件)。

  要連接已部署好的Hive,需要拷貝hive-site.xml、core-site.xml、hdfs-site.xml到Spark 的./conf/ 目錄下即可

  如果不想連接到已有的hive,可以什么都不做直接使用HiveContext:

  Spark SQL 會在當前的工作目錄中創建出自己的Hive 元數據倉庫,叫作metastore_db

  如果你嘗試使用HiveQL 中的CREATE TABLE(并非CREATE EXTERNAL TABLE)語句來創建表,這些表會被放在你默認的文件系統中的/user/hive/warehouse 目錄中(如果你的classpath 中有配好的hdfs-site.xml,默認的文件系統就是HDFS,否則就是本地文件系統)。

?

?

?

?

SparkSQL數據源:Hive讀寫

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL

sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

?

?

?

?

SparkSQL數據源:訪問不同版本的metastore

  從Spark1.4開始,Spark SQL可以通過修改配置去查詢不同版本的?Hive metastores(不用重新編譯)

?

?

?

?

?

?

?

?

SparkSQL數據源:Parquet

Parquet(http://parquet.apache.org/)是一種流行的列式存儲格式,可以高效地存儲具有嵌套字段的記錄。

Parquet 格式經常在Hadoop 生態圈中被使用,它也支持Spark SQL 的全部數據類型。Spark SQL 提供了直接讀取和存儲Parquet 格式文件的方法。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._// Define the schema using a case class.case class Person(name: String, age: Int)// Create an RDD of Person objects and register it as a table.val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()people.write.parquet("xxxx")val parquetFile = sqlContext.read.parquet("people.parquet")//Parquet files can also be registered as tables and then used in SQL statements.parquetFile.registerTempTable("parquetFile")val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

?

?

?

?

?

?

?

SparkSQL數據源:Parquet-- Partition Discovery

  在Hive中通常會用分區表來優化性能,比如:

  

  SQLContext.read.parquet或者SQLContext.read.load只需要指定path/to/table,SparkSQL會自動從路徑中提取分區信息,返回的DataFrame?的schema?將是:

?

  當然你可以使用Hive讀取方式:

hiveContext.sql("FROM src SELECT key, value").

?

?

?

?

?

?

SparkSQL數據源:Json

  SparkSQL支持從Json文件或者Json格式的RDD讀取數據

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 可以是目錄或者文件夾val path = "examples/src/main/resources/people.json"val people = sqlContext.read.json(path)// The inferred schema can be visualized using the printSchema() method. people.printSchema()// Register this DataFrame as a table.people.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")// Alternatively, a DataFrame can be created for a JSON dataset represented by// an RDD[String] storing one JSON object per string.val anotherPeopleRDD = sc.parallelize("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

?

?

?

?

?

?

?

SparkSQL數據源:JDBC

val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:postgresql:dbserver","dbtable" -> "schema.tablename")).load()

?

?

支持的參數:

?

?

?

轉載于:https://www.cnblogs.com/zlslch/p/6944860.html

總結

以上是生活随笔為你收集整理的Spark SQL 编程API入门系列之SparkSQL数据源的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。