2021年大数据Spark(三十二):SparkSQL的External DataSource
?
目錄
External DataSource
數(shù)據(jù)源與格式
text 數(shù)據(jù)
json 數(shù)據(jù)
csv 數(shù)據(jù)
parquet 數(shù)據(jù)
jdbc 數(shù)據(jù)
???????加載/保存數(shù)據(jù)-API
???????Load 加載數(shù)據(jù)
???????Save 保存數(shù)據(jù)
???????保存模式(SaveMode)
???????案例演示
External DataSource
在SparkSQL模塊,提供一套完成API接口,用于方便讀寫外部數(shù)據(jù)源的的數(shù)據(jù)(從Spark 1.4版本提供),框架本身內(nèi)置外部數(shù)據(jù)源:
在Spark 2.4版本中添加支持Image Source(圖像數(shù)據(jù)源)和Avro Source。
數(shù)據(jù)源與格式
?????數(shù)據(jù)分析處理中,數(shù)據(jù)可以分為結(jié)構(gòu)化數(shù)據(jù)、非結(jié)構(gòu)化數(shù)據(jù)及半結(jié)構(gòu)化數(shù)據(jù)。
??1)、結(jié)構(gòu)化數(shù)據(jù)(Structured)
結(jié)構(gòu)化數(shù)據(jù)源可提供有效的存儲和性能。例如,Parquet和ORC等柱狀格式使從列的子集中提取值變得更加容易。
基于行的存儲格式(如Avro)可有效地序列化和存儲提供存儲優(yōu)勢的數(shù)據(jù)。然而,這些優(yōu)點通常以靈活性為代價。如因結(jié)構(gòu)的固定性,格式轉(zhuǎn)變可能相對困難。
?2)、非結(jié)構(gòu)化數(shù)據(jù)(UnStructured)
相比之下,非結(jié)構(gòu)化數(shù)據(jù)源通常是自由格式文本或二進制對象,其不包含標記或元數(shù)據(jù)以定義數(shù)據(jù)的結(jié)構(gòu)。
報紙文章,醫(yī)療記錄,圖像,應(yīng)用程序日志通常被視為非結(jié)構(gòu)化數(shù)據(jù)。這些類型的源通常要求數(shù)據(jù)周圍的上下文是可解析的。
?3)、半結(jié)構(gòu)化數(shù)據(jù)(Semi-Structured)
半結(jié)構(gòu)化數(shù)據(jù)源是按記錄構(gòu)建的,但不一定具有跨越所有記錄的明確定義的全局模式。每個數(shù)據(jù)記錄都使用其結(jié)構(gòu)信息進行擴充。
半結(jié)構(gòu)化數(shù)據(jù)格式的好處是,它們在表達數(shù)據(jù)時提供了最大的靈活性,因為每條記錄都是自我描述的。但這些格式的主要缺點是它們會產(chǎn)生額外的解析開銷,并且不是特別為ad-hoc(特定)查詢而構(gòu)建的。
text 數(shù)據(jù)
SparkSession加載文本文件數(shù)據(jù),提供兩種方法,返回值分別為DataFrame和Dataset,前面【W(wǎng)ordCount】中已經(jīng)使用,下面看一下方法聲明:
可以看出textFile方法底層還是調(diào)用text方法,先加載數(shù)據(jù)封裝到DataFrame中,再使用as[String]方法將DataFrame轉(zhuǎn)換為Dataset,實際中推薦使用textFile方法,從Spark 2.0開始提供。
無論是text方法還是textFile方法讀取文本數(shù)據(jù)時,一行一行的加載數(shù)據(jù),每行數(shù)據(jù)使用UTF-8編碼的字符串,列名稱為【value】。?
json 數(shù)據(jù)
實際項目中,有時處理數(shù)據(jù)以JSON格式存儲的,尤其后續(xù)結(jié)構(gòu)化流式模塊:StructuredStreaming,從Kafka Topic消費數(shù)據(jù)很多時間是JSON個數(shù)據(jù),封裝到DataFrame中,需要解析提取字段的值。以讀取github操作日志JSON數(shù)據(jù)為例,數(shù)據(jù)結(jié)構(gòu)如下:
?1)、操作日志數(shù)據(jù)使用GZ壓縮:2015-03-01-11.json.gz,先使用json方法讀取。
?2)、使用textFile加載數(shù)據(jù),對每條JSON格式字符串數(shù)據(jù),使用SparkSQL函數(shù)庫functions中自帶get_json_obejct函數(shù)提取字段:id、type、public和created_at的值。
函數(shù):get_json_obejct使用說明
示例代碼:
package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** SparkSQL讀取JSON格式文本數(shù)據(jù)*/
object SparkSQLJson {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通過裝飾模式獲取實例對象,此種方式為線程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._// TODO: 從LocalFS上讀取json格式數(shù)據(jù)(壓縮)val jsonDF: DataFrame = spark.read.json("data/input/2015-03-01-11.json.gz")//jsonDF.printSchema()jsonDF.show(5, truncate = true)println("===================================================")val githubDS: Dataset[String] = spark.read.textFile("data/input/2015-03-01-11.json.gz")//githubDS.printSchema() // value 字段名稱,類型就是StringgithubDS.show(5,truncate = true)// TODO:使用SparkSQL自帶函數(shù),針對JSON格式數(shù)據(jù)解析的函數(shù)import org.apache.spark.sql.functions._// 獲取如下四個字段的值:id、type、public和created_atval gitDF: DataFrame = githubDS.select(get_json_object($"value", "$.id").as("id"),get_json_object($"value", "$.type").as("type"),get_json_object($"value", "$.public").as("public"),get_json_object($"value", "$.created_at").as("created_at"))gitDF.printSchema()gitDF.show(10, truncate = false)// 應(yīng)用結(jié)束,關(guān)閉資源spark.stop()}
}
運行結(jié)果:
???????csv 數(shù)據(jù)
在機器學習中,常常使用的數(shù)據(jù)存儲在csv/tsv文件格式中,所以SparkSQL中也支持直接讀取格式數(shù)據(jù),從2.0版本開始內(nèi)置數(shù)據(jù)源。關(guān)于CSV/TSV格式數(shù)據(jù)說明:
SparkSQL中讀取CSV格式數(shù)據(jù),可以設(shè)置一些選項,重點選項:
?1)、分隔符:sep
默認值為逗號,必須單個字符
?2)、數(shù)據(jù)文件首行是否是列名稱:header
默認值為false,如果數(shù)據(jù)文件首行是列名稱,設(shè)置為true
?3)、是否自動推斷每個列的數(shù)據(jù)類型:inferSchema
默認值為false,可以設(shè)置為true
官方提供案例:
當讀取CSV/TSV格式數(shù)據(jù)文件首行是否是列名稱,讀取數(shù)據(jù)方式(參數(shù)設(shè)置)不一樣的 。
?第一點:首行是列的名稱,如下方式讀取數(shù)據(jù)文件
???????//?TODO:?讀取TSV格式數(shù)據(jù)val?ratingsDF:?DataFrame?=?spark.read//?設(shè)置每行數(shù)據(jù)各個字段之間的分隔符,?默認值為?逗號.option("sep",?"\t")//?設(shè)置數(shù)據(jù)文件首行為列名稱,默認值為?false.option("header",?"true")//?自動推薦數(shù)據(jù)類型,默認值為false.option("inferSchema",?"true")//?指定文件的路徑.csv("datas/ml-100k/u.dat")ratingsDF.printSchema()ratingsDF.show(10,?truncate?=?false)
?第二點:首行不是列的名稱,如下方式讀取數(shù)據(jù)(設(shè)置Schema信息)
??????//?定義Schema信息val?schema?=?StructType(StructField("user_id",?IntegerType,?nullable?=?true)?::StructField("movie_id",?IntegerType,?nullable?=?true)?::StructField("rating",?DoubleType,?nullable?=?true)?::StructField("timestamp",?StringType,?nullable?=?true)?::?Nil)//?TODO:?讀取TSV格式數(shù)據(jù)val?mlRatingsDF:?DataFrame?=?spark.read//?設(shè)置每行數(shù)據(jù)各個字段之間的分隔符,?默認值為?逗號.option("sep",?"\t")//?指定Schema信息.schema(schema)//?指定文件的路徑.csv("datas/ml-100k/u.data")mlRatingsDF.printSchema()mlRatingsDF.show(5,?truncate?=?false)
?????將DataFrame數(shù)據(jù)保存至CSV格式文件,演示代碼如下:
示例代碼??
??????/***?將電影評分數(shù)據(jù)保存為CSV格式數(shù)據(jù)*/mlRatingsDF//?降低分區(qū)數(shù),此處設(shè)置為1,將所有數(shù)據(jù)保存到一個文件中.coalesce(1).write//?設(shè)置保存模式,依據(jù)實際業(yè)務(wù)場景選擇,此處為覆寫.mode(SaveMode.Overwrite).option("sep",?",")//?TODO:?建議設(shè)置首行為列名.option("header",?"true").csv("datas/ml-csv-"?+?System.nanoTime())
package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** SparkSQL 讀取CSV/TSV格式數(shù)據(jù):* i). 指定Schema信息* ii). 是否有header設(shè)置*/
object SparkSQLCsv {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通過裝飾模式獲取實例對象,此種方式為線程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._/*** 實際企業(yè)數(shù)據(jù)分析中* csv\tsv格式數(shù)據(jù),每個文件的第一行(head, 首行),字段的名稱(列名)*/// TODO: 讀取CSV格式數(shù)據(jù)val ratingsDF: DataFrame = spark.read// 設(shè)置每行數(shù)據(jù)各個字段之間的分隔符, 默認值為 逗號.option("sep", "\t")// 設(shè)置數(shù)據(jù)文件首行為列名稱,默認值為?false.option("header", "true")// 自動推薦數(shù)據(jù)類型,默認值為false.option("inferSchema", "true")// 指定文件的路徑.csv("data/input/rating_100k_with_head.data")ratingsDF.printSchema()ratingsDF.show(10, truncate = false)println("=======================================================")// 定義Schema信息val schema = StructType(StructField("user_id", IntegerType, nullable = true) ::StructField("movie_id", IntegerType, nullable = true) ::StructField("rating", DoubleType, nullable = true) ::StructField("timestamp", StringType, nullable = true) :: Nil)// TODO: 讀取CSV格式數(shù)據(jù)val mlRatingsDF: DataFrame = spark.read// 設(shè)置每行數(shù)據(jù)各個字段之間的分隔符, 默認值為 逗號.option("sep", "\t")// 指定Schema信息.schema(schema)// 指定文件的路徑.csv("data/input/rating_100k.data")mlRatingsDF.printSchema()mlRatingsDF.show(10, truncate = false)println("=======================================================")/*** 將電影評分數(shù)據(jù)保存為CSV格式數(shù)據(jù)*/mlRatingsDF// 降低分區(qū)數(shù),此處設(shè)置為1,將所有數(shù)據(jù)保存到一個文件中.coalesce(1).write// 設(shè)置保存模式,依據(jù)實際業(yè)務(wù)場景選擇,此處為覆寫.mode(SaveMode.Overwrite).option("sep", ",")// TODO: 建議設(shè)置首行為列名.option("header", "true").csv("data/output/ml-csv-" + System.currentTimeMillis())// 關(guān)閉資源spark.stop()}}
???????parquet 數(shù)據(jù)
SparkSQL模塊中默認讀取數(shù)據(jù)文件格式就是parquet列式存儲數(shù)據(jù),通過參數(shù)【spark.sql.sources.default】設(shè)置,默認值為【parquet】。
示例代碼:
直接load加載parquet數(shù)據(jù)和指定parquet格式加載數(shù)據(jù)。
運行程序結(jié)果:
package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}/*** SparkSQL讀取Parquet列式存儲數(shù)據(jù)*/
object SparkSQLParquet {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通過裝飾模式獲取實例對象,此種方式為線程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._// TODO: 從LocalFS上讀取parquet格式數(shù)據(jù)val usersDF: DataFrame = spark.read.parquet("data/input/users.parquet")usersDF.printSchema()usersDF.show(10, truncate = false)println("==================================================")// SparkSQL默認讀取文件格式為parquetval df = spark.read.load("data/input/users.parquet")df.printSchema()df.show(10, truncate = false)// 應(yīng)用結(jié)束,關(guān)閉資源spark.stop()}
}
???????jdbc 數(shù)據(jù)
回顧在SparkCore中讀取MySQL表的數(shù)據(jù)通過JdbcRDD來讀取的,在SparkSQL模塊中提供對應(yīng)接口,提供三種方式讀取數(shù)據(jù):
?方式一:單分區(qū)模式
?方式二:多分區(qū)模式,可以設(shè)置列的名稱,作為分區(qū)字段及列的值范圍和分區(qū)數(shù)目
?方式三:高度自由分區(qū)模式,通過設(shè)置條件語句設(shè)置分區(qū)數(shù)據(jù)及各個分區(qū)數(shù)據(jù)范圍
當加載讀取RDBMS表的數(shù)據(jù)量不大時,可以直接使用單分區(qū)模式加載;當數(shù)據(jù)量很多時,考慮使用多分區(qū)及自由分區(qū)方式加載。
從RDBMS表中讀取數(shù)據(jù),需要設(shè)置連接數(shù)據(jù)庫相關(guān)信息,基本屬性選項如下:
演示代碼如下:
//?連接數(shù)據(jù)庫三要素信息val?url:?String?=?"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true"val?table:?String?=?"db_shop.so"//?存儲用戶和密碼等屬性val?props:?Properties?=?new?Properties()props.put("driver",?"com.mysql.cj.jdbc.Driver")props.put("user",?"root")props.put("password",?"123456")//?TODO:?從MySQL數(shù)據(jù)庫表:銷售訂單表?so//?def?jdbc(url:?String,?table:?String,?properties:?Properties):?DataFrameval?sosDF:?DataFrame?=?spark.read.jdbc(url,?table,?props)println(s"Count?=?${sosDF.count()}")sosDF.printSchema()sosDF.show(10,?truncate?=?false)
可以使用option方法設(shè)置連接數(shù)據(jù)庫信息,而不使用Properties傳遞,代碼如下:
//?TODO:?使用option設(shè)置參數(shù)val?dataframe:?DataFrame?=?spark.read.format("jdbc").option("driver",?"com.mysql.cj.jdbc.Driver").option("url",?"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true").option("user",?"root").option("password",?"123456").option("dbtable",?"db_shop.so").load()dataframe.show(5,?truncate?=?false)
???????加載/保存數(shù)據(jù)-API
????SparkSQL提供一套通用外部數(shù)據(jù)源接口,方便用戶從數(shù)據(jù)源加載和保存數(shù)據(jù),例如從MySQL表中既可以加載讀取數(shù)據(jù):load/read,又可以保存寫入數(shù)據(jù):save/write。
由于SparkSQL沒有內(nèi)置支持從HBase表中加載和保存數(shù)據(jù),但是只要實現(xiàn)外部數(shù)據(jù)源接口,也能像上面方式一樣讀取加載數(shù)據(jù)。
???????Load 加載數(shù)據(jù)
在SparkSQL中讀取數(shù)據(jù)使用SparkSession讀取,并且封裝到數(shù)據(jù)結(jié)構(gòu)Dataset/DataFrame中。
DataFrameReader專門用于加載load讀取外部數(shù)據(jù)源的數(shù)據(jù),基本格式如下:
SparkSQL模塊本身自帶支持讀取外部數(shù)據(jù)源的數(shù)據(jù):
總結(jié)起來三種類型數(shù)據(jù),也是實際開發(fā)中常用的:
?第一類:文件格式數(shù)據(jù)
文本文件text、csv文件和json文件
?第二類:列式存儲數(shù)據(jù)
Parquet格式、ORC格式
?第三類:數(shù)據(jù)庫表
關(guān)系型數(shù)據(jù)庫RDBMS:MySQL、DB2、Oracle和MSSQL
Hive倉庫表
官方文檔:http://spark.apache.org/docs/2.4.5/sql-data-sources-load-save-functions.html
此外加載文件數(shù)據(jù)時,可以直接使用SQL語句,指定文件存儲格式和路徑:
???????Save 保存數(shù)據(jù)
SparkSQL模塊中可以從某個外部數(shù)據(jù)源讀取數(shù)據(jù),就能向某個外部數(shù)據(jù)源保存數(shù)據(jù),提供相應(yīng)接口,通過DataFrameWrite類將數(shù)據(jù)進行保存。
與DataFrameReader類似,提供一套規(guī)則,將數(shù)據(jù)Dataset保存,基本格式如下:
SparkSQL模塊內(nèi)部支持保存數(shù)據(jù)源如下:
所以使用SpakrSQL分析數(shù)據(jù)時,從數(shù)據(jù)讀取,到數(shù)據(jù)分析及數(shù)據(jù)保存,鏈式操作,更多就是ETL操作。當將結(jié)果數(shù)據(jù)DataFrame/Dataset保存至Hive表中時,可以設(shè)置分區(qū)partition和分桶bucket,形式如下:
???????保存模式(SaveMode)
?????將Dataset/DataFrame數(shù)據(jù)保存到外部存儲系統(tǒng)中,考慮是否存在,存在的情況下的下如何進行保存,DataFrameWriter中有一個mode方法指定模式:
通過源碼發(fā)現(xiàn)SaveMode時枚舉類,使用Java語言編寫,如下四種保存模式:
?第一種:Append 追加模式,當數(shù)據(jù)存在時,繼續(xù)追加;
?第二種:Overwrite 覆寫模式,當數(shù)據(jù)存在時,覆寫以前數(shù)據(jù),存儲當前最新數(shù)據(jù);
?第三種:ErrorIfExists?存在及報錯;
?第四種:Ignore 忽略,數(shù)據(jù)存在時不做任何操作;
實際項目依據(jù)具體業(yè)務(wù)情況選擇保存模式,通常選擇Append和Overwrite模式。
???????案例演示
package cn.it.sqlimport java.util.Propertiesimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** Author itcast* Desc 先準備一個df/ds,然后再將該df/ds的數(shù)據(jù)寫入到不同的數(shù)據(jù)源中,最后再從不同的數(shù)據(jù)源中讀取*/
object DataSourceDemo{case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.準備環(huán)境-SparkSession和DFval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")val lines: RDD[String] = sc.textFile("data/input/person.txt")val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))import spark.implicits._val personDF: DataFrame = personRDD.toDFpersonDF.show(6,false)/*+---+--------+---+|id |name ???|age|+---+--------+---+|1 ?|zhangsan|20 ||2 ?|lisi ???|29 ||3 ?|wangwu ?|25 ||4 ?|zhaoliu |30 ||5 ?|tianqi ?|35 ||6 ?|kobe ???|40 |+---+--------+---+*///2.將personDF寫入到不同的數(shù)據(jù)源personDF.write.mode(SaveMode.Overwrite).json("data/output/json")personDF.write.mode(SaveMode.Overwrite).csv("data/output/csv")personDF.write.mode(SaveMode.Overwrite).parquet("data/output/parquet")val prop = new Properties()prop.setProperty("user","root")prop.setProperty("password","root")personDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)println("寫入成功!")//personDF.write.text("data/output/text")//會報錯, Text data source supports only a single column, and you have 3 columns.personDF.coalesce(1).write.mode(SaveMode.Overwrite).json("data/output/json1")//personDF.repartition(1)//3.從不同的數(shù)據(jù)源讀取數(shù)據(jù)val df1: DataFrame = spark.read.json("data/output/json")val df2: DataFrame = spark.read.csv("data/output/csv").toDF("id_my","name","age")val df3: DataFrame = spark.read.parquet("data/output/parquet")val df4: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)df1.show()df2.show()df3.show()df4.show()}
}
總結(jié)
以上是生活随笔為你收集整理的2021年大数据Spark(三十二):SparkSQL的External DataSource的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(三十一):S
- 下一篇: 2021年大数据Spark(三十三):S