Flink 1.13,面向流批一体的运行时与 DataStream API 优化
本文由社區志愿者苗文婷整理,內容來源自阿里巴巴技術專家高赟(云騫) 在 5 月 22 日北京站 Flink Meetup 分享的《面向流批一體的 Flink 運行時與 DataStream API 優化》。文章主要分為 4 個部分:
GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~
1. 流批一體的 Flink
1.1 架構介紹
首先看下 Flink 流批一體的整體邏輯。Flink 在早期的時候,雖然是一個可以同時支持流處理和批處理的框架,但是它的流處理和批處理的實現,不管是在 API 層,還是在底下的 Shuffle、調度、算子層,都是單獨的兩套。這兩套實現是完全獨立的,沒有特別緊密的關聯。
在流批一體這一目標的引導下,Flink 現在已經對底層的算子、調度、Shuffle 進行了統一的抽象,以統一的方式向上支持 DataStream API 和 Table API 兩套接口。DataStream API 是一種比較偏物理層的接口,Table API 是一種 Declearetive 的接口,這兩套接口對流和批來說都是統一的。
1.2 優點
- 代碼復用
基于 DataStream API 和 Table API,用戶可以寫同一套代碼來同時處理歷史的數據和實時的數據,例如數據回流的場景。
- 易于開發
統一的 Connector 和算子實現,減少開發和維護的成本。
- 易于學習
減少學習成本,避免學習兩套相似接口。
- 易于維護
使用同一系統支持流作業和批作業,減少維護成本。
1.3 數據處理過程
下面簡單介紹 Flink 是怎么抽象流批一體的,Flink 把作業拆成了兩種:
- 第一種類型的作業是處理無限數據的無限流的作業
這種作業就是我們平時所認知的流作業,對于這種作業,Flink 采用一個標準流的執行模式,需要考慮記錄的時間,通過 Watermark 對齊的方式推進整個系統的時間以達到一些數據聚合和輸出的目的,中間通過 State 來維護中間狀態。
- 第二種類型的作業是處理有限數據集的作業
數據可能是保存在文件中,或者是以其他方式提前保留下來的一個有限數據集。此時可以把有限數據集看作是無限數據集的一個特例,所以它可以自然的跑在之前的流處理模式之上,無需經過代碼修改,可以直接支持。
但這里可能會忽略掉有限數據集數據有限的特點,在接口上還需要處理更細粒度的時間、Watermark 等語義,可能會引入額外的復雜性。另外,在性能方面,因為是按流的方式處理,在一開始就需要把所有的任務拉起來,可能需要占用更多的資源,如果采用的是 RocksDB backend,相當于是一個大的 Hash 表,在 key 比較多的情況下,可能會有隨機 IO 訪問的問題。
但是在批執行模式下,可以通過排序的方式,用一種 IO 更加友好的方式來實現整個數據處理的流程。所以說,批處理模式在考慮數據有限的前提下,在調度、Shuffle、算子的實現上都給我們提供了更大的選擇空間。
最后,針對有限數據流,不管是采用哪種處理模式,我們希望最終的處理結果都是一致的。
1.4 近期演進
Flink 在最近的幾個版本中,在 API 和實現層都朝著流批一體的目標做了很多的努力。
- 在 Flink 1.11 及之前:
Flink 統一了 Table/SQL API,并引入了統一的 blink planner,blink planner 對流和批都會翻譯到 DataStream 算子之上。此外,對流和批還引入了統一的 shuffle 架構。
- 在 Flink 1.12 中:
針對批的 shuffle 引入了一種新的基于 Sort-Merge 的 shuffle 模式,相對于之前 Flink 內置的 Hash shuffle,性能會有很大提升。在調度方面,Flink 引入了一種基于 Pipeline Region 的流批一體的調度器。
- 在 Flink 1.13 中:
完善了 Sort-Merge Shuffle,并對 Pipeline Region scheduler 在大規模作業下進行了性能優化。另外,前面提到過,對于有限流的兩種執行模式,我們預期它的執行結果應該是一致的。但是現在 Flink 在作業執行結束的時候還有一些問題,導致它并不能完全達到一致。
所以在 1.13 中,還有一部分的工作是針對有限數據集作業,怎么在流批,尤其是在流的模式下,使它的結果和預期的結果保持一致。
- 未來的 Flink 1.14:
需要繼續完成有限作業一致性保證、批流切換 Source、逐步廢棄 DataSet API 等工作。
2. 運行時優化
2.1 大規模作業調度優化
2.1.1 邊的時間復雜度問題
Flink 提交作業時會生成一個作業的 DAG 圖,由多個頂點組成,頂點對應著我們實際的處理節點,如 Map。每個處理節點都會有并發度,此前 Flink 的實現里,當我們把作業提交到 JM 之后,JM 會對作業展開,生成一個 Execution Graph。
如下圖,作業有兩個節點,并發度分別為 2 和 3。在 JM 中實際維護的數據結構里,會分別維護 2 個 task 和 3 個 task,并由 6 條執行邊組成,Flink 基于此數據結構來維護整個作業的拓撲信息。在這個拓撲信息的基礎上,Flink 可以單獨維護每個 task 的狀態,當任務掛了之后以識別需要拉起的 task。
如果以這種 all-to-all 的通信,也就是每兩個上下游 task 之間都有邊的情況下,上游并發 下游并發,將出現 O(N^2) 的數據結構。這種情況下,內存的占用是非常驚人的,如果是 10k 10k 的邊,JM 的內存占用將達到 4.18G。此外,作業很多的計算復雜度都是和邊的數量相關的,此時的空間復雜度為 O(N^2) 或 O(N^3),如果是 10k * 10k 的邊,作業初次調度時間將達到 62s。
可以看出,除了初始調度之外,對于批作業來說,有可能是上游執行完之后繼續執行下游,中間的調度復雜度都是 O(N^2) 或 O(N^3),這樣就會導致很大的性能開銷。另外,內存占用很大的話,GC 的性能也不會特別好。
2.1.2 Execution Graph 的對稱性
針對 Flink 在大規模作業下內存和性能方面存在的一些問題,經過一些深入分析,可以看出上述例子中上下游節點之間是有一定對稱性的。
?
Flink 中 “邊” 的類型可以分為兩種:
- 一種是 Pointwise 型,上游和下游是一一對應的,或者上游一個對應下游幾個,不是全部相連的,這種情況下,邊的數量基本是線性的 O(N), 和算子數在同一個量級。
- 另一種是 All-to-all 型,上游每一個 task 都要和下游的每一個 task 相連,在這種情況下可以看出,每一個上游的 task 產生的數據集都要被下游所有的 task 消費,實際上是一個對稱的關系。只要記住上游的數據集會被下游的所有 task 來消費,就不用再單獨存中間的邊了。
所以,Flink 在 1.13 中對上游的數據集和下游的節點分別引入了 ResultPartitionGroup 和 VertexGroup 的概念。尤其是對于 All-to-all 的邊,因為上下游之間是對稱的,可以把所有上游產生的數據集放到一個 Group 里,把下游所有的節點也放到一個 Group 里,在實際維護時不需要存中間的邊的關系,只需要知道上游的哪個數據集是被下游的哪個 Group 消費,或下游的哪個頂點是消費上游哪個 Group 的數據集。
通過這種方式,減少了內存的占用。
另外,在實際做一些調度相關計算的時候,比如在批處理里,假如所有的邊都是 blocking 邊的情況下,每個節點都屬于一個單獨的 region。之前計算 region 之間的上下游關系,對上游的每個頂點,都需要遍歷其下游的所有頂點,所以是一個 O(N^2) 的操作。
而引入 ConsumerGroup 之后,就會變成一個 O(N) 的線性操作。
2.1.3 優化結果
經過以上數據結構的優化,在 10k * 10k 邊的情況下,可以將 JM 內存占用從 4.18G 縮小到 12.08M, 初次調度時間長從 62s 縮減到 12s。這個優化其實是非常顯著的,對用戶來說,只要升級到 Flink 1.13 就可以獲得收益,不需要做任何額外的配置。
2.2 Sort-Merge Shuffle
另外一個優化,是針對批的作業在數據 shuffle 方面做的優化。一般情況下,批的作業是在上游跑完之后,會先把結果寫到一個中間文件里,然后下游再從中間文件里拉取數據進行處理。
這種方式的好處就是可以節省資源,不需要上游和下游同時起來,在失敗的情況下,也不需要從頭執行。這是批處理的常用執行方式。
?
2.2.1 Hash Shuffle
那么,shuffle 過程中,中間結果是如何保存到中間文件,下游再拉取的?
之前 Flink 引入的是 Hash shuffle,再以 All-to-all 的邊舉例,上游 task 產生的數據集,會給下游的每個 task 寫一個單獨的文件,這樣系統可能會產生大量的小文件。并且不管是使用文件 IO 還是 mmap 的方式,寫每個文件都至少使用一塊緩沖區,會造成內存浪費。下游 task 隨機讀取的上游數據文件,也會產生大量隨機 IO。
所以,之前 Flink 的 Hash shuffle 應用在批處理中,只能在規模比較小或者在用 SSD 的時候,在生產上才能比較 work。在規模比較大或者 SATA 盤上是有較大的問題的。
2.2.2 Sort Shuffle
所以,在 Flink 1.12 和 Flink 1.13 中,經過兩個版本,引入了一種新的基于 Sort Merge 的 shuffle。這個 Sort 并不是指對數據進行 Sort,而是對下游所寫的 task 目標進行 Sort。
大致的原理是,上游在輸出數據時,會使用一個固定大小的緩沖區,避免緩沖區的大小隨著規模的增大而增大,所有的數據都寫到緩沖區里,當緩沖區滿時,會做一次排序并寫到一個單獨文件里,后面的數據還是基于此緩存區繼續寫,續寫的一段會拼到原來的文件后面。最后單個的上游任務會產生一個中間文件,由很多段組成,每個段都是有序的結構。
和其他的批處理的框架不太一樣,這邊并不是基于普通的外排序。一般的外排序是指會把這些段再做一次單獨的 merge,形成一個整體有序的文件,這樣下游來讀的時候會有更好的 IO 連續性,防止每一段每一個 task 要讀取的數據段都很小。但是,這種 merge 本身也是要消耗大量的 IO 資源的,有可能 merge 的時間帶來的開銷會遠超過下游順序讀帶來的收益。
所以,這里采用了另外一種方式:在下游來請求數據的時候,比如下圖中的 3 個下游都要來讀上游的中間文件,會有一個調度器對下游請求要讀取的文件位置做一個排序,通過在上層增加 IO 調度的方式,來實現整個文件 IO 讀取的連續性,防止在 SATA 盤上產生大量的隨機 IO。
在 SATA 盤上,相對于 Hash shuffle,Sort shuffle 的 IO 性能可以提高 2~8 倍。通過 Sort shuffle,使得 Flink 批處理基本達到了生產可用的狀態,在 SATA 盤上 IO 性能可以把磁盤打到 100 多M,而 SATA 盤最高也就能達到 200M 的讀寫速度。
為了保持兼容性,Sort shuffle 并不是默認啟用的,用戶可以控制下游并發達到多少來啟用 Sort Merge Shuffle。并且可以通過啟用壓縮來進一步提高批處理的性能。Sort Merge shuffle 并沒有額外占用內存,現在占用的上游讀寫的緩存區,是從 framework.off-heap 中抽出的一塊。
3. DataStream API 優化
3.1 2PC & 端到端一致性
為了保證端到端的一致性,對于 Flink 流作業來說,是通過兩階段提交的機制來實現的,結合了 Flink 的 checkpoint、failover 機制和外部系統的一些特性。
大概的邏輯是,當我想做端到端的一致性,比如讀取 Kafka 再寫到 Kafka,在正常處理時會把數據先寫到一個 Kafka 的事務里,當做 checkpoint 時進行 preCommit,這樣數據就不會再丟了。
如果 checkpoint 成功的話,會進行一次正式的 commit。這樣就保證了外部系統的事務和 Flink 內部的 failover 是一致的,比如 Flink 發生了 failover 需要回滾到上一個 checkpoint , 外部系統中跟這一部分對應的事務也會被 abort 掉,如果 checkpoint 成功了,外部事務的 commit 也會成功。
Flink 端到端的一致性依賴于 checkpoint 機制。但是,在遇到有限流時,就會有一些問題:
- 具有有限流的作業,task 結束之后,Flink 是不支持做 checkpoint 的,比如流批混合的作業,其中有一部分會結束,之后 Flink 就沒辦法再做 checkpoint,數據也就不會再提交了。
- 在有限流數據結束時,因為 checkpoint 是定時執行的,不能保證最后一個 checkpoint 一定能在處理完所有數據后執行,可能導致最后一部分數據無法提交。
以上就會導致在流模式下,有限流作業流/批執行模式結果不一致。
3.2 支持部分 Task 結束后的 Checkpoint (進行中)
從 Flink 1.13 開始,支持在一部分 task 結束之后,也能做 checkpoint。checkpoint 實際上是維護了每個算子的所有 task 的狀態列表。
在有一部分 task 結束之后,如下圖的虛線部分。Flink 會把結束的 task 分為兩種:
- 如果一個算子的所有 subtask 都已經結束了,就會為這個算子存一個 finished 標記。
- 如果一個算子只有部分 task 結束,就只存儲未結束的 task 狀態。
基于這個 checkpoint ,當 failover 之后還是會拉起所有算子,如果識別到算子的上一次執行已經結束,即 finsihed = true,就會跳過這個算子的執行。尤其是針對 Source 算子來說,如果已經結束,后面就不會再重新執行發送數據了。通過上述方式就可以保證整個狀態的一致性,即使有一部分 task 結束,還是照樣走 checkpoint。
Flink 也重新整理了結束語義。現在 Flink 作業結束有幾種可能:
- 作業結束:數據是有限的,有限流作業正常結束;
- stop-with-savepoint ,采一個 savepoint 結束;
- stop-with-savepoint --drain ,采一個 savepoint 結束,并會將 watermark 推進到正無窮大。
之前這邊是兩種不同的實現邏輯,并且都有最后一部分數據無法提交的問題。
- 對作業結束和 stop-with-savepoint --drain 兩種語義,預期作業是不會再重啟的,都會對算子調 endOfInput() , 通知算子通過一套統一的方式做 checkpoint 。
- 對 stop-with-savepoint 語義,預期作業是會繼續 savepoint 重啟的,此時就不會對算子調 endOfInput()。后續會再做一個 checkpoint , 這樣對于一定會結束并不再重啟的作業,可以保證最后一部分數據一定可以被提交到外部系統中。
4. 總結
在 Flink 的整個目標里,其中有一點是期望做一個對有限數據集和無限數據集高效處理的統一平臺。目前基本上已經有了一個初步的雛形,不管是在 API 方面,還是在 runtime 方面。下面來舉個例子說明流批一體的好處。
針對用戶的回流作業,平時是處理無限流的作業,如果某一天想改個邏輯,用 stop-with-savepoint 方式把流停掉,但是這個變更邏輯還需要追回到前兩個月之內的數據來保證結果的一致性。此時,就可以啟一個批的作業:作業不加修改,跑到提前緩存下來的輸入數據上,用批的模式可以盡快地訂正前兩個月的數據。另外,基于新的邏輯,使用前面保存的 savepoint,可以重啟一個新的流作業。
可以看出,在上述整個流程中,如果是之前流批分開的情況,是需要單獨開發作業進行數據訂正的。但在流批一體的情況下,可以基于流的作業自然的進行數據訂正,不需要用戶再做額外的開發。
在 Flink 后續的版本中,還會進一步考慮更多流批結合的場景,比如用戶先做一個批的處理,對狀態進行初始化之后,再切到無限流上的場景。當然,在流和批單獨的功能上,也會做進一步的優化和完善,使得 Flink 在流批方面都是具有競爭力的計算框架。
原文鏈接:https://developer.aliyun.com/article/784968?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的Flink 1.13,面向流批一体的运行时与 DataStream API 优化的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【直播回看】「EDGE X Kubern
- 下一篇: 重磅发布!阿里云云效《阿里巴巴DevOp