Spark学习之Spark Streaming
一、簡介
許多應用需要即時處理收到的數據,例如用來實時追蹤頁面訪問統計的應用、訓練機器學習模型的應用,還有自動檢測異常的應用。Spark Streaming 是 Spark 為這些應用而設計的模型。它允許用戶使用一套和批處理非常接近的 API 來編寫流式計算應用,這樣就可以大量重用批處理應用的技術甚至代碼。
和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用離散化流(discretized stream)作為抽象表示,叫作 DStream。DStream 是隨時間推移而收到的數據的序列。在內部,每個時間區間收到的數據都作為 RDD 存在,而 DStream 是由這些 RDD 所組成的序列(因此得名“離散化”)。DStream 可以從各種輸入源創建,比如 Flume、Kafka 或者 HDFS。創建出來的 DStream 支持兩種操作,一種是轉化操作(transformation),會生成一個新的DStream,另一種是輸出操作(output operation),可以把數據寫入外部系統中。DStream提供了許多與 RDD 所支持的操作相類似的操作支持,還增加了與時間相關的新操作,比如滑動窗口。
和批處理程序不同,Spark Streaming 應用需要進行額外配置來保證 24/7 不間斷工作。Spark Streaming 的檢查點(checkpointing)機制,也就是把數據存儲到可靠文件系統(比如 HDFS)上的機制,這也是 Spark Streaming 用來實現不間斷工作的主要方式。
二、一個簡單的例子
我們會從一臺服務器的 9999 端口上實時輸入數據,并在控制臺打印出來。
首先,你得有一個nc軟件,因為我是在window下運行程序的,但是在Linux系統里面就不需要,Linux里面有內置的nc命令。
nc軟件的用法:
開一個命令行窗口(這里要切換到nc軟件的路徑下): 服務端:nc –lp 9999 //客戶端:nc localhost 9999nc軟件啟動成功的界面:
然后就是一個簡單的Spark Streaming的代碼:
import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Secondsobject Test {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("test").setMaster("local[4]") // 從SparkConf創建StreamingContex并指定4秒鐘的批處理大小// 用來指定多長時間處理一次新數據的批次間隔(batch interval)作為輸入val ssc = new StreamingContext(conf,Seconds(4))// 連接到本地機器9999端口val lines = ssc.socketTextStream("localhost", 9999)lines.print()// 啟動流式計算環境StreamingContext并等待它"完成"ssc.start()// 等待作業完成ssc.awaitTermination()} }連接成功的界面:
然后我在剛才的界面輸入"Hello world",然后就會在控制臺界面打印出來。
三、架構與抽象
Spark Streaming 使用“微批次”的架構,把流式計算當作一系列連續的小規模批處理來對待。Spark Streaming 從各種輸入源中讀取數據,并把數據分組為小的批次。新的批次按均勻的時間間隔創建出來。在每個時間區間開始的時候,一個新的批次就創建出來,在該區間內收到的數據都會被添加到這個批次中。在時間區間結束時,批次停止增長。時間區間的大小是由批次間隔這個參數決定的。批次間隔一般設在 500 毫秒到幾秒之間,由應用開發者配置。每個輸入批次都形成一個 RDD,以 Spark 作業的方式處理并生成其他的 RDD。處理的結果可以以批處理的方式傳給外部系統。
?
?
四、檢查點機制
Spark Streaming 對 DStream 提供的容錯性與 Spark 為 RDD 所提供的容錯性一致:只要輸入數據還在,它就可以使用 RDD 譜系重算出任意狀態(比如重新執行處理輸入數據的操作)。默認情況下,收到的數據分別存在于兩個節點上,這樣 Spark 可以容忍一個工作節點的故障。不過,如果只用譜系圖來恢復的話,重算有可能會花很長時間,因為需要處理從程序啟動以來的所有數據。因此,Spark Streaming 也提供了檢查點機制,可以把狀態階段性地存儲到可靠文件系統中(例如 HDFS 或者 S3)。一般來說,你需要每處理 5-10 個批次的數據就保存一次。在恢復數據時,Spark Streaming 只需要回溯到上一個檢查點即可。
如果流計算應用中的驅動器程序崩潰了,還可以重啟驅動器程序并讓驅動器程序從檢查點恢復,這樣 Spark Streaming 就可以讀取之前運行的程序處理數據的進度,并從那里繼續。
ssc.checkpoint("hdfs://...")五、轉化操作
DStream 的轉化操作可以分為無狀態(stateless)和有狀態(stateful)兩種。
? 在無狀態轉化操作中,每個批次的處理不依賴于之前批次的數據。常見的RDD轉化操作,例如 map() 、 filter() 、 reduceByKey() 等,都是無狀態轉化操作,無狀態轉化操作是分別應用到每個 RDD 上的。
? 相對地,有狀態轉化操作需要使用之前批次的數據或者是中間結果來計算當前批次的數據。有狀態轉化操作包括基于滑動窗口的轉化操作和追蹤狀態變化的轉化操作。
DStream 的有狀態轉化操作是跨時間區間跟蹤數據的操作;也就是說,一些先前批次的數據也被用來在新的批次中計算結果。主要的兩種類型是滑動窗口和 updateStateByKey() ,前者以一個時間階段為滑動窗口進行操作,后者則用來跟蹤每個鍵的狀態變化(例如構建一個代表用戶會話的對象)。有狀態轉化操作需要在你的 StreamingContext 中打開檢查點機制來確保容錯性。
所有基于窗口的操作都需要兩個參數,分別為窗口時長以及滑動步長,兩者都必須是StreamContext 的批次間隔的整數倍。窗口時長控制每次計算最近的多少個批次的數據,其實就是最近的 windowDuration/batchInterval 個批次。如果有一個以 10 秒為批次間隔的源DStream,要創建一個最近 30 秒的時間窗口(即最近 3 個批次),就應當把 windowDuration設為 30 秒。而滑動步長的默認值與批次間隔相等,用來控制對新的 DStream 進行計算的間隔。如果源 DStream 批次間隔為 10 秒,并且我們只希望每兩個批次計算一次窗口結果,就應該把滑動步長設置為 20 秒。
對 DStream 可以用的最簡單窗口操作是 window() ,它返回一個新的 DStream 來表示所請求的窗口操作的結果數據。換句話說, window() 生成的 DStream 中的每個 RDD 會包含多個批次中的數據,可以對這些數據進行 count() 、 transform() 等操作。
lines.window(windowDuration, slideDuration) lines.reduceByWindow(reduceFunc, windowDuration, slideDuration)
有時,我們需要在 DStream 中跨批次維護狀態(例如跟蹤用戶訪問網站的會話)。針對這種情況, updateStateByKey() 為我們提供了對一個狀態變量的訪問,用于鍵值對形式的DStream。給定一個由(鍵,事件)對構成的 DStream,并傳遞一個指定如何根據新的事件更新每個鍵對應狀態的函數,它可以構建出一個新的 DStream,其內部數據為(鍵,狀態)對。例如,在網絡服務器日志中,事件可能是對網站的訪問,此時鍵是用戶的 ID。使用updateStateByKey() 可以跟蹤每個用戶最近訪問的 10 個頁面。這個列表就是“狀態”對象,我們會在每個事件到來時更新這個狀態。
六、輸出輸入操作
輸出操作指定了對流數據經轉化操作得到的數據所要執行的操作(例如把結果推入外部數據庫或輸出到屏幕上)。與 RDD 中的惰性求值類似,如果一個 DStream 及其派生出的 DStream都沒有被執行輸出操作,那么這些 DStream 就都不會被求值。如果StreamingContext 中沒有設定輸出操作,整個 context 就都不會啟動。
在 Scala 中將 DStream 保存為文本文件
ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")因為 Spark 支持從任意 Hadoop 兼容的文件系統中讀取數據,所以 Spark Streaming 也就支持從任意 Hadoop 兼容的文件系統目錄中的文件創建數據流。
val line = ssc.textFileStream("directory")這篇博文主要來自《Spark快速大數據分析》這本書里面的第十章,內容有刪減,還有本書的一些代碼的實驗結果。
?
轉載于:https://www.cnblogs.com/xiaoyh/p/10791245.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的Spark学习之Spark Streaming的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cnpm install -g liv
- 下一篇: java实现多线程的4种方式