Flink状态管理与CheckPoint、Savepoint
轉(zhuǎn)載自:https://blog.csdn.net/hxcaifly/article/details/84673292
????https://blog.csdn.net/rlnLo2pNEfx9c/article/details/81517928
????https://blog.csdn.net/qq_26654727/article/details/83833517
????https://blog.csdn.net/zero__007/article/details/88201498
Flink提供了Exactly once特性,是依賴于帶有barrier的分布式快照+可部分重發(fā)的數(shù)據(jù)源功能實(shí)現(xiàn)的。而分布式快照中,就保存了operator的狀態(tài)信息。
Flink的失敗恢復(fù)依賴于 檢查點(diǎn)機(jī)制 + 可部分重發(fā)的數(shù)據(jù)源。
檢查點(diǎn)機(jī)制機(jī)制:checkpoint定期觸發(fā),產(chǎn)生快照,快照中記錄了:
可部分重發(fā)的數(shù)據(jù)源:Flink選擇最近完成的檢查點(diǎn)K,然后系統(tǒng)重放整個分布式的數(shù)據(jù)流,然后給予每個operator他們在檢查點(diǎn)k快照中的狀態(tài)。數(shù)據(jù)源被設(shè)置為從位置Sk開始重新讀取流。例如在Apache Kafka中,那意味著告訴消費(fèi)者從偏移量Sk開始重新消費(fèi)。
Checkpoint是Flink實(shí)現(xiàn)容錯機(jī)制最核心的功能,它能夠根據(jù)配置周期性地基于Stream中各個Operator/task的狀態(tài)來生成快照,從而將這些狀態(tài)數(shù)據(jù)定期持久化存儲下來,當(dāng)Flink程序一旦意外崩潰時,重新運(yùn)行程序時可以有選擇地從這些快照進(jìn)行恢復(fù),從而修正因?yàn)楣收蠋淼某绦驍?shù)據(jù)異常。
快照的核心概念之一是barrier。 這些barrier被注入數(shù)據(jù)流并與記錄一起作為數(shù)據(jù)流的一部分向下流動。 barriers永遠(yuǎn)不會超過記錄,數(shù)據(jù)流嚴(yán)格有序,barrier將數(shù)據(jù)流中的記錄隔離成一系列的記錄集合,并將一些集合中的數(shù)據(jù)加入到當(dāng)前的快照中,而另一些數(shù)據(jù)加入到下一個快照中。
每個barrier都帶有快照的ID,并且barrier之前的記錄都進(jìn)入了該快照。 barriers不會中斷流處理,非常輕量級。 來自不同快照的多個barrier可以同時在流中出現(xiàn),這意味著多個快照可能并發(fā)地發(fā)生。
單流的barrier:
barrier在數(shù)據(jù)流源處被注入并行數(shù)據(jù)流中。快照n的barriers被插入的位置(記之為Sn)是快照所包含的數(shù)據(jù)在數(shù)據(jù)源中最大位置。例如,在Apache Kafka中,此位置將是分區(qū)中最后一條記錄的偏移量。 將該位置Sn報(bào)告給checkpoint協(xié)調(diào)器(Flink的JobManager)。
然后barriers向下游流動。當(dāng)一個中間操作算子從其所有輸入流中收到快照n的barriers時,它會為快照n發(fā)出barriers進(jìn)入其所有輸出流中。 一旦sink操作算子(流式DAG的末端)從其所有輸入流接收到barriers n,它就向checkpoint協(xié)調(diào)器確認(rèn)快照n完成。在所有sink確認(rèn)快照后,意味快照著已完成。
一旦完成快照n,job將永遠(yuǎn)不再向數(shù)據(jù)源請求Sn之前的記錄,因?yàn)榇藭r這些記錄(及其后續(xù)記錄)將已經(jīng)通過整個數(shù)據(jù)流拓?fù)?#xff0c;也即是已經(jīng)被處理結(jié)束。
多流的barrier:
接收多個輸入流的運(yùn)算符需要基于快照barriers上對齊(align)輸入流。 上圖說明了這一點(diǎn):
- 一旦操作算子從一個輸入流接收到快照barriers n,它就不能處理來自該流的任何記錄,直到它從其他輸入接收到barriers n為止。 否則,它會搞混屬于快照n的記錄和屬于快照n + 1的記錄。
- barriers n所屬的流暫時會被擱置。 從這些流接收的記錄不會被處理,而是放入輸入緩沖區(qū)。可以看到1,2,3會一直放在Input buffer,直到另一個輸入流的快照到達(dá)Operator。
- 一旦從最后一個流接收到barriers n,操作算子就會發(fā)出所有掛起的向后傳送的記錄,然后自己發(fā)出快照n的barriers。
之后,它恢復(fù)處理來自所有輸入流的記錄,在處理來自流的記錄之前優(yōu)先處理來自輸入緩沖區(qū)的記錄。
state一般指一個具體的task/operator的狀態(tài)。Flink中包含兩種基礎(chǔ)的狀態(tài):Keyed State和Operator State。
Keyed State,就是基于KeyedStream上的狀態(tài)。這個狀態(tài)是跟特定的key綁定的,對KeyedStream流上的每一個key,可能都對應(yīng)一個state。
Operator State與Keyed State不同,Operator State跟一個特定operator的一個并發(fā)實(shí)例綁定,整個operator只對應(yīng)一個state。相比較而言,在一個operator上,可能會有很多個key,從而對應(yīng)多個keyed state。
舉例來說,Flink中的Kafka Connector,就使用了operator state。它會在每個connector實(shí)例中,保存該實(shí)例中消費(fèi)topic的所有(partition, offset)映射。
Keyed State和Operator State,可以以兩種形式存在:原始狀態(tài)和托管狀態(tài)(Raw and Managed State)。托管狀態(tài)是由Flink框架管理的狀態(tài),如ValueState, ListState, MapState等。而raw state即原始狀態(tài),由用戶自行管理狀態(tài)具體的數(shù)據(jù)結(jié)構(gòu),框架在做checkpoint的時候,使用byte[]來讀寫狀態(tài)內(nèi)容。通常在DataStream上的狀態(tài)推薦使用托管的狀態(tài),當(dāng)實(shí)現(xiàn)一個用戶自定義operator時,會使用到原始狀態(tài)。
這里重點(diǎn)說說State-Keyed State,基于key/value的狀態(tài)接口,這些狀態(tài)只能用于keyedStream之上。keyedStream上的operator操作可以包含window或者map等算子操作。這個狀態(tài)是跟特定的key綁定的,對KeyedStream流上的每一個key,都對應(yīng)一個state。
key/value下可用的狀態(tài)接口:
ValueState: 狀態(tài)保存的是一個值,可以通過update()來更新,value()獲取。 ListState: 狀態(tài)保存的是一個列表,通過add()添加數(shù)據(jù),通過get()方法返回一個Iterable來遍歷狀態(tài)值。 ReducingState: 這種狀態(tài)通過用戶傳入的reduceFunction,每次調(diào)用add方法添加值的時候,會調(diào)用reduceFunction,最后合并到一個單一的狀態(tài)值。 MapState:即狀態(tài)值為一個map。用戶通過put或putAll方法添加元素。以上所述的State對象,僅僅用于與狀態(tài)進(jìn)行交互(更新、刪除、清空等),而真正的狀態(tài)值,有可能是存在內(nèi)存、磁盤、或者其他分布式存儲系統(tǒng)中。實(shí)際上,這些狀態(tài)有三種存儲方式: HeapStateBackend、MemoryStateBackend、FsStateBackend、RockDBStateBackend。
- MemoryStateBackend: state數(shù)據(jù)保存在java堆內(nèi)存中,執(zhí)行checkpoint的時候,會把state的快照數(shù)據(jù)保存到j(luò)obmanager的內(nèi)存中。
- FsStateBackend: state數(shù)據(jù)保存在taskmanager的內(nèi)存中,執(zhí)行checkpoint的時候,會把state的快照數(shù)據(jù)保存到配置的文件系統(tǒng)中,可以使用hdfs等分布式文件系統(tǒng)。
- RocksDBStateBackend: RocksDB跟上面的都略有不同,它會在本地文件系統(tǒng)中維護(hù)狀態(tài),state會直接寫入本地rocksdb中。同時RocksDB需要配置一個遠(yuǎn)端的filesystem。RocksDB克服了state受內(nèi)存限制的缺點(diǎn),同時又能夠持久化到遠(yuǎn)端文件系統(tǒng)中,比較適合在生產(chǎn)中使用。
通過創(chuàng)建一個StateDescriptor,可以得到一個包含特定名稱的狀態(tài)句柄,可以分別創(chuàng)建ValueStateDescriptor、 ListStateDescriptor或ReducingStateDescriptor狀態(tài)句柄。狀態(tài)是通過RuntimeContext來訪問的,因此只能在RichFunction中訪問狀態(tài)。這就要求UDF時要繼承Rich函數(shù),例如RichMapFunction、RichFlatMapFunction等。
Checkpoint的簡單設(shè)置
默認(rèn)情況下,checkpoint不會被保留,取消程序時即會刪除它們,但是可以通過配置保留定期檢查點(diǎn)。開啟Checkpoint功能,有兩種方式。其一是在conf/flink_conf.yaml中做系統(tǒng)設(shè)置;其二是針對任務(wù)再代碼里靈活配置。推薦第二種方式,針對當(dāng)前任務(wù)設(shè)置,設(shè)置代碼如下所示:
上面調(diào)用enableExternalizedCheckpoints設(shè)置為ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink處理程序被cancel后,會保留Checkpoint數(shù)據(jù),以便根據(jù)實(shí)際需要恢復(fù)到指定的Checkpoint處理。
ExternalizedCheckpointCleanup 可選項(xiàng)如下:
- ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作業(yè)時保留檢查點(diǎn)。請注意,在這種情況下,您必須在取消后手動清理檢查點(diǎn)狀態(tài)。
- ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作業(yè)時刪除檢查點(diǎn)。只有在作業(yè)失敗時,檢查點(diǎn)狀態(tài)才可用。
默認(rèn)情況下,如果設(shè)置了Checkpoint選項(xiàng),則Flink只保留最近成功生成的1個Checkpoint,而當(dāng)Flink程序失敗時,可以從最近的這個Checkpoint來進(jìn)行恢復(fù)。但是,如果希望保留多個Checkpoint,并能夠根據(jù)實(shí)際需要選擇其中一個進(jìn)行恢復(fù),這樣會更加靈活,比如,發(fā)現(xiàn)最近4個小時數(shù)據(jù)記錄處理有問題,希望將整個狀態(tài)還原到4小時之前。
Flink可以支持保留多個Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的個數(shù):
state.checkpoints.num-retained: 20Flink checkpoint目錄分別對應(yīng)的是 jobId,flink提供了在啟動之時通過設(shè)置 -s 參數(shù)指定checkpoint目錄, 讓新的jobId 讀取該checkpoint元文件信息和狀態(tài)信息,從而達(dá)到指定時間節(jié)點(diǎn)啟動job。
Savepoint
說到Checkpoint不得不介紹Savepoint。Savepoint是通過Flink的檢查點(diǎn)機(jī)制創(chuàng)建的流作業(yè)執(zhí)行狀態(tài)的一致圖像。可以使用Savepoints來停止和恢復(fù),分叉或更新Flink作業(yè)。保存點(diǎn)由兩部分組成:穩(wěn)定存儲(例如HDFS,S3,…)上的(通常是大的)二進(jìn)制文件和(相對較小的)元數(shù)據(jù)文件的目錄。穩(wěn)定存儲上的文件表示作業(yè)執(zhí)行狀態(tài)圖像的凈數(shù)據(jù)。Savepoint的元數(shù)據(jù)文件以(絕對路徑)的形式包含(主要)指向作為Savepoint一部分的穩(wěn)定存儲上的所有文件的指針。
從概念上講,Flink的Savepoints與Checkpoints的不同之處在于備份與傳統(tǒng)數(shù)據(jù)庫系統(tǒng)中的恢復(fù)日志不同。檢查點(diǎn)的主要目的是在意外的作業(yè)失敗時提供恢復(fù)機(jī)制。
Checkpoint的生命周期由Flink管理,即Flink創(chuàng)建,擁有和發(fā)布Checkpoint,無需用戶交互。作為一種恢復(fù)和定期觸發(fā)的方法,Checkpoint實(shí)現(xiàn)的兩個主要設(shè)計(jì)目標(biāo)是:i)being as lightweight to create (輕量級),ii)fast restore (快速恢復(fù))。針對這些目標(biāo)的優(yōu)化可以利用某些屬性,例如,JobCode在執(zhí)行嘗試之間不會改變。
與此相反,Savepoints由用戶創(chuàng)建,擁有和刪除。它們的用例是planned (計(jì)劃) 的,manual backup( 手動備份 ) 和 resume(恢復(fù))。例如,這可能是Flink版本的更新,更改Job graph ,更改 parallelism ,分配第二個作業(yè),如紅色/藍(lán)色部署,等等。當(dāng)然,Savepoints必須在終止工作后繼續(xù)存在。從概念上講,保存點(diǎn)的生成和恢復(fù)成本可能更高,并且更多地關(guān)注可移植性和對前面提到的作業(yè)更改的支持。
為了能夠在將來升級程序,主要的必要更改是通過uid(String)方法手動指定operator ID 。這些ID用于確定每個運(yùn)算符的狀態(tài)。
DataStream<String> stream = env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid("source-id") // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid("mapper-id") // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID如果未手動指定ID,則會自動生成這些ID。只要這些ID不變,就可以從保存點(diǎn)自動恢復(fù)。生成的ID取決于程序的結(jié)構(gòu),并且對程序更改很敏感。因此,強(qiáng)烈建議手動分配這些ID。
觸發(fā)保存點(diǎn)時,會創(chuàng)建一個新的保存點(diǎn)目錄,其中將存儲數(shù)據(jù)和元數(shù)據(jù)。可以通過配置默認(rèn)目標(biāo)目錄或使用觸發(fā)器命令指定自定義目標(biāo)目錄來控制此目錄的位置。
注意,checkpoint時的對齊步驟可能增加流式程序的等待時間。通常,這種額外的延遲大約為幾毫秒,但也會見到一些延遲顯著增加的情況。 對于要求所有記錄始終具有超低延遲(幾毫秒)的應(yīng)用程序,Flink可以在checkpoint期間跳過流對齊。一旦操作算子看到每個輸入流的checkpoint barriers,就會寫 checkpoint 快照。
當(dāng)跳過對齊時,即使在 checkpoint n的某些 checkpoint barriers 到達(dá)之后,操作算子仍繼續(xù)處理所有輸入。這樣,操作算子還可以在創(chuàng)建 checkpoint n 的狀態(tài)快照之前,繼續(xù)處理屬于checkpoint n + 1的數(shù)據(jù)。 在還原時,這些記錄將作為重復(fù)記錄出現(xiàn),因?yàn)樗鼈兌及?checkpoint n 的狀態(tài)快照中,并將作為 checkpoint n 之后數(shù)據(jù)的一部分進(jìn)行重復(fù)處理。
總結(jié)
以上是生活随笔為你收集整理的Flink状态管理与CheckPoint、Savepoint的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 怎么查询信用卡账单
- 下一篇: Flink流计算编程--在Windowe