Spark的基本架构
http://ihoge.cn/2018/IntroductionToSpark.html
Spark的基本架構
當單機沒有足夠的能力和資源來執行大量信息的計算(或者低延遲計算),這時就需要一個集群或一組機器將許多機器的資源集中在一起,使我們可以使用全部累積的在一起的計算和存儲資源。現在只有一組機器不夠強大,你需要一個框架來協調他們之間的工作。 Spark是一種工具,可以管理和協調跨計算機集群執行數據任務。
Spark用于執行任務的機器集群可以由Spark的Standalone,YARN或Mesos等集群管理器進行管理。然后,我們向這些集群管理器提交Spark應用程序,這些集群管理器將資源授予我們的應用程序,以便我們完成我們的工作。
1. Spark Application
Spark應用程序由一個驅動程序進程和一組執行程序進程組成。Driver進程運行main()函數,位于集群中的一個節點上,它負責三件事:維護Spark應用程序的相關信息;回應用戶的程序或輸入;分配和安排Executors之間的工作。驅動程序過程是絕對必要的 - 它是Spark應用程序的核心,并在應用程序的生命周期中保留所有相關信息。
Executor負責實際執行Driver分配給他們的工作。這意味著,每個Executor只有兩個任務:執行由驅動程序分配給它的代碼,并將該執行程序的計算狀態報告給驅動程序節點。
群集管理器控制物理機器并為Spark應用程序分配資源。這可以是幾個核心集群管理員之一:Spark的Standalone,YARN或Mesos。這意味著可以同時在群集上運行多個Spark應用程序。
在前面的插圖中,左側是我們的driver,右側是四個executors。在該圖中,我們刪除了群集節點的概念。用戶可以通過配置指定有多少執行者應該落在每個節點上。
- Spark有一些集群管理器,負責調度可用資源。
- 驅動程序進程負責執行執行程序中的驅動程序命令,以完成我們的任務。
2. Spark’s APIs
盡管我們的executor大多會一直運行Spark代碼。但我們仍然可以通過Spark的語言API用多種不同語言運行Spark代碼。大多數情況下,Spark會在每種語言中提供一些核心“concepts”,并將不同語言的代碼譯成運行在機器集群上的Spark代碼。
Spark有兩套基本的API:低級非結構化(Unstructured)API和更高級別的結構化(Structured)API。
3. SparkSession
我們通過驅動程序來控制Spark應用程序。該驅動程序進程將自身作為名為SparkSession并作為唯一的接口API對象向用戶開放。 SparkSession實例是Spark在群集中執行用戶定義操作的方式。 SparkSession和Spark應用程序之間有一對一的對應關系。在Scala和Python中,變量在啟動控制臺時可用作spark。讓我們看下簡單的Scala和/或Python中的SparkSession。
4. Dataframe
DataFrame是最常見的Structured API(結構化API),只是表示有類型的包含行和列的數據表。一個簡單的比喻就是一個帶有命名列的電子表格。其根本區別在于,當電子表格位于一臺計算機上某個特定位置時,Spark DataFrame可以跨越數千臺計算機。將數據放在多臺計算機上的原因無非有兩種:數據太大而無法放在一臺計算機上,或者在一臺計算機上執行計算所需的時間太長。
DataFrame概念并不是Spark獨有的。 R和Python都有相似的概念。但是,Python / R DataFrame(有一些例外)存在于一臺機器上,而不是多臺機器上。這限制了您可以對python和R中給定的DataFrame執行的操作與該特定機器上存在的資源進行對比。但是,由于Spark具有適用于Python和R的Spark’s Language APIs,因此將Pandas(Python)DataFrame轉換為Spark DataFrame和R DataFrame轉換為Spark DataFrame(R)非常容易。
注意
Spark有幾個核心抽象:Datasets, DataFrames, SQL Tables,和彈性分布式數據集(RDD)。這些抽象都表示分布式數據集合,但它們有不同的接口來處理這些數據。最簡單和最有效的是DataFrames,它可以用于所有語言。
5. Partitions
為了允許每個執行者并行執行工作,Spark將數據分解成稱為分區的塊。分區是位于集群中的一臺物理機上的一組行。 DataFrame的分區表示數據在執行過程中如何在整個機器群中物理分布。如果你有一個分區,即使你有數千個執行者,Spark也只會有一個分區。如果有多個分區,但只有一個執行程序Spark仍然只有一個并行性,因為只有一個計算資源。
值得注意的是,使用DataFrames,我們不會(大部分)操作 手動分區(基于個人)。我們只需指定物理分區中數據的高級轉換,并且Spark確定此工作將如何在集群上實際執行。較低級別的API確實存在(通過彈性分布式數據集接口)。
6. Transformations
在Spark中,核心數據結構是不可改變的,這意味著一旦創建它們就不能更改。起初,這可能看起來像一個奇怪的概念,如果你不能改變它,你應該如何使用它?為了“更改”DataFrame,您必須指示Spark如何修改您所需的DataFrame。這些說明被稱為轉換。
轉換操作沒有返回輸出,這是因為我們只指定了一個抽象轉換,并且Spark不會在轉換之前采取行動,直到我們執行一個動作。Transformations是如何使用Spark來表達業務邏輯的核心。Spark有兩種類型的Transformations,一種是窄依賴轉換關系,一種是寬依賴轉換關系。
寬依賴指輸入分區對多輸出分區起作用(多個孩子)。這被稱為shuffle,Spark將在群集之間交換分區。對于窄依賴轉換,Spark將自動執行稱為流水線的操作,這意味著如果我們在DataFrame上指定了多個過濾器,它們將全部在內存中執行。當我們執行shuffle時,Spark會將結果寫入磁盤。
7. Lazy Evaluation
Lazy Evaluation意味著Spark將等到執行計算指令圖的最后時刻。在Spark中,我們不是在表達某些操作時立即修改數據,而是建立起來應用于源數據的轉換計劃。Spark將把原始DataFrame轉換計劃編譯為一個高效的物理計劃,該計劃將在群集中盡可能高效地運行。這為最終用戶帶來了巨大的好處,因為Spark可以優化整個數據流從端到端。這方面的一個例子就是所謂的“predicate pushdown” DataFrames。如果我們構建一個大的Spark作業,但在最后指定了一個過濾器,只需要我們從源數據中獲取一行,則執行此操作的最有效方法就是訪問我們需要的單個記錄。 Spark實際上會通過自動推低濾波器來優化這一點。
8. Actions
轉換使我們能夠建立我們的邏輯計劃。為了觸發計算,我們需要一個動作操作。一個動作指示Spark計算一系列轉換的結果。
在指定我們的操作時,我們開始了一個Spark作業,它運行我們的過濾器轉換(一個窄依賴轉換),然后是一個聚合(一個寬依賴轉換),它在每個分區的基礎上執行計數,然后一個collect將我們的結果帶到各自語言的本地對象。我們可以通過檢查Spark UI(http://localhost:4040)來看到所有這些,Spark UI是一個包含在Spark中的工具,它允許我們監視集群上運行的Spark作業。
9. Dataframe & SQL
Spark SQL是Spark為結構化和半結構化數據處理設計的最受歡迎的模塊之一。 Spark SQL允許用戶使用SQL或可在Java,Scala,Python和R中使用的DataFrame和Dataset API來查詢Spark程序中的structured data。由于DataFrame API提供了一種統一的方法來訪問各種的數據源(包括Hive datasets,Avro,Parquet,ORC,JSON和JDBC),用戶能夠以相同方式連接到任何數據源,并將這些多個數據源連接在一起。 Spark SQL使用Hive meta store為用戶提供了與現有Hive數據,查詢和UDF完全兼容的功能。用戶可以無縫地 在Spark上無需修改即可運行其當前的Hive工作負載。
Spark SQL也可以通過spark-sql shell來訪問,現有的業務工具可以通過標準的JDBC和ODBC接口進行連接。
現在我們通過一個示例并在DataFrame和SQL中進行跟蹤。不管語言如何,以完全相同的方式啟動相同的轉換。您可以在SQL或DataFrames(R,Python,Scala或Java)中表達業務邏輯,并且在實際執行代碼之前,Spark會將該邏輯編譯計劃優化并最終生成最優的物理計劃。 Spark SQL允許您作為用戶將任何DataFrame注冊為表或視圖(臨時表),并使用純SQL查詢它。編寫SQL查詢或編寫DataFrame代碼之間沒有性能差異 都“編譯”到我們在DataFrame代碼中指定的相同底層計劃。
通過一個簡單的方法調用就可以將任何DataFrame制作成表格或視圖。
With SQl
With DataFrame
現在有7個步驟將我們帶回源數據。您可以在這些DataFrame的解釋計劃中看到這一點。以上圖解說明了我們在“代碼”中執行的一系列步驟。真正的執行計劃(解釋中可見的執行計劃)將與下面的執行計劃有所不同,因為在物理執行方面進行了優化,然而,該執行計劃與任何計劃一樣都是起點。這個執行計劃是一個有向無環圖(DAG)的轉換,每個轉換產生一個新的不可變DataFrame,我們在這個DataFrame上調用一個動作來產生一個結果。
1. 第一步是讀取數據。但是Spark實際上并沒有讀取它(Lazy Evaluation)
2. 第二步是我們的分組,在技術上,當我們調用groupBy時,我們最終得到了一個RelationalGroupedDataset,它是DataFrame的一個奇特名稱,該DataFrame具有指定的分組,但需要用戶在可以進一步查詢之前指定聚合。
3. 因此第三步是指定聚合。我們使用總和聚合方法。這需要輸入一列 表達式或簡單的列名稱。 sum方法調用的結果是一個新的dataFrame。你會看到它有一個新的模式,但它知道每個列的類型。(再次強調!)這里沒有執行計算是非常重要的。這只是我們表達的另一種轉換,Spark僅僅能夠跟蹤我們提供的類型信息。
4. 第四步是簡化語言,我們使用withColumnRename給原始列重新定義新名稱。當然,這不會執行計算 - 這只是另一種轉換!
5. 第五步導入一個函數對數據進行排序,即desc函數。從destination_total列中找到的最大值。
6. 第六步,我們將指定一個限制。這只是說明我們只需要五個值。這就像一個過濾器,只是它按位置而不是按值過濾。可以肯定地說,它基本上只是指定了一定大小的DataFrame。
7. 最后一步是我們的行動!現在我們實際上開始收集上面的DataFrame結果的過程,Spark將以我們正在執行的語言返回一個列表或數組。現在我們看下它的解釋計劃。
雖然這個解釋計劃與我們確切的“概念計劃”不符,但所有的部分都在那里。可以看到limit語句以及orderBy(在第一行)。你也可以看到我們的聚合是如何在partial_sum調用中的兩個階段發生的。這是因為數字列表是可交換的,并且Spark可以執行sum()并按分區進行劃分。當然,我們也可以看到我們如何在DataFrame中讀取數據。同時我們也可以將它寫出到Spark支持的任何數據源中。例如,假設我們想要將這些信息存儲在PostgreSQL等數據庫中,或者將它們寫入另一個文件。
本文永久地址,轉載注明出處!
http://ihoge.cn/2018/IntroductionToSpark.html
總結
以上是生活随笔為你收集整理的Spark的基本架构的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 免安装免配置 还免费的Spark 集群
- 下一篇: Spark ML - 协同过滤