日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark Streaming 实现思路与模块概述

發(fā)布時間:2023/12/31 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark Streaming 实现思路与模块概述 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Spark Streaming 實現(xiàn)思路與模塊概述

[酷玩 Spark] Spark Streaming 源碼解析系列?,返回目錄請?猛戳這里

「騰訊·廣點通」技術(shù)團隊榮譽出品

本文內(nèi)容適用范圍:

  • 2016.01.04 update, Spark 1.6 全系列 √ (1.6.0)
  • 2015.11.09 update, Spark 1.5 全系列 √ (1.5.0, 1.5.1, 1.5.2)
  • 2015.07.15 update, Spark 1.4 全系列 √ (1.4.0, 1.4.1)
  • 2015.04.17 update, Spark 1.3 全系列 √ (1.3.0, 1.3.1)?

一、基于 Spark 做 Spark Streaming 的思路

Spark Streaming 與 Spark Core 的關(guān)系可以用下面的經(jīng)典部件圖來表述:

在本節(jié),我們先探討一下基于 Spark Core 的 RDD API,如何對 streaming data 進行處理。理解下面描述的這個思路非常重要,因為基于這個思路詳細展開后,就能夠充分理解整個 Spark Streaming 的模塊劃分和代碼邏輯。

第一步,假設我們有一小塊數(shù)據(jù),那么通過 RDD API,我們能夠構(gòu)造出一個進行數(shù)據(jù)處理的 RDD DAG(如下圖所示)。

第二步,我們對連續(xù)的 streaming data 進行切片處理 —— 比如將最近 200ms 時間的 event 積攢一下 —— 每個切片就是一個 batch,然后使用第一步中的 RDD DAG 對這個 batch 的數(shù)據(jù)進行處理。

注意: 這里我們使用的是 batch 的概念 —— 其實 200ms 在其它同類系統(tǒng)中通常叫做 mini-batch,不過既然 Spark Streaming 官方的叫法就是 batch,我們這里就用 batch 表達 mini-batch 的意思了 :)

所以,針對連續(xù)不斷的 streaming data 進行多次切片,就會形成多個 batch,也就對應出來多個 RDD DAG(每個 RDD DAG 針對一個 batch 的數(shù)據(jù))。如此一來,這多個 RDD DAG 之間相互同構(gòu),卻又是不同的實例。我們用下圖來表示這個關(guān)系:

所以,我們將需要:

  • (1) 一個靜態(tài)的 RDD DAG 的模板,來表示處理邏輯;

  • (2) 一個動態(tài)的工作控制器,將連續(xù)的 streaming data 切分數(shù)據(jù)片段,并按照模板復制出新的 RDD DAG 的實例,對數(shù)據(jù)片段進行處理。

第三步,我們回過頭來看 streaming data 本身的產(chǎn)生。Hadoop MapReduce, Spark RDD API 進行批處理時,一般默認數(shù)據(jù)已經(jīng)在 HDFS, HBase 或其它存儲上。而 streaming data —— 比如 twitter 流 —— 又有可能是在系統(tǒng)外實時產(chǎn)生的,就需要能夠?qū)⑦@些數(shù)據(jù)導入到 Spark Streaming 系統(tǒng)里,就像 Apache Storm 的 Spout,Apache S4 的 Adapter 能夠把數(shù)據(jù)導入系統(tǒng)里的作用是一致的。所以,我們將需要:

  • (3) 原始數(shù)據(jù)的產(chǎn)生和導入。

第四步,我們考慮,有了以上 (a)(b)(c) 3 部分,就可以順利用 RDD API 處理 streaming data 了嗎?其實相對于 batch job 通常幾個小時能夠跑完來講,streaming job 的運行時間是 +∞(正無窮大)的,所以我們還將需要:

  • (4) 對長時運行任務的保障,包括輸入數(shù)據(jù)的失效后的重構(gòu),處理任務的失敗后的重調(diào)。

至此,streaming data 的特點決定了,如果我們想基于 Spark Core 進行 streaming data 的處理,還需要在 Spark Core 的框架上解決剛才列出的 (1)(2)(3)(4) 這四點問題:

二、Spark Streaming 的整體模塊劃分

根據(jù) Spark Streaming 解決這 4 個問題的不同 focus,可以將 Spark Streaming 劃分為四個大的模塊:

  • 模塊 1:DAG 靜態(tài)定義
  • 模塊 2:Job 動態(tài)生成
  • 模塊 3:數(shù)據(jù)產(chǎn)生與導入
  • 模塊 4:長時容錯

其中每個模塊涉及到的主要的類,示意如下:

這里先不用糾結(jié)每個類的具體用途,我們將在本文中簡述,并在本系列的后續(xù)文章里對每個模塊逐一詳述。

2.1 模塊 1:DAG 靜態(tài)定義

通過前面的描述我們知道,應該首先對計算邏輯描述為一個 RDD DAG 的“模板”,在后面 Job 動態(tài)生成的時候,針對每個 batch,Spark Streaming 都將根據(jù)這個“模板”生成一個 RDD DAG 的實例。

DStream 和 DStreamGraph

其實在 Spark Streaming 里,這個 RDD “模板”對應的具體的類是?DStream,RDD DAG “模板”對應的具體類是DStreamGraph。而?RDD?本身也有很多子類,幾乎每個子類都有一個對應的?DStream,如?UnionRDD?的對應是UnionDStream。RDD?通過?transformation?連接成 RDD DAG(但 RDD DAG 在 Spark Core 里沒有對應的具體類),DStream?也通過?transformation?連接成?DStreamGraph。

DStream 的全限定名是:org.apache.spark.streaming.dstream.DStream DStreamGraph 的全限定名是:org.apache.spark.streaming.DStreamGraph

DStream 和 RDD 的關(guān)系

既然?DStream?是?RDD?的模板,而且?DStream?和?RDD?具有相同的?transformation?操作,比如 map(), filter(), reduce() ……等等(正是這些相同的?transformation?使得?DStreamGraph?能夠忠實記錄 RDD DAG 的計算邏輯),那?RDD?和?DStream?有什么不一樣嗎?

還真不一樣。

比如,DStream?維護了對每個產(chǎn)出的?RDD?實例的指針。比如下圖里,DStream A?在 3 個 batch 里分別實例化了 3 個?RDD,分別是?a[1],?a[2],?a[3],那么?DStream A?就保留了一個?batch → 所產(chǎn)出的 RDD?的哈希表,即包含?batch 1 → a[1],?batch 2 → a[2],?batch 3 → a[3]?這 3 項。

另外,能夠進行流量控制的?DStream?子類,如?ReceiverInputDStream,還會保存關(guān)于歷次 batch 的源頭數(shù)據(jù)條數(shù)、歷次 batch 計算花費的時間等數(shù)值,用來實時計算準確的流量控制信息,這些都是記在?DStream?里的,而?RDD a[1]?等則不會保存這些信息。

我們在考慮的時候,可以認為,RDD?加上 batch 維度就是?DStream,DStream?去掉 batch 維度就是?RDD?—— 就像?RDD = DStream at batch T

不過這里需要特別說明的是,在DStreamGraph的圖里,DStream(即數(shù)據(jù))是頂點,DStream之間的 transformation(即計算)是邊,這與 Apache Storm 等是相反的。

在 Apache Storm 的 topology 里,頂點是計算,邊是 stream(連續(xù)的 tuple),即數(shù)據(jù)。這一點也是比較熟悉 Storm 的同學剛開始一下子不太理解 DStream 的原因--我們再重復一遍,DStream 在有向圖里是頂點,是數(shù)據(jù)本身,而不是邊。

2.2 模塊 2:Job 動態(tài)生成

現(xiàn)在有了?DStreamGraph?和?DStream,也就是靜態(tài)定義了的計算邏輯,下面我們來看 Spark Streaming 是如何將其動態(tài)調(diào)度的。

在 Spark Streaming 程序的入口,我們都會定義一個 batchDuration,就是需要每隔多長時間就比照靜態(tài)的?DStreamGraph?來動態(tài)生成一個 RDD DAG 實例。在 Spark Streaming 里,總體負責動態(tài)作業(yè)調(diào)度的具體類是?JobScheduler,在 Spark Streaming 程序開始運行的時候,會生成一個?JobScheduler?的實例,并被 start() 運行起來。

JobScheduler?有兩個非常重要的成員:JobGenerator?和?ReceiverTracker。JobScheduler?將每個 batch 的 RDD DAG 具體生成工作委托給?JobGenerator,而將源頭輸入數(shù)據(jù)的記錄工作委托給?ReceiverTracker。

JobScheduler 的全限定名是:org.apache.spark.streaming.scheduler.JobScheduler JobGenerator 的全限定名是:org.apache.spark.streaming.scheduler.JobGenerator ReceiverTracker 的全限定名是:org.apache.spark.streaming.scheduler.ReceiverTracker

JobGenerator?維護了一個定時器,周期就是我們剛剛提到的 batchDuration,定時為每個 batch 生成 RDD DAG 的實例。具體的,每次 RDD DAG 實際生成包含 5 個步驟:

  • (1)?要求?ReceiverTracker?將目前已收到的數(shù)據(jù)進行一次 allocate,即將上次 batch 切分后的數(shù)據(jù)切分到到本次新的 batch 里
  • (2)?要求?DStreamGraph?復制出一套新的 RDD DAG 的實例,具體過程是:DStreamGraph?將要求圖里的尾?DStream?節(jié)點生成具體的 RDD 實例,并遞歸的調(diào)用尾?DStream?的上游?DStream?節(jié)點……以此遍歷整個?DStreamGraph,遍歷結(jié)束也就正好生成了 RDD DAG 的實例
  • (3)?獲取第 1 步?ReceiverTracker?分配到本 batch 的源頭數(shù)據(jù)的 meta 信息
  • (4) 將第 2 步生成的本 batch 的 RDD DAG,和第 3 步獲取到的 meta 信息,一同提交給?JobScheduler?異步執(zhí)行
  • (5) 只要提交結(jié)束(不管是否已開始異步執(zhí)行),就馬上對整個系統(tǒng)的當前運行狀態(tài)做一個 checkpoint

上述 5 個步驟的調(diào)用關(guān)系圖如下:

2.3 模塊 3:數(shù)據(jù)產(chǎn)生與導入

下面我們看 Spark Streaming 解決第三個問題的模塊分析,即數(shù)據(jù)的產(chǎn)生與導入。

DStream?有一個重要而特殊的子類?ReceiverInputDStream:它除了需要像其它?DStream?那樣在某個 batch 里實例化?RDD?以外,還需要額外的?Receiver?為這個?RDD?生產(chǎn)數(shù)據(jù)!

具體的,Spark Streaming 在程序剛開始運行時:

  • (1) 由?Receiver?的總指揮?ReceiverTracker?分發(fā)多個 job(每個 job 有 1 個 task),到多個 executor 上分別啟動ReceiverSupervisor?實例;

  • (2) 每個?ReceiverSupervisor?啟動后將馬上生成一個用戶提供的?Receiver?實現(xiàn)的實例 —— 該?Receiver?實現(xiàn)可以持續(xù)產(chǎn)生或者持續(xù)接收系統(tǒng)外數(shù)據(jù),比如?TwitterReceiver?可以實時爬取 twitter 數(shù)據(jù) —— 并在?Receiver?實例生成后調(diào)用Receiver.onStart()。

ReceiverSupervisor 的全限定名是:org.apache.spark.streaming.receiver.ReceiverSupervisor Receiver 的全限定名是:org.apache.spark.streaming.receiver.Receiver

(1)(2) 的過程由上圖所示,這時?Receiver?啟動工作已運行完畢。

接下來?ReceiverSupervisor?將在 executor 端作為的主要角色,并且:

  • (3)?Receiver?在?onStart()?啟動后,就將持續(xù)不斷地接收外界數(shù)據(jù),并持續(xù)交給?ReceiverSupervisor?進行數(shù)據(jù)轉(zhuǎn)儲;

  • (4)?ReceiverSupervisor?持續(xù)不斷地接收到?Receiver?轉(zhuǎn)來的數(shù)據(jù):

    • 如果數(shù)據(jù)很細小,就需要?BlockGenerator?攢多條數(shù)據(jù)成一塊(4a)、然后再成塊存儲(4b 或 4c)
    • 反之就不用攢,直接成塊存儲(4b 或 4c)

    • 這里 Spark Streaming 目前支持兩種成塊存儲方式,一種是由?blockManagerskManagerBasedBlockHandler?直接存到 executor 的內(nèi)存或硬盤,另一種由?WriteAheadLogBasedBlockHandler?是同時寫 WAL(4c) 和 executor 的內(nèi)存或硬盤

  • (5) 每次成塊在 executor 存儲完畢后,ReceiverSupervisor?就會及時上報塊數(shù)據(jù)的 meta 信息給 driver 端的ReceiverTracker;這里的 meta 信息包括數(shù)據(jù)的標識 id,數(shù)據(jù)的位置,數(shù)據(jù)的條數(shù),數(shù)據(jù)的大小等信息。

  • (6)?ReceiverTracker?再將收到的塊數(shù)據(jù) meta 信息直接轉(zhuǎn)給自己的成員?ReceivedBlockTracker,由ReceivedBlockTracker?專門管理收到的塊數(shù)據(jù) meta 信息。

BlockGenerator 的全限定名是:org.apache.spark.streaming.receiver.BlockGenerator BlockManagerBasedBlockHandler 的全限定名是:org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler WriteAheadLogBasedBlockHandler 的全限定名是:org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler ReceivedBlockTracker 的全限定名是:org.apache.spark.streaming.scheduler.ReceivedBlockTracker ReceiverInputDStream 的全限定名是:org.apache.spark.streaming.dstream.ReceiverInputDStream

這里 (3)(4)(5)(6) 的過程是一直持續(xù)不斷地發(fā)生的,我們也將其在上圖里標識出來。

后續(xù)在 driver 端,就由?ReceiverInputDStream?在每個 batch 去檢查?ReceiverTracker?收到的塊數(shù)據(jù) meta 信息,界定哪些新數(shù)據(jù)需要在本 batch 內(nèi)處理,然后生成相應的?RDD?實例去處理這些塊數(shù)據(jù),這個過程在模塊 1:DAG 靜態(tài)定義?模塊2:Job 動態(tài)生成?里描述過了。

2.4 模塊 4:長時容錯

以上我們簡述完成 Spark Streamimg 基于 Spark Core 所新增功能的 3 個模塊,接下來我們看一看第 4 個模塊將如何保障 Spark Streaming 的長時運行 —— 也就是,如何與前 3 個模塊結(jié)合,保障前 3 個模塊的長時運行。

通過前 3 個模塊的關(guān)鍵類的分析,我們可以知道,保障模塊 1 和 2 需要在 driver 端完成,保障模塊 3 需要在 executor 端和 driver 端完成。

executor 端長時容錯

先看 executor 端。

在 executor 端,ReceiverSupervisor?和?Receiver?失效后直接重啟就 OK 了,關(guān)聯(lián)是保障收到的塊數(shù)據(jù)的安全。保障了源頭塊數(shù)據(jù),就能夠保障 RDD DAG (Spark Core 的 lineage)重做。

Spark Streaming 對源頭塊數(shù)據(jù)的保障,分為 4 個層次,全面、相互補充,又可根據(jù)不同場景靈活設置:

  • (1) 熱備:熱備是指在存儲塊數(shù)據(jù)時,將其存儲到本 executor、并同時 replicate 到另外一個 executor 上去。這樣在一個 replica 失效后,可以立刻無感知切換到另一份 replica 進行計算。實現(xiàn)方式是,在實現(xiàn)自己的 Receiver 時,即指定一下StorageLevel?為?MEMORY_ONLY_2?或?MEMORY_AND_DISK_2?就可以了。

// 1.5.2 update 這已經(jīng)是默認了。

  • (2) 冷備:冷備是每次存儲塊數(shù)據(jù)前,先把塊數(shù)據(jù)作為 log 寫出到?WriteAheadLog?里,再存儲到本 executor。executor 失效時,就由另外的 executor 去讀 WAL,再重做 log 來恢復塊數(shù)據(jù)。WAL 通常寫到可靠存儲如 HDFS 上,所以恢復時可能需要一段 recover time。

  • (3) 重放:如果上游支持重放,比如 Apache Kafka,那么就可以選擇不用熱備或者冷備來另外存儲數(shù)據(jù)了,而是在失效時換一個 executor 進行數(shù)據(jù)重放即可。

  • (4) 忽略:最后,如果應用的實時性需求大于準確性,那么一塊數(shù)據(jù)丟失后我們也可以選擇忽略、不恢復失效的源頭數(shù)據(jù)。

我們用一個表格來總結(jié)一下:

?圖示優(yōu)點缺點
(1) 熱備無 recover time需要占用雙倍資源
(2) 冷備十分可靠存在 recover time
(3) 重放不占用額外資源存在 recover time
(4) 忽略無 recover time準確性有損失

driver 端長時容錯

前面我們講過,塊數(shù)據(jù)的 meta 信息上報到 ReceiverTracker,然后交給?ReceivedBlockTracker?做具體的管理。ReceivedBlockTracker?也采用 WAL 冷備方式進行備份,在 driver 失效后,由新的?ReceivedBlockTracker?讀取 WAL 并恢復 block 的 meta 信息。

另外,需要定時對?DStreamGraph?和?JobScheduler?做?Checkpoint,來記錄整個?DStreamGraph?的變化、和每個 batch 的 job 的完成情況。

注意到這里采用的是完整 checkpoint 的方式,和之前的 WAL 的方式都不一樣。Checkpoint?通常也是落地到可靠存儲如 HDFS。Checkpoint?發(fā)起的間隔默認的是和?batchDuration 一致;即每次 batch 發(fā)起、提交了需要運行的 job 后就做Checkpoint,另外在 job 完成了更新任務狀態(tài)的時候再次做一下?Checkpoint。

這樣一來,在 driver 失效并恢復后,可以讀取最近一次的 Checkpoint 來恢復作業(yè)的?DStreamGraph?和 job 的運行及完成狀態(tài)。

總結(jié)一下本節(jié)內(nèi)容為上述表格,可以看到,Spark Streaming 的長時容錯特性,能夠提供不重、不丟,exactly-once 的處理語義。

三、入口:StreamingContext

上面我們花了很多篇幅來介紹 Spark Streaming 的四大模塊,我們在最后介紹一下?StreamingContext。

下面我們用這段僅 11 行的完整?quick example,來說明用戶 code 是怎么通過?StreamingContext?與前面幾個模塊進行交互的:

import org.apache.spark._ import org.apache.spark.streaming._// 首先配置一下本 quick example 將跑在本機,app name 是 NetworkWordCount val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") // batchDuration 設置為 1 秒,然后創(chuàng)建一個 streaming 入口 val ssc = new StreamingContext(conf, Seconds(1))// ssc.socketTextStream() 將創(chuàng)建一個 SocketInputDStream;這個 InputDStream 的 SocketReceiver 將監(jiān)聽本機 9999 端口 val lines = ssc.socketTextStream("localhost", 9999)val words = lines.flatMap(_.split(" ")) // DStream transformation val pairs = words.map(word => (word, 1)) // DStream transformation val wordCounts = pairs.reduceByKey(_ + _) // DStream transformation wordCounts.print() // DStream transformation // 上面 4 行利用 DStream transformation 構(gòu)造出了 lines -> words -> pairs -> wordCounts -> .print() 這樣一個 DStreamGraph // 但注意,到目前是定義好了產(chǎn)生數(shù)據(jù)的 SocketReceiver,以及一個 DStreamGraph,這些都是靜態(tài)的// 下面這行 start() 將在幕后啟動 JobScheduler, 進而啟動 JobGenerator 和 ReceiverTracker // ssc.start() // -> JobScheduler.start() // -> JobGenerator.start(); 開始不斷生成一個一個 batch // -> ReceiverTracker.start(); 開始往 executor 上分布 ReceiverSupervisor 了,也會進一步創(chuàng)建和啟動 Receiver ssc.start()// 然后用戶 code 主線程就 block 在下面這行代碼了 // block 的后果就是,后臺的 JobScheduler 線程周而復始的產(chǎn)生一個一個 batch 而不停息 // 也就是在這里,我們前面靜態(tài)定義的 DStreamGraph 的 print(),才一次一次被在 RDD 實例上調(diào)用,一次一次打印出當前 batch 的結(jié)果 ssc.awaitTermination()

所以我們看到,StreamingContext?是 Spark Streaming 提供給用戶 code 的、與前述 4 個模塊交互的一個簡單和統(tǒng)一的入口。

四、總結(jié)與回顧

在最后我們再把?Sark Streaming 官方 Programming Guide?的部分內(nèi)容放在這里,作為本文的一個回顧和總結(jié)。請大家看一看,如果看懂了本文的內(nèi)容,是不是讀下面這些比較 high-level 的介紹會清晰化很多 :-)

Spark Streaming?is an extension of the?core Spark API?that enables?scalable,?high-throughput,?fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.

Internally, it works as follows.?Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

Spark Streaming provides a high-level abstraction called?discretized stream?or?DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams.?Internally, a DStream is represented as a sequence of RDDs.

...



(本文完,參與本文的討論請?猛戳這里,返回目錄請?猛戳這里)

轉(zhuǎn)載于:https://www.cnblogs.com/dailidong/p/7571134.html

總結(jié)

以上是生活随笔為你收集整理的Spark Streaming 实现思路与模块概述的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。