日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

【Apache Spark 】第 6 章Spark SQL 和数据集

發(fā)布時(shí)間:2023/12/20 数据库 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【Apache Spark 】第 6 章Spark SQL 和数据集 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

?🔎大家好,我是Sonhhxg_柒,希望你看完之后,能對(duì)你有所幫助,不足請(qǐng)指正!共同學(xué)習(xí)交流🔎

📝個(gè)人主頁(yè)-Sonhhxg_柒的博客_CSDN博客?📃

🎁歡迎各位→點(diǎn)贊👍 + 收藏?? + 留言📝?

📣系列專(zhuān)欄 - 機(jī)器學(xué)習(xí)【ML】?自然語(yǔ)言處理【NLP】? 深度學(xué)習(xí)【DL】

?

?🖍foreword

?說(shuō)明?本人講解主要包括Python、機(jī)器學(xué)習(xí)(ML)、深度學(xué)習(xí)(DL)、自然語(yǔ)言處理(NLP)等內(nèi)容。

如果你對(duì)這個(gè)系列感興趣的話(huà),可以關(guān)注訂閱喲👋

文章目錄

Java 和 Scala 的單一 API

用于數(shù)據(jù)集的 Scala 案例類(lèi)和 JavaBean

使用數(shù)據(jù)集

創(chuàng)建示例數(shù)據(jù)

轉(zhuǎn)換樣本數(shù)據(jù)

高階函數(shù)和函數(shù)式編程

將 DataFrame 轉(zhuǎn)換為數(shù)據(jù)集

數(shù)據(jù)集和數(shù)據(jù)幀的內(nèi)存管理

數(shù)據(jù)集編碼器

Spark 的內(nèi)部格式與 Java 對(duì)象格式

序列化和反序列化 (SerDe)

使用數(shù)據(jù)集的成本

降低成本的策略

概括


在第4章和第5章中,我們介紹了 Spark SQL 和 DataFrame API。我們研究了如何連接到內(nèi)置和外部數(shù)據(jù)源,了解了 Spark SQL 引擎,并探討了諸如 SQL 和 DataFrame 之間的互操作性、創(chuàng)建和管理視圖和表以及高級(jí) DataFrame 和 SQL 轉(zhuǎn)換等主題。

盡管我們?cè)诘?3 章中簡(jiǎn)要介紹了 Dataset API?,但我們略讀了 Datasets(強(qiáng)類(lèi)型分布式集合)在 Spark 中的創(chuàng)建、存儲(chǔ)、序列化和反序列化的主要方面。

在本章中,我們將深入了解數(shù)據(jù)集:我們將探索在 Java 和 Scala 中使用數(shù)據(jù)集,Spark 如何管理內(nèi)存以適應(yīng)作為高級(jí) API 的一部分的數(shù)據(jù)集結(jié)構(gòu),以及與使用數(shù)據(jù)集相關(guān)的成本。

Java 和 Scala 的單一 API

您可能還記得第 3 章(圖 3-1和表 3-6),數(shù)據(jù)集為強(qiáng)類(lèi)型對(duì)象提供了統(tǒng)一且單一的 API。在 Spark 支持的語(yǔ)言中,只有 Scala 和 Java 是強(qiáng)類(lèi)型的;因此,Python 和 R 僅支持無(wú)類(lèi)型的 DataFrame API。

數(shù)據(jù)集是特定領(lǐng)域的類(lèi)型化對(duì)象,可以使用函數(shù)式編程或您熟悉的 DataFrame API 中的 DSL 運(yùn)算符并行操作。

由于這個(gè)單一的 API,Java 開(kāi)發(fā)人員不再冒險(xiǎn)落后。例如,將來(lái)對(duì) Scala 的groupBy()、flatMap()、map()或filter()API 的任何接口或行為更改對(duì)于 Java 也將是相同的,因?yàn)樗莾蓚€(gè)實(shí)現(xiàn)通用的單一接口。

用于數(shù)據(jù)集的 Scala 案例類(lèi)和 JavaBean

如果您還記得第 3 章(表 3-2),Spark 具有內(nèi)部數(shù)據(jù)類(lèi)型,例如StringType、BinaryType、IntegerType、BooleanType和MapType,它用于在 Spark 操作期間無(wú)縫映射到 Scala 和 Java 中的語(yǔ)言特定數(shù)據(jù)類(lèi)型。這種映射是通過(guò)編碼器完成的,我們將在本章后面討論。

為了在 Scala 中創(chuàng)建類(lèi)型化對(duì)象Dataset[T],T您需要一個(gè)定義該對(duì)象的案例類(lèi)。使用第 3 章(表 3-1)中的示例數(shù)據(jù),假設(shè)我們有一個(gè) JSON 文件,其中包含數(shù)百萬(wàn)關(guān)于博客作者以以下格式撰寫(xiě)的關(guān)于 Apache Spark 的條目:

{id: 1, first: "Jules", last: "Damji", url: "https://tinyurl.1", date: "1/4/2016", hits: 4535, campaigns: {"twitter", "LinkedIn"}}, ... {id: 87, first: "Brooke", last: "Wenig", url: "https://tinyurl.2", date: "5/5/2018", hits: 8908, campaigns: {"twitter", "LinkedIn"}}

要?jiǎng)?chuàng)建分布式Dataset[Bloggers],我們必須首先定義一個(gè) Scala 案例類(lèi),該類(lèi)定義包含 Scala 對(duì)象的每個(gè)單獨(dú)字段。此案例類(lèi)用作類(lèi)型化對(duì)象的藍(lán)圖或模式Bloggers:

// In Scala case class Bloggers(id:Int, first:String, last:String, url:String, date:String, hits: Int, campaigns:Array[String])

我們現(xiàn)在可以從數(shù)據(jù)源中讀取文件:

val bloggers = "../data/bloggers.json" val bloggersDS = spark.read.format("json").option("path", bloggers).load().as[Bloggers]

生成的分布式數(shù)據(jù)集合中的每一行都是類(lèi)型Bloggers。

同樣,您可以在 Java 中創(chuàng)建一個(gè) JavaBean 類(lèi)型的類(lèi)Bloggers,然后使用編碼器創(chuàng)建一個(gè)Dataset<Bloggers>:

// In Java import org.apache.spark.sql.Encoders; import java.io.Serializable;public class Bloggers implements Serializable {private int id;private String first;private String last;private String url;private String date;private int hits;private Array[String] campaigns;// JavaBean getters and setters int getID() { return id; } void setID(int i) { id = i; } String getFirst() { return first; } void setFirst(String f) { first = f; } String getLast() { return last; } void setLast(String l) { last = l; } String getURL() { return url; } void setURL (String u) { url = u; } String getDate() { return date; } Void setDate(String d) { date = d; } int getHits() { return hits; } void setHits(int h) { hits = h; }Array[String] getCampaigns() { return campaigns; } void setCampaigns(Array[String] c) { campaigns = c; } }// Create Encoder Encoder<Bloggers> BloggerEncoder = Encoders.bean(Bloggers.class); String bloggers = "../bloggers.json" Dataset<Bloggers>bloggersDS = spark.read.format("json").option("path", bloggers).load().as(BloggerEncoder);

如您所見(jiàn),在 Scala 和 Java 中創(chuàng)建數(shù)據(jù)集需要一些先見(jiàn)之明,因?yàn)槟仨氈勒谧x取的行的所有單個(gè)列名稱(chēng)和類(lèi)型。與 DataFrame 不同,您可以選擇讓 Spark 推斷架構(gòu),Dataset API 要求您提前定義數(shù)據(jù)類(lèi)型,并且您的案例類(lèi)或 JavaBean 類(lèi)與您的架構(gòu)匹配。

筆記

Scala 案例類(lèi)或 Java 類(lèi)定義中的字段名稱(chēng)必須與數(shù)據(jù)源中的順序匹配。數(shù)據(jù)中每一行的列名會(huì)自動(dòng)映射到類(lèi)中對(duì)應(yīng)的名稱(chēng),并自動(dòng)保留類(lèi)型。

如果字段名稱(chēng)與您的輸入數(shù)據(jù)匹配,您可以使用現(xiàn)有的 Scala 案例類(lèi)或 JavaBean 類(lèi)。使用 Dataset API與使用 DataFrame一樣簡(jiǎn)單、簡(jiǎn)潔和聲明性。對(duì)于大多數(shù)數(shù)據(jù)集的轉(zhuǎn)換,您可以使用在前幾章中學(xué)習(xí)過(guò)的相同的關(guān)系運(yùn)算符。

讓我們檢查使用示例數(shù)據(jù)集的一些方面.

使用數(shù)據(jù)集

創(chuàng)建示例數(shù)據(jù)集的一種簡(jiǎn)單而動(dòng)態(tài)的方法是使用實(shí)SparkSession??例。在這個(gè)場(chǎng)景中,為了說(shuō)明的目的,我們動(dòng)態(tài)地創(chuàng)建一個(gè)包含三個(gè)字段的 Scala 對(duì)象:(uid用戶(hù)的唯一 ID)、uname(隨機(jī)生成的用戶(hù)名字符串)和usage(服務(wù)器或服務(wù)使用的分鐘數(shù))。

創(chuàng)建示例數(shù)據(jù)

首先,讓我們生成一些示例數(shù)據(jù):

// In Scala import scala.util.Random._ // Our case class for the Dataset case class Usage(uid:Int, uname:String, usage: Int) val r = new scala.util.Random(42) // Create 1000 instances of scala Usage class // This generates data on the fly val data = for (i <- 0 to 1000) yield (Usage(i, "user-" + r.alphanumeric.take(5).mkString(""),r.nextInt(1000))) // Create a Dataset of Usage typed data val dsUsage = spark.createDataset(data) dsUsage.show(10)+---+----------+-----+ |uid| uname|usage| +---+----------+-----+ | 0|user-Gpi2C| 525| | 1|user-DgXDi| 502| | 2|user-M66yO| 170| | 3|user-xTOn6| 913| | 4|user-3xGSz| 246| | 5|user-2aWRN| 727| | 6|user-EzZY1| 65| | 7|user-ZlZMZ| 935| | 8|user-VjxeG| 756| | 9|user-iqf1P| 3| +---+----------+-----+ only showing top 10 rows

在 Java 中這個(gè)想法是相似的,但我們必須使用顯式Encoders(在 Scala 中,Spark 隱式處理):

// In Java import org.apache.spark.sql.Encoders; import org.apache.commons.lang3.RandomStringUtils; import java.io.Serializable; import java.util.Random; import java.util.ArrayList; import java.util.List;// Create a Java class as a Bean public class Usage implements Serializable {int uid; // user idString uname; // usernameint usage; // usagepublic Usage(int uid, String uname, int usage) {this.uid = uid;this.uname = uname;this.usage = usage;}// JavaBean getters and setters public int getUid() { return this.uid; }public void setUid(int uid) { this.uid = uid; }public String getUname() { return this.uname; }public void setUname(String uname) { this.uname = uname; }public int getUsage() { return this.usage; }public void setUsage(int usage) { this.usage = usage; }public Usage() {}public String toString() {return "uid: '" + this.uid + "', uame: '" + this.uname + "', usage: '" + this.usage + "'";} }// Create an explicit Encoder Encoder<Usage> usageEncoder = Encoders.bean(Usage.class); Random rand = new Random(); rand.setSeed(42); List<Usage> data = new ArrayList<Usage>()// Create 1000 instances of Java Usage class for (int i = 0; i < 1000; i++) {data.add(new Usage(i, "user" + RandomStringUtils.randomAlphanumeric(5),rand.nextInt(1000));// Create a Dataset of Usage typed data Dataset<Usage> dsUsage = spark.createDataset(data, usageEncoder);

筆記

Scala 和 Java 生成的 Dataset 會(huì)有所不同,因?yàn)殡S機(jī)種子算法可能不同。因此,您的 Scala 和 Java 的查詢(xún)結(jié)果會(huì)有所不同。

現(xiàn)在我們已經(jīng)生成了數(shù)據(jù)集,dsUsage讓我們執(zhí)行一些我們?cè)谇皫渍轮型瓿傻某R?jiàn)轉(zhuǎn)換。

轉(zhuǎn)換樣本數(shù)據(jù)

回想一下,數(shù)據(jù)集是特定領(lǐng)域?qū)ο蟮膹?qiáng)類(lèi)型集合。這些對(duì)象可以使用函數(shù)或關(guān)系操作并行轉(zhuǎn)換。這些轉(zhuǎn)換的示例包括map()、reduce()、filter()、select()和aggregate()。作為高階函數(shù)的示例,這些方法可以將 lambda、閉包或函數(shù)作為參數(shù)并返回結(jié)果。因此,它們非常適合函數(shù)式編程。

Scala 是一種函數(shù)式編程語(yǔ)言,最近 lambda、函數(shù)式參數(shù)和閉包也被添加到 Java 中。讓我們?cè)?Spark 中嘗試幾個(gè)高階函數(shù),并將函數(shù)式編程結(jié)構(gòu)與我們之前創(chuàng)建的示例數(shù)據(jù)一起使用。

高階函數(shù)和函數(shù)式編程

舉個(gè)簡(jiǎn)單的例子,讓我們使用filter()返回我們dsUsage數(shù)據(jù)集中所有使用時(shí)間超過(guò) 900 分鐘的用戶(hù)。一種方法是使用函數(shù)表達(dá)式作為filter()方法的參數(shù):

// In Scala import org.apache.spark.sql.functions._ dsUsage.filter(d => d.usage > 900).orderBy(desc("usage")).show(5, false)

另一種方法是定義一個(gè)函數(shù)并將該函數(shù)作為參數(shù)提供給filter():

def filterWithUsage(u: Usage) = u.usage > 900 dsUsage.filter(filterWithUsage(_)).orderBy(desc("usage")).show(5)+---+----------+-----+ |uid| uname|usage| +---+----------+-----+ |561|user-5n2xY| 999| |113|user-nnAXr| 999| |605|user-NL6c4| 999| |634|user-L0wci| 999| |805|user-LX27o| 996| +---+----------+-----+ only showing top 5 rows

在第一種情況下,我們使用 lambda 表達(dá)式,{d.usage > 900}作為filter()方法的參數(shù),而在第二種情況下,我們定義了一個(gè) Scala 函數(shù),def filterWithUsage(u: Usage) = u.usage > 900。在這兩種情況下,該filter()方法都會(huì)遍歷Usage分布式數(shù)據(jù)集中對(duì)象的每一行,并應(yīng)用表達(dá)式或執(zhí)行函數(shù),Usage為表達(dá)式或函數(shù)的值為 的行返回類(lèi)型為的新數(shù)據(jù)集true。(有關(guān)方法簽名的詳細(xì)信息,請(qǐng)參閱Scala 文檔。)

在 Java 中, to 的參數(shù)filter()類(lèi)型為FilterFunction<T>。這可以匿名內(nèi)聯(lián)或使用命名函數(shù)定義。在本例中,我們將按名稱(chēng)定義函數(shù)并將其分配給變量f。應(yīng)用此函數(shù)filter()將返回一個(gè)新數(shù)據(jù)集,其中包含我們過(guò)濾條件為的所有行true:

// In Java // Define a Java filter function FilterFunction<Usage> f = new FilterFunction<Usage>() {public boolean call(Usage u) {return (u.usage > 900);} };// Use filter with our function and order the results in descending order dsUsage.filter(f).orderBy(col("usage").desc()).show(5);+---+----------+-----+ |uid|uname |usage| +---+----------+-----+ |67 |user-qCGvZ|997 | |878|user-J2HUU|994 | |668|user-pz2Lk|992 | |750|user-0zWqR|991 | |242|user-g0kF6|989 | +---+----------+-----+ only showing top 5 rows

并非所有 lambda 或函數(shù)參數(shù)都必須計(jì)算為Boolean值;他們也可以返回計(jì)算值。考慮這個(gè)使用高階函數(shù)的例子map(),我們的目標(biāo)是找出每個(gè)用戶(hù)的使用成本,其usage價(jià)值超過(guò)某個(gè)閾值,這樣我們就可以為這些用戶(hù)提供每分鐘的特價(jià)。

// In Scala // Use an if-then-else lambda expression and compute a value dsUsage.map(u => {if (u.usage > 750) u.usage * .15 else u.usage * .50 }).show(5, false) // Define a function to compute the usage def computeCostUsage(usage: Int): Double = {if (usage > 750) usage * 0.15 else usage * 0.50 } // Use the function as an argument to map() dsUsage.map(u => {computeCostUsage(u.usage)}).show(5, false) +------+ |value | +------+ |262.5 | |251.0 | |85.0 | |136.95| |123.0 | +------+ only showing top 5 rows

要map()在 Java 中使用,您必須定義一個(gè)MapFunction<T>.?這可以是匿名類(lèi)或擴(kuò)展的已定義類(lèi)MapFunction<T>。對(duì)于這個(gè)例子,我們內(nèi)聯(lián)使用它——也就是說(shuō),在方法調(diào)用本身中:

// In Java // Define an inline MapFunction dsUsage.map((MapFunction<Usage, Double>) u -> {if (u.usage > 750)return u.usage * 0.15;elsereturn u.usage * 0.50; }, Encoders.DOUBLE()).show(5); // We need to explicitly specify the Encoder +------+ |value | +------+ |65.0 | |114.45| |124.0 | |132.6 | |145.5 | +------+ only showing top 5 rows

盡管我們已經(jīng)計(jì)算了使用成本的值,但我們不知道計(jì)算值與哪些用戶(hù)相關(guān)聯(lián)。我們?nèi)绾潍@得這些信息?

步驟很簡(jiǎn)單:

  • 創(chuàng)建一個(gè) Scala 案例類(lèi)或 JavaBean 類(lèi)UsageCost,帶有一個(gè)名為 的附加字段或列cost。

  • 定義一個(gè)函數(shù)來(lái)計(jì)算并在方法cost中使用它。map()

  • 這是 Scala 中的樣子:

    // In Scala // Create a new case class with an additional field, cost case class UsageCost(uid: Int, uname:String, usage: Int, cost: Double)// Compute the usage cost with Usage as a parameter // Return a new object, UsageCost def computeUserCostUsage(u: Usage): UsageCost = {val v = if (u.usage > 750) u.usage * 0.15 else u.usage * 0.50UsageCost(u.uid, u.uname, u.usage, v) }// Use map() on our original Dataset dsUsage.map(u => {computeUserCostUsage(u)}).show(5)+---+----------+-----+------+ |uid| uname|usage| cost| +---+----------+-----+------+ | 0|user-Gpi2C| 525| 262.5| | 1|user-DgXDi| 502| 251.0| | 2|user-M66yO| 170| 85.0| | 3|user-xTOn6| 913|136.95| | 4|user-3xGSz| 246| 123.0| +---+----------+-----+------+ only showing top 5 rows

    現(xiàn)在我們有了一個(gè)轉(zhuǎn)換后的數(shù)據(jù)集,其中包含一個(gè)由轉(zhuǎn)換cost中的函數(shù)計(jì)算的新列,map()以及所有其他列。

    同樣,在 Java 中,如果我們想要與每個(gè)用戶(hù)關(guān)聯(lián)的成本,我們需要定義一個(gè) JavaBean 類(lèi)UsageCost和MapFunction<T>.?有關(guān)完整的 JavaBean 示例,請(qǐng)參閱本書(shū)的GitHub 存儲(chǔ)庫(kù);為簡(jiǎn)潔起見(jiàn),我們將僅在MapFunction<T>此處顯示內(nèi)聯(lián):

    // In Java // Get the Encoder for the JavaBean class Encoder<UsageCost> usageCostEncoder = Encoders.bean(UsageCost.class);// Apply map() function to our data dsUsage.map( (MapFunction<Usage, UsageCost>) u -> {double v = 0.0;if (u.usage > 750) v = u.usage * 0.15; else v = u.usage * 0.50;return new UsageCost(u.uid, u.uname,u.usage, v); },usageCostEncoder).show(5);+------+---+----------+-----+ | cost|uid| uname|usage| +------+---+----------+-----+ | 65.0| 0|user-xSyzf| 130| |114.45| 1|user-iOI72| 763| | 124.0| 2|user-QHRUk| 248| | 132.6| 3|user-8GTjo| 884| | 145.5| 4|user-U4cU1| 970| +------+---+----------+-----+ only showing top 5 rows

    關(guān)于使用高階函數(shù)和數(shù)據(jù)集,有幾點(diǎn)需要注意:

    • 我們使用類(lèi)型化的 JVM 對(duì)象作為函數(shù)的參數(shù)。

    • 我們使用點(diǎn)表示法(來(lái)自面向?qū)ο蟮木幊?#xff09;來(lái)訪(fǎng)問(wèn)類(lèi)型化 JVM 對(duì)象中的各個(gè)字段,使其更易于閱讀。

    • 我們的一些函數(shù)和 lambda 簽名可以是類(lèi)型安全的,確保編譯時(shí)錯(cuò)誤檢測(cè)并指示 Spark 處理哪些數(shù)據(jù)類(lèi)型、執(zhí)行哪些操作等。

    • 我們的代碼具有可讀性、表達(dá)性和簡(jiǎn)潔性,在 lambda 表達(dá)式中使用 Java 或 Scala 語(yǔ)言特性。

    • Spark 在 Java 和 Scala 中都提供了與高階函數(shù)構(gòu)造等效的map()和filter()沒(méi)有的高階函數(shù)構(gòu)造,因此您不必將函數(shù)式編程與 Datasets 或 DataFrames 一起使用。相反,您可以簡(jiǎn)單地使用條件 DSL 運(yùn)算符或 SQL 表達(dá)式:例如,dsUsage.filter("usage > 900")或dsUsage($"usage" > 900).?(有關(guān)這方面的更多信息,請(qǐng)參閱“使用數(shù)據(jù)集的成本”。)

    • 對(duì)于數(shù)據(jù)集,我們使用編碼器,這是一種在 JVM 和 Spark 的數(shù)據(jù)類(lèi)型內(nèi)部二進(jìn)制格式之間有效轉(zhuǎn)換數(shù)據(jù)的機(jī)制(更多信息請(qǐng)參見(jiàn)“數(shù)據(jù)集編碼器”)。

    筆記

    高階函數(shù)和函數(shù)式編程并不是 Spark 數(shù)據(jù)集獨(dú)有的;您也可以將它們與 DataFrame 一起使用。回想一下,DataFrame 是一個(gè)Dataset[Row],其中Row是一個(gè)通用的無(wú)類(lèi)型 JVM 對(duì)象,可以保存不同類(lèi)型的字段。方法簽名采用對(duì) 進(jìn)行操作的表達(dá)式或函數(shù)Row,這意味著每個(gè)Row的數(shù)據(jù)類(lèi)型都可以作為表達(dá)式或函數(shù)的輸入值。

    將 DataFrame 轉(zhuǎn)換為數(shù)據(jù)集

    對(duì)于查詢(xún)和構(gòu)造的強(qiáng)類(lèi)型檢查,您可以將 DataFrames 轉(zhuǎn)換為 Datasets。要將現(xiàn)有 DataFrame 轉(zhuǎn)換df為 Dataset 類(lèi)型SomeCaseClass,只需使用df.as[SomeCaseClass]符號(hào)。我們之前看到了一個(gè)這樣的例子:

    // In Scala val bloggersDS = spark.read.format("json").option("path", "/data/bloggers/bloggers.json").load().as[Bloggers]

    spark.read.format("json")返回 a?DataFrame<Row>,它在 Scala 中是Dataset[Row].?Using.as[Bloggers]指示 Spark 使用本章后面討論的編碼器,將對(duì)象從 Spark 的內(nèi)部?jī)?nèi)存表示序列化/反序列化為 JVMBloggers對(duì)象.

    數(shù)據(jù)集和數(shù)據(jù)幀的內(nèi)存管理

    Spark 是一種密集型內(nèi)存分布式大數(shù)據(jù)引擎,因此其內(nèi)存的有效利用對(duì)其執(zhí)行速度至關(guān)重要。1縱觀(guān)其發(fā)布?xì)v史,Spark 對(duì)內(nèi)存的使用發(fā)生了顯著變化:

    • Spark 1.0 使用基于 RDD 的 Java 對(duì)象進(jìn)行內(nèi)存存儲(chǔ)、序列化和反序列化,這在資源方面很昂貴且速度很慢。此外,存儲(chǔ)是在 Java 堆上分配的,因此對(duì)于大型數(shù)據(jù)集,您只能受 JVM 垃圾收集 (GC) 的支配。

    • Spark 1.x 引入了Project Tungsten。它的一個(gè)突出特點(diǎn)是一種新的內(nèi)部基于行的格式,使用偏移量和指針在堆外內(nèi)存中布局?jǐn)?shù)據(jù)集和數(shù)據(jù)幀。Spark 使用一種稱(chēng)為編碼器的高效機(jī)制在 JVM 與其內(nèi)部 Tungsten 格式之間進(jìn)行序列化和反序列化。在堆外分配內(nèi)存意味著 Spark 較少受到 GC 的阻礙。

    • Spark 2.x 引入了第二代 Tungsten 引擎,具有全階段代碼生成和向量化的基于列的內(nèi)存布局。基于現(xiàn)代編譯器的思想和技術(shù),這個(gè)新版本還利用現(xiàn)代 CPU 和緩存架構(gòu),通過(guò)“單指令多數(shù)據(jù)”(SIMD) 方法實(shí)現(xiàn)快速并行數(shù)據(jù)訪(fǎng)問(wèn)。

    數(shù)據(jù)集編碼器

    編碼器將堆外內(nèi)存中的數(shù)據(jù)從 Spark 的內(nèi)部 Tungsten 格式轉(zhuǎn)換為 JVM Java 對(duì)象。換句話(huà)說(shuō),它們將數(shù)據(jù)集對(duì)象從 Spark 的內(nèi)部格式序列化和反序列化為 JVM 對(duì)象,包括原始數(shù)據(jù)類(lèi)型。例如,anEncoder[T]將從 Spark 的內(nèi)部 Tungsten 格式轉(zhuǎn)換為Dataset[T].

    Spark 內(nèi)置支持為基本類(lèi)型(例如,字符串、整數(shù)、長(zhǎng)整數(shù))、Scala 案例類(lèi)和 JavaBeans 自動(dòng)生成編碼器。與 Java 和 Kryo 的序列化和反序列化相比,Spark 編碼器的速度要快得多。

    在我們之前的 Java 示例中,我們顯式地創(chuàng)建了一個(gè)編碼器:

    Encoder<UsageCost> usageCostEncoder = Encoders.bean(UsageCost.class);

    然而,對(duì)于 Scala,Spark 會(huì)自動(dòng)為這些高效的轉(zhuǎn)換器生成字節(jié)碼。讓我們來(lái)看看 Spark 內(nèi)部基于 Tungsten 行的格式。

    Spark 的內(nèi)部格式與 Java 對(duì)象格式

    Java 對(duì)象有很大的開(kāi)銷(xiāo)——標(biāo)頭信息、哈希碼、Unicode 信息等。即使是簡(jiǎn)單的 Java 字符串(例如“abcd”)也需要 48 個(gè)字節(jié)的存儲(chǔ)空間,而不是您可能期望的 4 個(gè)字節(jié)。例如,想象一下創(chuàng)建MyClass(Int, String, String)對(duì)象的開(kāi)銷(xiāo)。

    Spark 不是為 Datasets 或 DataFrames 創(chuàng)建基于 JVM 的對(duì)象,而是分配堆外 Java 內(nèi)存來(lái)布置它們的數(shù)據(jù),并使用編碼器將數(shù)據(jù)從內(nèi)存表示轉(zhuǎn)換為 JVM 對(duì)象。例如,圖 6-1顯示了 JVM 對(duì)象如何在MyClass(Int, String, String)內(nèi)部存儲(chǔ)。

    圖 6-1。JVM 對(duì)象存儲(chǔ)在由 Spark 管理的連續(xù)堆外 Java 內(nèi)存中

    當(dāng)數(shù)據(jù)以這種連續(xù)方式存儲(chǔ)并通過(guò)指針?biāo)惴ê推屏吭L(fǎng)問(wèn)時(shí),編碼器可以快速序列化或反序列化該數(shù)據(jù)。這意味著什么?

    序列化和反序列化 (SerDe)

    分布式計(jì)算中的一個(gè)并不新鮮的概念,其中數(shù)據(jù)經(jīng)常通過(guò)網(wǎng)絡(luò)在集群中的計(jì)算機(jī)節(jié)點(diǎn)之間傳輸,序列化和反序列化是發(fā)送方將類(lèi)型化對(duì)象編碼(序列化)為二進(jìn)制表示或格式并解碼的過(guò)程(反序列化)從二進(jìn)制格式到接收器各自的數(shù)據(jù)類(lèi)型對(duì)象。

    例如,如果圖 6-1MyClass中的 JVM 對(duì)象必須在Spark 集群中的節(jié)點(diǎn)之間共享,則發(fā)送方會(huì)將其序列化為字節(jié)數(shù)組,而接收方會(huì)將其反序列化回類(lèi)型為 的 JVM 對(duì)象。MyClass

    JVM 有自己的內(nèi)置 Java 序列化器和反序列化器,但效率低下,因?yàn)?#xff08;正如我們?cè)谏弦还?jié)中看到的)JVM 在堆內(nèi)存中創(chuàng)建的 Java 對(duì)象是臃腫的。因此,該過(guò)程是緩慢的。

    這就是數(shù)據(jù)集編碼器來(lái)救援的地方,原因如下:

    • Spark 的內(nèi)部 Tungsten 二進(jìn)制格式(參見(jiàn)圖6-1和6-2)將對(duì)象存儲(chǔ)在 Java 堆內(nèi)存之外,而且它很緊湊,因此這些對(duì)象占用的空間更少。

    • 編碼器可以通過(guò)使用帶有內(nèi)存地址和偏移量的簡(jiǎn)單指針?biāo)惴ū闅v內(nèi)存來(lái)快速序列化(圖 6-2)。

    • 在接收端,編碼器可以快速將二進(jìn)制表示反序列化為 Spark 的內(nèi)部表示。編碼器不受 JVM 垃圾收集暫停的阻礙。

    圖 6-2。Spark 內(nèi)部基于 Tungsten 行的格式

    然而,正如我們接下來(lái)要討論的那樣,生活中大多數(shù)美好的事物都是有代價(jià)的。

    使用數(shù)據(jù)集的成本

    在第 3 章的“DataFrames 與 Datasets”中,我們概述了使用 Datasets 的一些好處——但這些好處是有代價(jià)的。如上一節(jié)所述,當(dāng)數(shù)據(jù)集被傳遞給高階函數(shù)時(shí),例如,或接受 lambdas 和函數(shù)參數(shù)的函數(shù),從 Spark 的內(nèi)部 Tungsten 格式反序列化到 JVM 對(duì)象會(huì)產(chǎn)生相關(guān)成本。filter()map()flatMap()

    與在 Spark 中引入編碼器之前使用的其他序列化器相比,此成本很小且可以忍受。但是,在更大的數(shù)據(jù)集和許多查詢(xún)中,此成本會(huì)累積并可能影響性能。

    降低成本的策略

    減輕過(guò)度序列化和反序列化的一種策略是在查詢(xún)中使用DSL 表達(dá)式,并避免過(guò)度使用 lambda 作為匿名函數(shù)作為高階函數(shù)的參數(shù)。因?yàn)?lambda 在運(yùn)行時(shí)之前對(duì) Catalyst 優(yōu)化器是匿名且不透明的,所以當(dāng)您使用它們時(shí),它無(wú)法有效地識(shí)別您在做什么(您沒(méi)有告訴 Spark要做什么),因此無(wú)法優(yōu)化您的查詢(xún)(請(qǐng)參閱“Catalyst Optimizer”在第 3 章中)。

    第二種策略是以最小化序列化和反序列化的方式將查詢(xún)鏈接在一起。將查詢(xún)鏈接在一起是 Spark 中的常見(jiàn)做法。

    讓我們用一個(gè)簡(jiǎn)單的例子來(lái)說(shuō)明。假設(shè)我們有一個(gè)類(lèi)型為 的數(shù)據(jù)集Person,其中Person定義為 Scala 案例類(lèi):

    // In Scala Person(id: Integer, firstName: String, middleName: String, lastName: String, gender: String, birthDate: String, ssn: String, salary: String)

    我們想使用函數(shù)式編程向這個(gè)數(shù)據(jù)集發(fā)出一組查詢(xún)。

    讓我們來(lái)看看我們編寫(xiě)查詢(xún)效率低下的情況,以這種方式我們?cè)诓恢挥X(jué)中產(chǎn)生了重復(fù)序列化和反序列化的成本:

    import java.util.Calendar val earliestYear = Calendar.getInstance.get(Calendar.YEAR) - 40personDS// Everyone above 40: lambda-1.filter(x => x.birthDate.split("-")(0).toInt > earliestYear)// Everyone earning more than 80K.filter($"salary" > 80000)// Last name starts with J: lambda-2.filter(x => x.lastName.startsWith("J"))// First name starts with D.filter($"firstName".startsWith("D")).count()

    正如您在圖 6-3中所看到的,每次我們從 lambda 移動(dòng)到 DSL( ) 時(shí),都會(huì)產(chǎn)生序列化和反序列化JVM 對(duì)象filter($"salary" > 8000)的成本。Person

    圖 6-3。使用 lambdas 和 DSL 鏈接查詢(xún)的低效方式

    相比之下,以下查詢(xún)僅使用 DSL,不使用 lambda。因此,它的效率要高得多——整個(gè)組合和鏈?zhǔn)讲樵?xún)不需要序列化/反序列化:

    personDS.filter(year($"birthDate") > earliestYear) // Everyone above 40.filter($"salary" > 80000) // Everyone earning more than 80K.filter($"lastName".startsWith("J")) // Last name starts with J.filter($"firstName".startsWith("D")) // First name starts with D.count()

    概括

    在本章中,我們?cè)敿?xì)介紹了如何在 Java 和 Scala 中使用數(shù)據(jù)集。我們探索了 Spark 如何管理內(nèi)存以將 Dataset 構(gòu)造作為其統(tǒng)一和高級(jí) API 的一部分,并且我們考慮了與使用 Datasets 相關(guān)的一些成本以及如何降低這些成本。我們還向您展示了如何在 Spark 中使用 Java 和 Scala 的函數(shù)式編程結(jié)構(gòu)。

    最后,我們深入了解了編碼器如何從 Spark 的內(nèi)部 Tungsten 二進(jìn)制格式序列化和反序列化為 JVM 對(duì)象。

    總結(jié)

    以上是生活随笔為你收集整理的【Apache Spark 】第 6 章Spark SQL 和数据集的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。