日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Spark(三十二):SparkSQL的External DataSource

發(fā)布時間:2023/11/28 生活经验 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Spark(三十二):SparkSQL的External DataSource 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

?

目錄

External DataSource

數(shù)據(jù)源與格式

text 數(shù)據(jù)

json 數(shù)據(jù)

csv 數(shù)據(jù)

parquet 數(shù)據(jù)

jdbc 數(shù)據(jù)

???????加載/保存數(shù)據(jù)-API

???????Load 加載數(shù)據(jù)

???????Save 保存數(shù)據(jù)

???????保存模式(SaveMode)

???????案例演示


External DataSource

在SparkSQL模塊,提供一套完成API接口,用于方便讀寫外部數(shù)據(jù)源的的數(shù)據(jù)(從Spark 1.4版本提供),框架本身內(nèi)置外部數(shù)據(jù)源:

在Spark 2.4版本中添加支持Image Source(圖像數(shù)據(jù)源)和Avro Source。

數(shù)據(jù)源與格式

?????數(shù)據(jù)分析處理中,數(shù)據(jù)可以分為結(jié)構(gòu)化數(shù)據(jù)、非結(jié)構(gòu)化數(shù)據(jù)及半結(jié)構(gòu)化數(shù)據(jù)。

??1)、結(jié)構(gòu)化數(shù)據(jù)(Structured)

結(jié)構(gòu)化數(shù)據(jù)源可提供有效的存儲和性能。例如,Parquet和ORC等柱狀格式使從列的子集中提取值變得更加容易。

基于行的存儲格式(如Avro)可有效地序列化和存儲提供存儲優(yōu)勢的數(shù)據(jù)。然而,這些優(yōu)點通常以靈活性為代價。如因結(jié)構(gòu)的固定性,格式轉(zhuǎn)變可能相對困難。

?2)、非結(jié)構(gòu)化數(shù)據(jù)(UnStructured)

相比之下,非結(jié)構(gòu)化數(shù)據(jù)源通常是自由格式文本或二進制對象,其不包含標記或元數(shù)據(jù)以定義數(shù)據(jù)的結(jié)構(gòu)。

報紙文章,醫(yī)療記錄,圖像,應(yīng)用程序日志通常被視為非結(jié)構(gòu)化數(shù)據(jù)。這些類型的源通常要求數(shù)據(jù)周圍的上下文是可解析的。

?3)、半結(jié)構(gòu)化數(shù)據(jù)(Semi-Structured)

半結(jié)構(gòu)化數(shù)據(jù)源是按記錄構(gòu)建的,但不一定具有跨越所有記錄的明確定義的全局模式。每個數(shù)據(jù)記錄都使用其結(jié)構(gòu)信息進行擴充。

半結(jié)構(gòu)化數(shù)據(jù)格式的好處是,它們在表達數(shù)據(jù)時提供了最大的靈活性,因為每條記錄都是自我描述的。但這些格式的主要缺點是它們會產(chǎn)生額外的解析開銷,并且不是特別為ad-hoc(特定)查詢而構(gòu)建的。

text 數(shù)據(jù)

SparkSession加載文本文件數(shù)據(jù),提供兩種方法,返回值分別為DataFrame和Dataset,前面【W(wǎng)ordCount】中已經(jīng)使用,下面看一下方法聲明:

可以看出textFile方法底層還是調(diào)用text方法,先加載數(shù)據(jù)封裝到DataFrame中,再使用as[String]方法將DataFrame轉(zhuǎn)換為Dataset,實際中推薦使用textFile方法,從Spark 2.0開始提供。

無論是text方法還是textFile方法讀取文本數(shù)據(jù)時,一行一行的加載數(shù)據(jù),每行數(shù)據(jù)使用UTF-8編碼的字符串,列名稱為【value】。?

json 數(shù)據(jù)

實際項目中,有時處理數(shù)據(jù)以JSON格式存儲的,尤其后續(xù)結(jié)構(gòu)化流式模塊:StructuredStreaming,從Kafka Topic消費數(shù)據(jù)很多時間是JSON個數(shù)據(jù),封裝到DataFrame中,需要解析提取字段的值。以讀取github操作日志JSON數(shù)據(jù)為例,數(shù)據(jù)結(jié)構(gòu)如下:

?1)、操作日志數(shù)據(jù)使用GZ壓縮:2015-03-01-11.json.gz,先使用json方法讀取。

?2)、使用textFile加載數(shù)據(jù),對每條JSON格式字符串數(shù)據(jù),使用SparkSQL函數(shù)庫functions中自帶get_json_obejct函數(shù)提取字段:id、type、public和created_at的值。

函數(shù):get_json_obejct使用說明

示例代碼:


package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** SparkSQL讀取JSON格式文本數(shù)據(jù)*/
object SparkSQLJson {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通過裝飾模式獲取實例對象,此種方式為線程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._// TODO: 從LocalFS上讀取json格式數(shù)據(jù)(壓縮)val jsonDF: DataFrame = spark.read.json("data/input/2015-03-01-11.json.gz")//jsonDF.printSchema()jsonDF.show(5, truncate = true)println("===================================================")val githubDS: Dataset[String] = spark.read.textFile("data/input/2015-03-01-11.json.gz")//githubDS.printSchema() // value 字段名稱,類型就是StringgithubDS.show(5,truncate = true)// TODO:使用SparkSQL自帶函數(shù),針對JSON格式數(shù)據(jù)解析的函數(shù)import org.apache.spark.sql.functions._// 獲取如下四個字段的值:id、type、public和created_atval gitDF: DataFrame = githubDS.select(get_json_object($"value", "$.id").as("id"),get_json_object($"value", "$.type").as("type"),get_json_object($"value", "$.public").as("public"),get_json_object($"value", "$.created_at").as("created_at"))gitDF.printSchema()gitDF.show(10, truncate = false)// 應(yīng)用結(jié)束,關(guān)閉資源spark.stop()}
}

運行結(jié)果:

???????csv 數(shù)據(jù)

在機器學習中,常常使用的數(shù)據(jù)存儲在csv/tsv文件格式中,所以SparkSQL中也支持直接讀取格式數(shù)據(jù),從2.0版本開始內(nèi)置數(shù)據(jù)源。關(guān)于CSV/TSV格式數(shù)據(jù)說明:

SparkSQL中讀取CSV格式數(shù)據(jù),可以設(shè)置一些選項,重點選項:

?1)、分隔符:sep

默認值為逗號,必須單個字符

?2)、數(shù)據(jù)文件首行是否是列名稱:header

默認值為false,如果數(shù)據(jù)文件首行是列名稱,設(shè)置為true

?3)、是否自動推斷每個列的數(shù)據(jù)類型:inferSchema

默認值為false,可以設(shè)置為true

官方提供案例:

當讀取CSV/TSV格式數(shù)據(jù)文件首行是否是列名稱,讀取數(shù)據(jù)方式(參數(shù)設(shè)置)不一樣的 。

?第一點:首行是列的名稱,如下方式讀取數(shù)據(jù)文件

???????//?TODO:?讀取TSV格式數(shù)據(jù)val?ratingsDF:?DataFrame?=?spark.read//?設(shè)置每行數(shù)據(jù)各個字段之間的分隔符,?默認值為?逗號.option("sep",?"\t")//?設(shè)置數(shù)據(jù)文件首行為列名稱,默認值為?false.option("header",?"true")//?自動推薦數(shù)據(jù)類型,默認值為false.option("inferSchema",?"true")//?指定文件的路徑.csv("datas/ml-100k/u.dat")ratingsDF.printSchema()ratingsDF.show(10,?truncate?=?false)

?第二點:首行不是列的名稱,如下方式讀取數(shù)據(jù)(設(shè)置Schema信息)

??????//?定義Schema信息val?schema?=?StructType(StructField("user_id",?IntegerType,?nullable?=?true)?::StructField("movie_id",?IntegerType,?nullable?=?true)?::StructField("rating",?DoubleType,?nullable?=?true)?::StructField("timestamp",?StringType,?nullable?=?true)?::?Nil)//?TODO:?讀取TSV格式數(shù)據(jù)val?mlRatingsDF:?DataFrame?=?spark.read//?設(shè)置每行數(shù)據(jù)各個字段之間的分隔符,?默認值為?逗號.option("sep",?"\t")//?指定Schema信息.schema(schema)//?指定文件的路徑.csv("datas/ml-100k/u.data")mlRatingsDF.printSchema()mlRatingsDF.show(5,?truncate?=?false)

?????將DataFrame數(shù)據(jù)保存至CSV格式文件,演示代碼如下:

示例代碼??

??????/***?將電影評分數(shù)據(jù)保存為CSV格式數(shù)據(jù)*/mlRatingsDF//?降低分區(qū)數(shù),此處設(shè)置為1,將所有數(shù)據(jù)保存到一個文件中.coalesce(1).write//?設(shè)置保存模式,依據(jù)實際業(yè)務(wù)場景選擇,此處為覆寫.mode(SaveMode.Overwrite).option("sep",?",")//?TODO:?建議設(shè)置首行為列名.option("header",?"true").csv("datas/ml-csv-"?+?System.nanoTime())

package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** SparkSQL 讀取CSV/TSV格式數(shù)據(jù):* i). 指定Schema信息* ii). 是否有header設(shè)置*/
object SparkSQLCsv {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通過裝飾模式獲取實例對象,此種方式為線程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._/*** 實際企業(yè)數(shù)據(jù)分析中* csv\tsv格式數(shù)據(jù),每個文件的第一行(head, 首行),字段的名稱(列名)*/// TODO: 讀取CSV格式數(shù)據(jù)val ratingsDF: DataFrame = spark.read// 設(shè)置每行數(shù)據(jù)各個字段之間的分隔符, 默認值為 逗號.option("sep", "\t")// 設(shè)置數(shù)據(jù)文件首行為列名稱,默認值為?false.option("header", "true")// 自動推薦數(shù)據(jù)類型,默認值為false.option("inferSchema", "true")// 指定文件的路徑.csv("data/input/rating_100k_with_head.data")ratingsDF.printSchema()ratingsDF.show(10, truncate = false)println("=======================================================")// 定義Schema信息val schema = StructType(StructField("user_id", IntegerType, nullable = true) ::StructField("movie_id", IntegerType, nullable = true) ::StructField("rating", DoubleType, nullable = true) ::StructField("timestamp", StringType, nullable = true) :: Nil)// TODO: 讀取CSV格式數(shù)據(jù)val mlRatingsDF: DataFrame = spark.read// 設(shè)置每行數(shù)據(jù)各個字段之間的分隔符, 默認值為 逗號.option("sep", "\t")// 指定Schema信息.schema(schema)// 指定文件的路徑.csv("data/input/rating_100k.data")mlRatingsDF.printSchema()mlRatingsDF.show(10, truncate = false)println("=======================================================")/*** 將電影評分數(shù)據(jù)保存為CSV格式數(shù)據(jù)*/mlRatingsDF// 降低分區(qū)數(shù),此處設(shè)置為1,將所有數(shù)據(jù)保存到一個文件中.coalesce(1).write// 設(shè)置保存模式,依據(jù)實際業(yè)務(wù)場景選擇,此處為覆寫.mode(SaveMode.Overwrite).option("sep", ",")// TODO: 建議設(shè)置首行為列名.option("header", "true").csv("data/output/ml-csv-" + System.currentTimeMillis())// 關(guān)閉資源spark.stop()}}

???????parquet 數(shù)據(jù)

SparkSQL模塊中默認讀取數(shù)據(jù)文件格式就是parquet列式存儲數(shù)據(jù)通過參數(shù)【spark.sql.sources.default】設(shè)置,默認值為【parquet】。

示例代碼:

直接load加載parquet數(shù)據(jù)和指定parquet格式加載數(shù)據(jù)。

運行程序結(jié)果:


package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}/*** SparkSQL讀取Parquet列式存儲數(shù)據(jù)*/
object SparkSQLParquet {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通過裝飾模式獲取實例對象,此種方式為線程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._// TODO: 從LocalFS上讀取parquet格式數(shù)據(jù)val usersDF: DataFrame = spark.read.parquet("data/input/users.parquet")usersDF.printSchema()usersDF.show(10, truncate = false)println("==================================================")// SparkSQL默認讀取文件格式為parquetval df = spark.read.load("data/input/users.parquet")df.printSchema()df.show(10, truncate = false)// 應(yīng)用結(jié)束,關(guān)閉資源spark.stop()}
}

???????jdbc 數(shù)據(jù)

回顧在SparkCore中讀取MySQL表的數(shù)據(jù)通過JdbcRDD來讀取的,在SparkSQL模塊中提供對應(yīng)接口,提供三種方式讀取數(shù)據(jù):

?方式一:單分區(qū)模式

?方式二:多分區(qū)模式,可以設(shè)置列的名稱,作為分區(qū)字段及列的值范圍和分區(qū)數(shù)目

?方式三:高度自由分區(qū)模式,通過設(shè)置條件語句設(shè)置分區(qū)數(shù)據(jù)及各個分區(qū)數(shù)據(jù)范圍

當加載讀取RDBMS表的數(shù)據(jù)量不大時,可以直接使用單分區(qū)模式加載;當數(shù)據(jù)量很多時,考慮使用多分區(qū)及自由分區(qū)方式加載。

從RDBMS表中讀取數(shù)據(jù),需要設(shè)置連接數(shù)據(jù)庫相關(guān)信息,基本屬性選項如下:

演示代碼如下:

//?連接數(shù)據(jù)庫三要素信息val?url:?String?=?"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true"val?table:?String?=?"db_shop.so"//?存儲用戶和密碼等屬性val?props:?Properties?=?new?Properties()props.put("driver",?"com.mysql.cj.jdbc.Driver")props.put("user",?"root")props.put("password",?"123456")//?TODO:?從MySQL數(shù)據(jù)庫表:銷售訂單表?so//?def?jdbc(url:?String,?table:?String,?properties:?Properties):?DataFrameval?sosDF:?DataFrame?=?spark.read.jdbc(url,?table,?props)println(s"Count?=?${sosDF.count()}")sosDF.printSchema()sosDF.show(10,?truncate?=?false)

可以使用option方法設(shè)置連接數(shù)據(jù)庫信息,而不使用Properties傳遞,代碼如下:

//?TODO:?使用option設(shè)置參數(shù)val?dataframe:?DataFrame?=?spark.read.format("jdbc").option("driver",?"com.mysql.cj.jdbc.Driver").option("url",?"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true").option("user",?"root").option("password",?"123456").option("dbtable",?"db_shop.so").load()dataframe.show(5,?truncate?=?false)

???????加載/保存數(shù)據(jù)-API

????SparkSQL提供一套通用外部數(shù)據(jù)源接口,方便用戶從數(shù)據(jù)源加載和保存數(shù)據(jù),例如從MySQL表中既可以加載讀取數(shù)據(jù):load/read,又可以保存寫入數(shù)據(jù):save/write。

由于SparkSQL沒有內(nèi)置支持從HBase表中加載和保存數(shù)據(jù),但是只要實現(xiàn)外部數(shù)據(jù)源接口,也能像上面方式一樣讀取加載數(shù)據(jù)。

???????Load 加載數(shù)據(jù)

在SparkSQL中讀取數(shù)據(jù)使用SparkSession讀取,并且封裝到數(shù)據(jù)結(jié)構(gòu)Dataset/DataFrame中。

DataFrameReader專門用于加載load讀取外部數(shù)據(jù)源的數(shù)據(jù),基本格式如下:

SparkSQL模塊本身自帶支持讀取外部數(shù)據(jù)源的數(shù)據(jù):

總結(jié)起來三種類型數(shù)據(jù),也是實際開發(fā)中常用的:

?第一類:文件格式數(shù)據(jù)

文本文件text、csv文件和json文件

?第二類:列式存儲數(shù)據(jù)

Parquet格式、ORC格式

?第三類:數(shù)據(jù)庫表

關(guān)系型數(shù)據(jù)庫RDBMS:MySQL、DB2、Oracle和MSSQL

Hive倉庫表

官方文檔:http://spark.apache.org/docs/2.4.5/sql-data-sources-load-save-functions.html

此外加載文件數(shù)據(jù)時,可以直接使用SQL語句,指定文件存儲格式和路徑:

???????Save 保存數(shù)據(jù)

SparkSQL模塊中可以從某個外部數(shù)據(jù)源讀取數(shù)據(jù),就能向某個外部數(shù)據(jù)源保存數(shù)據(jù),提供相應(yīng)接口,通過DataFrameWrite類將數(shù)據(jù)進行保存。

與DataFrameReader類似,提供一套規(guī)則,將數(shù)據(jù)Dataset保存,基本格式如下:

SparkSQL模塊內(nèi)部支持保存數(shù)據(jù)源如下:

所以使用SpakrSQL分析數(shù)據(jù)時,從數(shù)據(jù)讀取,到數(shù)據(jù)分析及數(shù)據(jù)保存,鏈式操作,更多就是ETL操作。當將結(jié)果數(shù)據(jù)DataFrame/Dataset保存至Hive表中時,可以設(shè)置分區(qū)partition和分桶bucket,形式如下:

???????保存模式(SaveMode

?????將Dataset/DataFrame數(shù)據(jù)保存到外部存儲系統(tǒng)中,考慮是否存在,存在的情況下的下如何進行保存,DataFrameWriter中有一個mode方法指定模式:

通過源碼發(fā)現(xiàn)SaveMode時枚舉類,使用Java語言編寫,如下四種保存模式:

?第一種:Append 追加模式,當數(shù)據(jù)存在時,繼續(xù)追加;

?第二種:Overwrite 覆寫模式,當數(shù)據(jù)存在時,覆寫以前數(shù)據(jù),存儲當前最新數(shù)據(jù);

?第三種:ErrorIfExists?存在及報錯;

?第四種:Ignore 忽略,數(shù)據(jù)存在時不做任何操作;

實際項目依據(jù)具體業(yè)務(wù)情況選擇保存模式,通常選擇Append和Overwrite模式。

???????案例演示

package cn.it.sqlimport java.util.Propertiesimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** Author itcast* Desc 先準備一個df/ds,然后再將該df/ds的數(shù)據(jù)寫入到不同的數(shù)據(jù)源中,最后再從不同的數(shù)據(jù)源中讀取*/
object DataSourceDemo{case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.準備環(huán)境-SparkSession和DFval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")val lines: RDD[String] = sc.textFile("data/input/person.txt")val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))import spark.implicits._val personDF: DataFrame = personRDD.toDFpersonDF.show(6,false)/*+---+--------+---+|id |name ???|age|+---+--------+---+|1 ?|zhangsan|20 ||2 ?|lisi ???|29 ||3 ?|wangwu ?|25 ||4 ?|zhaoliu |30 ||5 ?|tianqi ?|35 ||6 ?|kobe ???|40 |+---+--------+---+*///2.將personDF寫入到不同的數(shù)據(jù)源personDF.write.mode(SaveMode.Overwrite).json("data/output/json")personDF.write.mode(SaveMode.Overwrite).csv("data/output/csv")personDF.write.mode(SaveMode.Overwrite).parquet("data/output/parquet")val prop = new Properties()prop.setProperty("user","root")prop.setProperty("password","root")personDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)println("寫入成功!")//personDF.write.text("data/output/text")//會報錯, Text data source supports only a single column, and you have 3 columns.personDF.coalesce(1).write.mode(SaveMode.Overwrite).json("data/output/json1")//personDF.repartition(1)//3.從不同的數(shù)據(jù)源讀取數(shù)據(jù)val df1: DataFrame = spark.read.json("data/output/json")val df2: DataFrame = spark.read.csv("data/output/csv").toDF("id_my","name","age")val df3: DataFrame = spark.read.parquet("data/output/parquet")val df4: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)df1.show()df2.show()df3.show()df4.show()}
}

總結(jié)

以上是生活随笔為你收集整理的2021年大数据Spark(三十二):SparkSQL的External DataSource的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。