flink checkpoint 恢复_干货:Flink+Kafka 0.11端到端精确一次处理语义实现
2017年12月Apache Flink社區(qū)發(fā)布了1.4版本。該版本正式引入了一個里程碑式的功能:兩階段提交Sink,即TwoPhaseCommitSinkFunction。該SinkFunction提取并封裝了兩階段提交協(xié)議中的公共邏輯,自此Flink搭配特定source和sink(特別是0.11版本Kafka)搭建精確一次處理語義(?exactly-once semantics)應用成為了可能。作為一個抽象類TwoPhaseCommitSinkFunction提供了一個抽象層供用戶自行實現(xiàn)特定方法來支持?exactly-once semantics。
用戶可以閱讀Java文檔來學習如何使用TwoPhaseCommitSinkFunction,或者參考Flink官網文檔來了解FlinkKafkaProducer011是如何支持 exactly-once semantics的,因為后者正是基于TwoPhaseCommitSinkFunction實現(xiàn)的。
本文將深入討論一下Flink 1.4這個新特性以及其背后的設計思想。在本文中我們將:
1. 描述Flink應用中的checkpoint如何幫助確保exactly-once semantics
2. 展示Flink如何通過兩階段提交協(xié)議與source和sink交互以實現(xiàn)端到端的??exactly-once semantics交付保障
3. 給出一個使用TwoPhaseCommitSinkFunction實現(xiàn)?exactly-once semantics的文件Sink實例
1Flink應用的僅一次處理當談及僅一次處理時,我們真正想表達的是每條輸入消息只會影響最終結果一次!【譯者:影響應用狀態(tài)一次,而非被處理一次】即使出現(xiàn)機器故障或軟件崩潰,Flink也要保證不會有數(shù)據(jù)被重復處理或壓根就沒有被處理從而影響狀態(tài)。長久以來Flink一直宣稱支持 exactly-once semantics是指在一個Flink應用內部。在過去的幾年間,Flink開發(fā)出了checkpointing機制,而它則是提供這種應用內僅一次處理的基石。
在繼續(xù)之前我們簡要總結一下checkpointing算法,這對于我們了解本文內容至關重要。簡單來說,一個Flink checkpoint是一個一致性快照,它包含:
1. 應用的當前狀態(tài)
2. 消費的輸入流位置
Flink會定期地產生checkpoint并且把這些checkpoint寫入到一個持久化存儲上,比如S3或HDFS。這個寫入過程是異步的,這就意味著Flink即使在checkpointing過程中也是不斷處理輸入數(shù)據(jù)的。
如果出現(xiàn)機器或軟件故障,Flink應用重啟后會從最新成功完成的checkpoint中恢復——重置應用狀態(tài)并回滾狀態(tài)到checkpoint中輸入流的正確位置,之后再開始執(zhí)行數(shù)據(jù)處理,就好像該故障或崩潰從未發(fā)生過一般。
在Flink 1.4版本之前,僅一次處理只限于Flink應用內。Flink處理完數(shù)據(jù)后需要將結果發(fā)送到外部系統(tǒng),這個過程中Flink并不保證僅一次處理。但是Flink應用通常都需要接入很多下游子系統(tǒng),而開發(fā)人員很希望能在多個系統(tǒng)上維持僅一次處理語義,即維持端到端的僅一次處理語義。
為了提供端到端的僅一次處理語義,僅一次處理語義必須也要應用于Flink寫入數(shù)據(jù)的外部系統(tǒng)——故這些外部系統(tǒng)必須提供一種手段允許提交或回滾這些寫入操作,同時還要保證與Flink checkpoint能夠協(xié)調使用。
在分布式系統(tǒng)中協(xié)調提交和回滾的一個常見方法就是使用兩階段提交協(xié)議。下一章節(jié)中我們將討論下Flink的TwoPhaseCommitSinkFunction是如何利用兩階段提交協(xié)議來實現(xiàn)exactly-once semantics的。
2Flink實現(xiàn)僅一次語義的應用下面將給出一個實例來幫助了解兩階段提交協(xié)議以及Flink如何使用它來實現(xiàn)僅一次處理語義。該實例從Kafka中讀取數(shù)據(jù),經處理之后再寫回到Kafka。Kafka是非常受歡迎的消息隊列,而Kafka 0.11.0.0版本正式發(fā)布了對于事務的支持——這是與Kafka交互的Flink應用要實現(xiàn)端到端僅一次語義的必要條件。
當然,Flink支持這種僅一次處理語義并不只是限于與Kafka的結合,可以使用任何source/sink,只要它們提供了必要的協(xié)調機制。舉個例子,Pravega是Dell/EMC的一個開源流式存儲系統(tǒng),Flink搭配它也可以實現(xiàn)端到端的exactly-once semantics。
本例中的Flink應用包含以下組件,如上圖所示:
1. 一個source,從Kafka中讀取數(shù)據(jù)(即KafkaConsumer)
2. 一個時間窗口化的聚會操作
3. 一個sink,將結果寫回到Kafka(即KafkaProducer)
若要sink支持 exactly-once semantics,它必須以事務的方式寫數(shù)據(jù)到Kafka,這樣當提交事務時兩次checkpoint間的所有寫入操作當作為一個事務被提交。這確保了出現(xiàn)故障或崩潰時這些寫入操作能夠被回滾。
當然了,在一個分布式且含有多個并發(fā)執(zhí)行sink的應用中,僅僅執(zhí)行單次提交或回滾是不夠的,因為所有組件都必須對這些提交或回滾達成共識,這樣才能保證得到一個一致性的結果。Flink使用兩階段提交協(xié)議以及預提交(pre-commit)階段來解決這個問題。
Flink checkpointing開始時便進入到pre-commit階段。具體來說,一旦checkpoint開始,Flink的JobManager向輸入流中寫入一個checkpoint barrier將流中所有消息分割成屬于本次checkpoint的消息以及屬于下次checkpoint的。barrier也會在操作算子間流轉。對于每個operator來說,該barrier會觸發(fā)operator狀態(tài)后端為該operator狀態(tài)打快照。
眾所周知,flink kafka source保存Kafka消費offset,一旦完成位移保存,它會將checkpoint barrier傳給下一個operator。
這個方法對于opeartor只有內部狀態(tài)的場景是可行的。所謂的內部狀態(tài)就是完全由Flink狀態(tài)保存并管理的——本例中的第二個opeartor:時間窗口上保存的求和數(shù)據(jù)就是這樣的例子。當只有內部狀態(tài)時,pre-commit階段無需執(zhí)行額外的操作,僅僅是寫入一些已定義的狀態(tài)變量即可。當chckpoint成功時Flink負責提交這些寫入,否則就終止取消掉它們。
當時,一旦operator包含外部狀態(tài),事情就不一樣了。我們不能像處理內部狀態(tài)一樣處理這些外部狀態(tài)。因為外部狀態(tài)通常都涉及到與外部系統(tǒng)的交互。如果是這樣的話,外部系統(tǒng)必須要支持可與兩階段提交協(xié)議捆綁使用的事務才能確保實現(xiàn)整體的exactly-once semantics。
顯然本例中的data sink是有外部狀態(tài)的,因為它需要寫入數(shù)據(jù)到Kafka。此時的pre-commit階段下data sink在保存狀態(tài)到狀態(tài)存儲的同時還必須預提交它的外部事務,如下圖所示:
??
當checkpoint barrier在所有operator都傳遞了一遍且對應的快照也都成功完成之后,pre-commit階段才算完成。該過程中所有創(chuàng)建的快照都被視為是checkpoint的一部分。其實,checkpoint就是整個應用的全局狀態(tài),當然也包含pre-commit階段提交的外部狀態(tài)。當出現(xiàn)崩潰時,我們可以回滾狀態(tài)到最新已成功完成快照時的時間點。
下一步就是通知所有的operator,告訴它們checkpoint已成功完成。這便是兩階段提交協(xié)議的第二個階段:commit階段。該階段中JobManager會為應用中每個operator發(fā)起checkpoint已完成的回調邏輯。
本例中的data source和窗口操作無外部狀態(tài),因此在該階段,這兩個opeartor無需執(zhí)行任何邏輯,但是data sink是有外部狀態(tài)的,因此此時我們必須提交外部事務,如下圖所示:
匯總以上所有信息,總結一下:
1. 一旦所有operator完成各自的pre-commit,它們會發(fā)起一個commit操作
2. 倘若有一個pre-commit失敗,所有其他的pre-commit必須被終止,并且Flink會回滾到最近成功完成decheckpoint
3. 一旦pre-commit完成,必須要確保commit也要成功——operator和外部系統(tǒng)都需要對此進行保證。倘若commit失敗(比如網絡故障等),Flink應用就會崩潰,然后根據(jù)用戶重啟策略執(zhí)行重啟邏輯,之后再次重試commit。這個過程至關重要,因為倘若commit無法順利執(zhí)行,就可能出現(xiàn)數(shù)據(jù)丟失的情況
因此,所有opeartor必須對checkpoint最終結果達成共識:即所有operator都必須認定數(shù)據(jù)提交要么成功執(zhí)行,要么被終止然后回滾。
3Flink中實現(xiàn)兩階段提交這種operator的管理有些復雜,這也是為什么Flink提取了公共邏輯并封裝進TwoPhaseCommitSinkFunction抽象類的原因。
下面討論一下如何擴展TwoPhaseCommitSinkFunction類來實現(xiàn)一個簡單的基于文件的sink。若要實現(xiàn)支持exactly-once semantics的文件sink,我們需要實現(xiàn)以下4個方法:
1. beginTransaction:開啟一個事務,在臨時目錄下創(chuàng)建一個臨時文件,之后,寫入數(shù)據(jù)到該文件中
2. preCommit:在pre-commit階段,flush緩存數(shù)據(jù)塊到磁盤,然后關閉該文件,確保再不寫入新數(shù)據(jù)到該文件。同時開啟一個新事務執(zhí)行屬于下一個checkpoint的寫入操作3. commit:在commit階段,我們以原子性的方式將上一階段的文件寫入真正的文件目錄下。注意:這會增加輸出數(shù)據(jù)可見性的延時。通俗說就是用戶想要看到最終數(shù)據(jù)需要等會,不是實時的。4. abort:一旦終止事務,我們離自己刪除臨時文件
當出現(xiàn)崩潰時,Flink會恢復最新已完成快照中應用狀態(tài)。需要注意的是在某些極偶然的場景下,pre-commit階段已成功完成而commit尚未開始(也就是operator尚未來得及被告知要開啟commit),此時倘若發(fā)生崩潰Flink會將opeartor狀態(tài)恢復到已完成pre-commit但尚未commit的狀態(tài)。
在一個checkpoint狀態(tài)中,對于已完成pre-commit的事務狀態(tài),我們必須保存足夠多的信息,這樣才能確保在重啟后要么重新發(fā)起commit亦或是終止掉事務。本例中這部分信息就是臨時文件所在的路徑以及目標目錄。
TwoPhaseCommitSinkFunction考慮了這種場景,因此當應用從checkpoint恢復之后TwoPhaseCommitSinkFunction總是會發(fā)起一個搶占式的commit。這種commit必須是冪等性的,雖然大部分情況下這都不是問題。本例中對應的這種場景就是:臨時文件不在臨時目錄下,而是已經被移動到目標目錄下。
4總結本文的一些關鍵要點:
Flinkcheckpointing機制是實現(xiàn)兩階段提交協(xié)議以及提供僅一次語義的基石
與其他系統(tǒng)持久化傳輸中的數(shù)據(jù)不同,Flink不需要將計算的每個階段寫入到磁盤中
Flink新的TwoPhaseCommitSinkFunction封裝兩階段提交協(xié)議的公共邏輯使之搭配支持事務的外部系統(tǒng)來共同構建僅一次語義應用成為可能
自1.4版本起,Flink + Pravega和Kafka 0.11 producer開始支持僅一次語義
Flink Kafka 0.11 producer基于TwoPhaseCommitSinkFunction實現(xiàn),比起至少一次語義的producer而言開銷并未顯著增加
推薦閱讀:
Flink 1.10 細粒度資源管理解析
Flink State 最佳實踐
不可不知的spark shuffle
總結
以上是生活随笔為你收集整理的flink checkpoint 恢复_干货:Flink+Kafka 0.11端到端精确一次处理语义实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql2017windows安装_m
- 下一篇: c语言标准整形,C语言整形数值范围问题