2021年大数据Spark(四十四):Structured Streaming概述
?
Apache Spark在2016年的時候啟動了Structured Streaming項目,一個基于Spark SQL的全新流計算引擎Structured Streaming,讓用戶像編寫批處理程序一樣簡單地編寫高性能的流處理程序。
Structured Streaming并不是對Spark Streaming的簡單改進,而是吸取了在開發Spark SQL和Spark Streaming過程中的經驗教訓,以及Spark社區和Databricks眾多客戶的反饋,重新開發的全新流式引擎,致力于為批處理和流處理提供統一的高性能API。同時,在這個新的引擎中,也很容易實現之前在Spark Streaming中很難實現的一些功能,比如Event Time(事件時間)的支持,Stream-Stream Join(2.3.0 新增的功能),毫秒級延遲(2.3.0 即將加入的 Continuous Processing)。
?
Structured Streaming概述
Spark Streaming是Apache?Spark早期基于RDD開發的流式系統,用戶使用DStream API來編寫代碼,支持高吞吐和良好的容錯。其背后的主要模型是Micro Batch(微批處理),也就是將數據流切成等時間間隔(BatchInterval)的小批量任務來執行。
Structured Streaming則是在Spark 2.0加入的,經過重新設計的全新流式引擎。它的模型十分簡潔,易于理解。一個流的數據源從邏輯上來說就是一個不斷增長的動態表格,隨著時間的推移,新數據被持續不斷地添加到表格的末尾,用戶可以使用Dataset/DataFrame 或者 SQL 來對這個動態數據源進行實時查詢。
文檔:http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html
?
Spark Streaming 不足
Spark Streaming 會接收實時數據源的數據,并切分成很多小的batches,然后被Spark Engine執行,產出同樣由很多小的batchs組成的結果流。
?
本質上,這是一種micro-batch(微批處理)的方式處理,用批的思想去處理流數據。這種設計讓Spark Streaming面對復雜的流式處理場景時捉襟見肘。
?
Spark Streaming 存在哪些不足,總結一下主要有下面幾點:
?1:使用 Processing Time 而不是 Event Time
Processing Time 是數據到達 Spark 被處理的時間,而 Event Time 是數據自帶的屬性,一般表示數據產生于數據源的時間。
比如 IoT 中,傳感器在 12:00:00 產生一條數據,然后在 12:00:05 數據傳送到 Spark,那么 Event Time 就是 12:00:00,而 Processing Time 就是 12:00:05。
Spark Streaming是基于DStream模型的micro-batch模式,簡單來說就是將一個微小時間段(比如說 1s)的流數據當前批數據來處理。如果要統計某個時間段的一些數據統計,毫無疑問應該使用 Event Time,但是因為 Spark Streaming 的數據切割是基于Processing Time,這樣就導致使用 Event Time 特別的困難。
?2:Complex, low-level api
DStream(Spark Streaming 的數據模型)提供的API類似RDD的API,非常的low level;
當編寫Spark Streaming程序的時候,本質上就是要去構造RDD的DAG執行圖,然后通過Spark Engine運行。這樣導致一個問題是,DAG 可能會因為開發者的水平參差不齊而導致執行效率上的天壤之別;
?3:reason about end-to-end application
end-to-end指的是直接input到out,如Kafka接入Spark Streaming然后再導出到HDFS中;
DStream 只能保證自己的一致性語義是 exactly-once 的,而 input 接入 Spark Streaming 和 Spark Straming 輸出到外部存儲的語義往往需要用戶自己來保證;
?4:批流代碼不統一
盡管批流本是兩套系統,但是這兩套系統統一起來確實很有必要,有時候確實需要將的流處理邏輯運行到批數據上面;
Streaming盡管是對RDD的封裝,但是要將DStream代碼完全轉換成RDD還是有一點工作量的,更何況現在Spark的批處理都用DataSet/DataFrameAPI;
?
總結
流式計算一直沒有一套標準化、能應對各種場景的模型,直到2015年Google發表了The Dataflow Model的論文( https://yq.aliyun.com/articles/73255?)
Google開源Apache Beam項目,基本上就是對Dataflow模型的實現,目前已經成為Apache的頂級項目,但是在國內使用不多。
國內使用的更多的是Apache Flink,因為阿里大力推廣Flink,甚至把花7億元把Flink母公司收購。
?
使用Yahoo的流基準平臺,要求系統讀取廣告點擊事件,并按照活動ID加入到一個廣告活動的靜態表中,并在10秒的event-time窗口中輸出活動計數。
比較了Kafka Streams 0.10.2、Apache Flink 1.2.1和Spark 2.3.0,在一個擁有5個c3.2*2大型Amazon EC2 工作節點和一個master節點的集群上(硬件條件為8個虛擬核心和15GB的內存)。
?
上圖(a)展示了每個系統最大穩定吞吐量(積壓前的吞吐量),Flink可以達到3300萬,而Structured Streaming可以達到6500萬,近乎兩倍于Flink。這個性能完全來自于Spark SQL的內置執行優化,包括將數據存儲在緊湊的二進制文件格式以及代碼生成。
附錄:【Streaming System系統】設計文章:
?Streaming System 第一章【Streaming 101】
網址:https://blog.csdn.net/xxscj/article/details/84990301
?Streaming System 第二章【The What- Where- When- and How of Data Processing】
網址:https://blog.csdn.net/xxscj/article/details/84989879
?
Structured Streaming 介紹
或許是對Dataflow模型的借鑒,也許是英雄所見略同,Spark在2.0版本中發布了新的流計算的API:Structured Streaming結構化流。Structured Streaming是一個基于Spark SQL引擎的可擴展、容錯的流處理引擎。
Structured Streaming統一了流、批的編程模型,可以使用靜態數據批處理一樣的方式來編寫流式計算操作,并且支持基于event_time的時間窗口的處理邏輯。隨著數據不斷地到達,Spark 引擎會以一種增量的方式來執行這些操作,并且持續更新結算結果。
?
模塊介紹
Structured Streaming 在 Spark 2.0 版本于 2016 年引入,設計思想參考很多其他系統的思想,比如區分 processing time 和 event time,使用 relational 執行引擎提高性能等。同時也考慮了和 Spark 其他組件更好的集成。
?
Structured Streaming 和其他系統的顯著區別主要如下:
?1:Incremental query model(增量查詢模型)
Structured Streaming 將會在新增的流式數據上不斷執行增量查詢,同時代碼的寫法和批處理 API(基于Dataframe和Dataset API)完全一樣,而且這些API非常的簡單。
?2:Support for end-to-end application(支持端到端應用)
Structured Streaming 和內置的 connector 使的 end-to-end 程序寫起來非常的簡單,而且 "correct by default"。數據源和sink滿足 "exactly-once" 語義,這樣我們就可以在此基礎上更好地和外部系統集成。
?3:復用 Spark SQL 執行引擎
Spark SQL 執行引擎做了非常多的優化工作,比如執行計劃優化、codegen、內存管理等。這也是Structured Streaming取得高性能和高吞吐的一個原因。
?
???????核心設計
2016年,Spark在2.0版本中推出了結構化流處理的模塊Structured Streaming,核心設計如下:
?1:Input and Output(輸入和輸出)
Structured Streaming 內置了很多 connector 來保證 input 數據源和 output sink 保證 exactly-once 語義。
實現 exactly-once 語義的前提:
Input 數據源必須是可以replay的,比如Kafka,這樣節點crash的時候就可以重新讀取input數據,常見的數據源包括 Amazon Kinesis, Apache Kafka 和文件系統。
Output sink 必須要支持寫入是冪等的,這個很好理解,如果 output 不支持冪等寫入,那么一致性語義就是 at-least-once 了。另外對于某些 sink, Structured Streaming 還提供了原子寫入來保證 exactly-once 語義。
補充:冪等性:在HTTP/1.1中對冪等性的定義:一次和多次請求某一個資源對于資源本身應該具有同樣的結果(網絡超時等問題除外)。也就是說,其任意多次執行對資源本身所產生的影響均與一次執行的影響相同。冪等性是系統服務對外一種承諾(而不是實現),承諾只要調用接口成功,外部多次調用對系統的影響是一致的。聲明為冪等的服務會認為外部調用失敗是常態,并且失敗之后必然會有重試。
?2:Program API(編程 API)
Structured Streaming 代碼編寫完全復用 Spark SQL 的 batch API,也就是對一個或者多個 stream 或者 table 進行 query。
?
?
?
query 的結果是 result table,可以以多種不同的模式(追加:append, 更新:update, 完全:complete)輸出到外部存儲中。
另外,Structured Streaming 還提供了一些 Streaming 處理特有的 API:Trigger, watermark, stateful operator。
?3:Execution Engine(執行引擎)
復用 Spark SQL 的執行引擎;
Structured Streaming 默認使用類似 Spark Streaming 的 micro-batch 模式,有很多好處,比如動態負載均衡、再擴展、錯誤恢復以及 straggler (straggler 指的是哪些執行明顯慢于其他 task 的 task)重試;
提供了基于傳統的 long-running operator 的 continuous(持續)?處理模式;
?4:Operational Features(操作特性)
利用 wal 和狀態State存儲,開發者可以做到集中形式的 rollback 和錯誤恢復FailOver。
?
???????編程模型
Structured Streaming將流式數據當成一個不斷增長的table,然后使用和批處理同一套API,都是基于DataSet/DataFrame的。如下圖所示,通過將流式數據理解成一張不斷增長的表,從而就可以像操作批的靜態數據一樣來操作流數據了。
?
在這個模型中,主要存在下面幾個組成部分:
1:Input Table(Unbounded Table),流式數據的抽象表示,沒有限制邊界的,表的數據源源不斷增加;
2:Query(查詢),對 Input Table 的增量式查詢,只要Input Table中有數據,立即(默認情況)執行查詢分析操作,然后進行輸出(類似SparkStreaming中微批處理);
3:Result Table,Query 產生的結果表;
4:Output,Result Table 的輸出,依據設置的輸出模式OutputMode輸出結果;
?
核心思想
Structured Streaming最核心的思想就是將實時到達的數據看作是一個不斷追加的unbound table無界表,到達流的每個數據項就像是表中的一個新行被附加到無邊界的表中,用靜態結構化數據的批處理查詢方式進行流計算。
?
以詞頻統計WordCount案例,Structured Streaming實時處理數據的示意圖如下,各行含義:
第一行、表示從TCP Socket不斷接收數據,使用【nc -lk 9999】;
第二行、表示時間軸,每隔1秒進行一次數據處理;
第三行、可以看成是“input unbound table",當有新數據到達時追加到表中;
第四行、最終的wordCounts是結果表,新數據到達后觸發查詢Query,輸出的結果;
第五行、當有新的數據到達時,Spark會執行“增量"查詢,并更新結果集;該示例設置為Complete Mode,因此每次都將所有數據輸出到控制臺;
?
上圖中數據實時處理說明:
第一、在第1秒時,此時到達的數據為"cat dog"和"dog dog",因此可以得到第1秒時的結果集cat=1 dog=3,并輸出到控制臺;
第二、當第2秒時,到達的數據為"owl cat",此時"unbound table"增加了一行數據"owl cat",執行word count查詢并更新結果集,可得第2秒時的結果集為cat=2 dog=3 owl=1,并輸出到控制臺;
第三、當第3秒時,到達的數據為"dog"和"owl",此時"unbound table"增加兩行數據"dog"和"owl",執行word count查詢并更新結果集,可得第3秒時的結果集為cat=2 dog=4 owl=2;
?
使用Structured Streaming處理實時數據時,會負責將新到達的數據與歷史數據進行整合,并完成正確的計算操作,同時更新Result Table。
總結
以上是生活随笔為你收集整理的2021年大数据Spark(四十四):Structured Streaming概述的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(四十三):S
- 下一篇: 2021年大数据Spark(二十九):S