分布式离线计算—Spark—SparkStreaming
原文作者:阿里中間件
原文地址:一文讀懂 Spark 和 Spark Streaming?
目錄
MapReduce 的問題所在
Spark 與 RDD 模型
流計算框架:Spark Streaming
流計算與 SQL:Spark Structured Streaming
系統(tǒng)架構(gòu)
總結(jié)
前言
Apache Spark 是當(dāng)今最流行的開源大數(shù)據(jù)處理框架。和人們耳熟能詳?shù)?MapReduce 一樣,Spark 用于進行分布式、大規(guī)模的數(shù)據(jù)處理,但 Spark 作為 MapReduce 的接任者,提供了更高級的編程接口、更高的性能。除此之外,Spark 不僅能進行常規(guī)的批處理計算,還提供了流式計算支持。Apache Spark 誕生于大名鼎鼎的 AMPLab(這里還誕生過 Mesos 和 Alluxio),從創(chuàng)立之初就帶有濃厚的學(xué)術(shù)氣質(zhì),其設(shè)計目標(biāo)是為各種大數(shù)據(jù)處理需求提供一個統(tǒng)一的技術(shù)棧。如今 Spark 背后的商業(yè)公司 Databricks 創(chuàng)始人也是來自 AMPLab 的博士畢業(yè)生。Spark 本身使用 Scala 語言編寫,Scala 是一門融合了面向?qū)ο笈c函數(shù)式的“雙范式”語言,運行在 JVM 之上。Spark 大量使用了它的函數(shù)式、即時代碼生成等特性。Spark 目前提供了 Java、Scala、Python、R 四種語言的 API,前兩者因為同樣運行在 JVM 上可以達(dá)到更原生的支持。
MapReduce 的問題所在
Hadoop 是大數(shù)據(jù)處理領(lǐng)域的開創(chuàng)者。嚴(yán)格來說,Hadoop 不只是一個軟件,而是一整套生態(tài)系統(tǒng),例如 MapReduce 負(fù)責(zé)進行分布式計算,而 HDFS 負(fù)責(zé)存儲大量文件。MapReduce 模型的誕生是大數(shù)據(jù)處理從無到有的飛躍。但隨著技術(shù)的進步,對大數(shù)據(jù)處理的需求也變得越來越復(fù)雜,MapReduce 的問題也日漸凸顯。通常,我們將 MapReduce 的輸入和輸出數(shù)據(jù)保留在 HDFS 上,很多時候,復(fù)雜的 ETL、數(shù)據(jù)清洗等工作無法用一次 MapReduce 完成,所以需要將多個 MapReduce 過程連接起來:
▲ 上圖中只有兩個 MapReduce 串聯(lián),實際上可能有幾十個甚至更多,依賴關(guān)系也更復(fù)雜。這種方式下,每次中間結(jié)果都要寫入 HDFS 落盤保存,代價很大(別忘了,HDFS 的每份數(shù)據(jù)都需要冗余若干份拷貝)。另外,由于本質(zhì)上是多次 MapReduce 任務(wù),調(diào)度也比較麻煩,實時性無從談起。
Spark 與 RDD 模型
針對上面的問題,如果能把中間結(jié)果保存在內(nèi)存里,豈不是快的多?之所以不能這么做,最大的障礙是:分布式系統(tǒng)必須能容忍一定的故障,所謂 fault-tolerance。如果只是放在內(nèi)存中,一旦某個計算節(jié)點宕機,其他節(jié)點無法恢復(fù)出丟失的數(shù)據(jù),只能重啟整個計算任務(wù),這對于動輒成百上千節(jié)點的集群來說是不可接受的。
一般來說,想做到 fault-tolerance 只有兩個方案:要么存儲到外部(例如 HDFS),要么拷貝到多個副本。Spark 大膽地提出了第三種——重算一遍。但是之所以能做到這一點,是依賴于一個額外的假設(shè):所有計算過程都是確定性的(deterministic)。Spark 借鑒了函數(shù)式編程思想,提出了 RDD(Resilient Distributed Datasets),譯作“彈性分布式數(shù)據(jù)集”。
RDD 是一個只讀的、分區(qū)的(partitioned)數(shù)據(jù)集合。RDD 要么來源于不可變的外部文件(例如 HDFS 上的文件),要么由確定的算子由其他 RDD 計算得到。RDD 通過算子連接構(gòu)成有向無環(huán)圖(DAG),上圖演示了一個簡單的例子,其中節(jié)點對應(yīng) RDD,邊對應(yīng)算子。回到剛剛的問題,RDD 如何做到 fault-tolerance?很簡單,RDD 中的每個分區(qū)都能被確定性的計算出來,所以一旦某個分區(qū)丟失了,另一個計算節(jié)點可以從它的前繼節(jié)點出發(fā)、用同樣的計算過程重算一次,即可得到完全一樣的 RDD 分區(qū)。這個過程可以遞歸的進行下去。
▲ 上圖演示了 RDD 分區(qū)的恢復(fù)。為了簡潔并沒有畫出分區(qū),實際上恢復(fù)是以分區(qū)為單位的。Spark 的編程接口和 Java 8 的 Stream 很相似:RDD 作為數(shù)據(jù),在多種算子間變換,構(gòu)成對執(zhí)行計劃 DAG 的描述。最后,一旦遇到類似?collect()這樣的輸出命令,執(zhí)行計劃會被發(fā)往 Spark 集群、開始計算。不難發(fā)現(xiàn),算子分成兩類:
- map()、filter()、join() 等算子稱為 Transformation,它們輸入一個或多個 RDD,輸出一個 RDD。
- collect()、count()、save() 等算子稱為 Action,它們通常是將數(shù)據(jù)收集起來返回;
像之前提到的,RDD 的數(shù)據(jù)由多個分區(qū)(partition)構(gòu)成,這些分區(qū)可以分布在集群的各個機器上,這也就是 RDD 中 “distributed” 的含義。熟悉 DBMS 的同學(xué)可以把 RDD 理解為邏輯執(zhí)行計劃,partition 理解為物理執(zhí)行計劃。
此外,RDD 還包含它的每個分區(qū)的依賴分區(qū)(dependency),以及一個函數(shù)指出如何計算出本分區(qū)的數(shù)據(jù)。Spark 的設(shè)計者發(fā)現(xiàn),依賴關(guān)系依據(jù)執(zhí)行方式的不同可以很自然地分成兩種:窄依賴(Narrow Dependency)和寬依賴(Wide Dependency),舉例來說:
- map()、filter() 等算子構(gòu)成窄依賴:生產(chǎn)的每個分區(qū)只依賴父 RDD 中的一個分區(qū)。
- groupByKey() 等算子構(gòu)成寬依賴:生成的每個分區(qū)依賴父 RDD 中的多個分區(qū)(往往是全部分區(qū))。
在執(zhí)行時,窄依賴可以很容易的按流水線(pipeline)的方式計算:對于每個分區(qū)從前到后依次代入各個算子即可。然而,寬依賴需要等待前繼 RDD 中所有分區(qū)計算完成;換句話說,寬依賴就像一個柵欄(barrier)會阻塞到之前的所有計算完成。整個計算過程被寬依賴分割成多個階段(stage),如上右圖所示。
了解 MapReduce 的同學(xué)可能已經(jīng)發(fā)現(xiàn),寬依賴本質(zhì)上就是一個 MapReduce 過程。但是相比 MapReduce 自己寫 Map 和 Reduce 函數(shù)的編程接口,Spark 的接口要容易的多;并且在 Spark 中,多個階段的 MapReduce 只需要構(gòu)造一個 DAG 即可。
聲明式接口:Spark SQL
Spark 誕生后,大幅簡化了 MapReduce 編程模型,但人們并不滿足于此。我們知道,與命令式(imperative)編程相對的是聲明式(declarative)編程,前者需要告訴程序怎樣得到我需要的結(jié)果,后者則是告訴程序我需要的結(jié)果是什么。舉例而言:你想知道,各個部門?<dept_id, dept_name>中性別為女?'female'的員工分別有多少?
命令式編程中
你需要編寫一個程序。下面給出了一種偽代碼實現(xiàn):
employees = db.getAllEmployees() countByDept = dict() // 統(tǒng)計各部門女生人數(shù) (dept_id -> count) for employee in employees: if (employee.gender == 'female') countByDept[employee.dept_id] += 1 results = list() // 加上 dept.name 列 depts = db.getAllDepartments() for dept in depts: if (countByDept containsKey dept.id) results.add(row(dept.id, dept.name, countByDept[dept.id])) return results;聲明式編程:
你只要用關(guān)系代數(shù)的運算表達(dá)出結(jié)果:
employees.join(dept, employees.deptId == dept.id) .where(employees.gender == 'female') .groupBy(dept.id, dept.name) .agg()
等價地,如果你更熟悉 SQL,也可以寫成這樣:
SELECTdept.id,dept.name,COUNT(*)FROMemployees?JOINdept?ONemployees.dept_id?==dept.idWHEREemployees.gender?='female'GROUPBYdept.id,dept.name
顯然,聲明式的要簡潔的多!但聲明式編程依賴于執(zhí)行者產(chǎn)生真正的程序代碼,所以除了上面這段程序,還需要把數(shù)據(jù)模型(即 schema)一并告知執(zhí)行者。聲明式編程最廣為人知的形式就是 SQL。Spark SQL 就是這樣一個基于 SQL 的聲明式編程接口。你可以將它看作在 Spark 之上的一層封裝,在 RDD 計算模型的基礎(chǔ)上,提供了 DataFrame API 以及一個內(nèi)置的 SQL 執(zhí)行計劃優(yōu)化器 Catalyst。
▲ 上圖黃色部分是 Spark SQL 中新增的部分。DataFrame 就像數(shù)據(jù)庫中的表,除了數(shù)據(jù)之外它還保存了數(shù)據(jù)的 schema 信息。計算中,schema 信息也會經(jīng)過算子進行相應(yīng)的變換。DataFrame 的數(shù)據(jù)是行(row)對象組成的 RDD,對 DataFrame 的操作最終會變成對底層 RDD 的操作。
Catalyst 是一個內(nèi)置的 SQL 優(yōu)化器,負(fù)責(zé)把用戶輸入的 SQL 轉(zhuǎn)化成執(zhí)行計劃。Catelyst 強大之處是它利用了 Scala 提供的代碼生成(codegen)機制,物理執(zhí)行計劃經(jīng)過編譯,產(chǎn)出的執(zhí)行代碼效率很高,和直接操作 RDD 的命令式代碼幾乎沒有分別。
▲ 上圖是 Catalyst 的工作流程,與大多數(shù) SQL 優(yōu)化器一樣是一個 Cost-Based Optimizer (CBO),但最后使用代碼生成(codegen)轉(zhuǎn)化成直接對 RDD 的操作。流計算框架:Spark Streaming
以往,批處理和流計算被看作大數(shù)據(jù)系統(tǒng)的兩個方面。我們常常能看到這樣的架構(gòu)——以 Kafka、Storm 為代表的流計算框架用于實時計算,而 Spark 或 MapReduce 則負(fù)責(zé)每天、每小時的數(shù)據(jù)批處理。在 ETL 等場合,這樣的設(shè)計常常導(dǎo)致同樣的計算邏輯被實現(xiàn)兩次,耗費人力不說,保證一致性也是個問題。
Spark Streaming 正是誕生于此類需求。傳統(tǒng)的流計算框架大多注重于低延遲,采用了持續(xù)的(continuous)算子模型;而 Spark Streaming 基于 Spark,另辟蹊徑提出了?D-Stream(Discretized Streams)方案:將流數(shù)據(jù)切成很小的批(micro-batch),用一系列的短暫、無狀態(tài)、確定性的批處理實現(xiàn)流處理。
Spark Streaming 的做法在流計算框架中很有創(chuàng)新性,它雖然犧牲了低延遲(一般流計算能做到 100ms 級別,Spark Streaming 延遲一般為 1s 左右),但是帶來了三個誘人的優(yōu)勢:
- 更高的吞吐量(大約是 Storm 的 2-5 倍)
- 更快速的失敗恢復(fù)(通常只要 1-2s),因此對于 straggler(性能拖后腿的節(jié)點)直接殺掉即可
- 開發(fā)者只需要維護一套 ETL 邏輯即可同時用于批處理和流計算
你可能會困惑,流計算中的狀態(tài)一直是個難題。但我們剛剛提到 D-Stream 方案是無狀態(tài)的,那諸如 word count 之類的問題,怎么做到保持 count 算子的狀態(tài)呢?答案是通過 RDD:將前一個時間步的 RDD 作為當(dāng)前時間步的 RDD 的前繼節(jié)點,就能造成狀態(tài)不斷更替的效果。實際上,新的狀態(tài) RDD 總是不斷生成,而舊的 RDD 并不會被“替代”,而是作為新 RDD 的前繼依賴。對于底層的 Spark 框架來說,并沒有時間步的概念,有的只是不斷擴張的 DAG 圖和新的 RDD 節(jié)點。
▲ 上圖是流式計算 word count 的例子,count 結(jié)果在不同時間步中不斷累積。那么另一個問題也隨之而來:隨著時間的推進,上圖中的狀態(tài) RDD?counts會越來越多,他的祖先(lineage)變得越來越長,極端情況下,恢復(fù)過程可能溯源到很久之前。這是不可接受的!因此,Spark Streming 會定期地對狀態(tài) RDD 做 checkpoint,將其持久化到 HDFS 等存儲中,這被稱為 lineage cut,在它之前更早的 RDD 就可以沒有顧慮地清理掉了。
關(guān)于流行的幾個開源流計算框架的對比,可以參考文章?Comparison of Apache Stream Processing Frameworks。
流計算與 SQL:Spark Structured Streaming
Spark 通過 Spark Streaming 擁有了流計算能力,那 Spark SQL 是否也能具有類似的流處理能力呢?答案是肯定的,只要將數(shù)據(jù)流建模成一張不斷增長、沒有邊界的表,在這樣的語義之下,很多 SQL 操作等就能直接應(yīng)用在流數(shù)據(jù)上。
出人意料的是,Spark Structured Streaming 的流式計算引擎并沒有復(fù)用 Spark Streaming,而是在 Spark SQL 上設(shè)計了新的一套引擎。因此,從 Spark SQL 遷移到 Spark Structured Streaming 十分容易,但從 Spark Streaming 遷移過來就要困難得多。
很自然的,基于這樣的模型,Spark SQL 中的大部分接口、實現(xiàn)都得以在 Spark Structured Streaming 中直接復(fù)用。將用戶的 SQL 執(zhí)行計劃轉(zhuǎn)化成流計算執(zhí)行計劃的過程被稱為增量化(incrementalize),這一步是由 Spark 框架自動完成的。對于用戶來說只要知道:每次計算的輸入是某一小段時間的流數(shù)據(jù),而輸出是對應(yīng)數(shù)據(jù)產(chǎn)生的計算結(jié)果。
▲ 左圖是 Spark Structured Streaming 模型示意圖;右圖展示了同一個任務(wù)的批處理、流計算版本,可以看到,除了輸入輸出不同,內(nèi)部計算過程完全相同。與 Spark SQL 相比,流式 SQL 計算還有兩個額外的特性,分別是窗口(window)和水位(watermark)。窗口(window)是對過去某段時間的定義。批處理中,查詢通常是全量的(例如:總用戶量是多少);而流計算中,我們通常關(guān)心近期一段時間的數(shù)據(jù)(例如:最近24小時新增的用戶量是多少)。用戶通過選用合適的窗口來獲得自己所需的計算結(jié)果,常見的窗口有滑動窗口(Sliding Window)、滾動窗口(Tumbling Window)等。水位(watermark)用來丟棄過早的數(shù)據(jù)。在流計算中,上游的輸入事件可能存在不確定的延遲,而流計算系統(tǒng)的內(nèi)存是有限的、只能保存有限的狀態(tài),一定時間之后必須丟棄歷史數(shù)據(jù)。以雙流 A JOIN B 為例,假設(shè)窗口為 1 小時,那么 A 中比當(dāng)前時間減 1 小時更早的數(shù)據(jù)(行)會被丟棄;如果 B 中出現(xiàn) 1 小時前的事件,因為無法處理只能忽略。
▲ 上圖為水位的示意圖,“遲到”太久的數(shù)據(jù)(行)由于已經(jīng)低于當(dāng)前水位無法處理,將被忽略。水位和窗口的概念都是因時間而來。在其他流計算系統(tǒng)中,也存在相同或類似的概念。
關(guān)于 SQL 的流計算模型,常常被拿來對比的還有另一個流計算框架?Apache Flink。與 Spark 相比,它們的實現(xiàn)思路有很大不同,但在模型上是很相似的。
系統(tǒng)架構(gòu)
Spark 中有三個角色:Driver, Worker 和 Cluster Manager。
驅(qū)動程序(Driver)即用戶編寫的程序,對應(yīng)一個?SparkContext,負(fù)責(zé)任務(wù)的構(gòu)造、調(diào)度、故障恢復(fù)等。驅(qū)動程序可以直接運行在客戶端,例如用戶的應(yīng)用程序中;也可以托管在 Master 上,這被稱為集群模式(cluster mode),通常用于流計算等長期任務(wù)。
Cluster Manager顧名思義負(fù)責(zé)集群的資源分配,Spark 自帶的 Spark Master 支持任務(wù)的資源分配,并包含一個 Web UI 用來監(jiān)控任務(wù)運行狀況。多個 Master 可以構(gòu)成一主多備,通過 ZooKeeper 進行協(xié)調(diào)和故障恢復(fù)。通常 Spark 集群使用 Spark Master 即可,但如果用戶的集群中不僅有 Spark 框架、還要承擔(dān)其他任務(wù),官方推薦使用 Mesos 作為集群調(diào)度器。
Worker節(jié)點負(fù)責(zé)執(zhí)行計算任務(wù),上面保存了 RDD 等數(shù)據(jù)。
總結(jié)
Spark 是一個同時支持批處理和流計算的分布式計算系統(tǒng)。Spark 的所有計算均構(gòu)建于 RDD 之上,RDD 通過算子連接形成 DAG 的執(zhí)行計劃,RDD 的確定性及不可變性是 Spark 實現(xiàn)故障恢復(fù)的基礎(chǔ)。Spark Streaming 的 D-Stream 本質(zhì)上也是將輸入數(shù)據(jù)分成一個個 micro-batch 的 RDD。
Spark SQL 是在 RDD 之上的一層封裝,相比原始 RDD,DataFrame API 支持?jǐn)?shù)據(jù)表的 schema 信息,從而可以執(zhí)行 SQL 關(guān)系型查詢,大幅降低了開發(fā)成本。Spark Structured Streaming 是 Spark SQL 的流計算版本,它將輸入的數(shù)據(jù)流看作不斷追加的數(shù)據(jù)行。
總結(jié)
以上是生活随笔為你收集整理的分布式离线计算—Spark—SparkStreaming的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 分布式实时计算—Spark—Spark
- 下一篇: 常用框架概览