日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink状态管理与CheckPoint、Savepoint

發(fā)布時間:2024/9/27 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink状态管理与CheckPoint、Savepoint 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

轉(zhuǎn)載自:https://blog.csdn.net/hxcaifly/article/details/84673292
????https://blog.csdn.net/rlnLo2pNEfx9c/article/details/81517928
????https://blog.csdn.net/qq_26654727/article/details/83833517
????https://blog.csdn.net/zero__007/article/details/88201498

Flink提供了Exactly once特性,是依賴于帶有barrier的分布式快照+可部分重發(fā)的數(shù)據(jù)源功能實(shí)現(xiàn)的。而分布式快照中,就保存了operator的狀態(tài)信息。

Flink的失敗恢復(fù)依賴于 檢查點(diǎn)機(jī)制 + 可部分重發(fā)的數(shù)據(jù)源
檢查點(diǎn)機(jī)制機(jī)制:checkpoint定期觸發(fā),產(chǎn)生快照,快照中記錄了:

1.當(dāng)前檢查點(diǎn)開始時數(shù)據(jù)源(例如Kafka)中消息的offset。 2.記錄了所有有狀態(tài)的operator當(dāng)前的狀態(tài)信息(例如sum中的數(shù)值)。

可部分重發(fā)的數(shù)據(jù)源:Flink選擇最近完成的檢查點(diǎn)K,然后系統(tǒng)重放整個分布式的數(shù)據(jù)流,然后給予每個operator他們在檢查點(diǎn)k快照中的狀態(tài)。數(shù)據(jù)源被設(shè)置為從位置Sk開始重新讀取流。例如在Apache Kafka中,那意味著告訴消費(fèi)者從偏移量Sk開始重新消費(fèi)。


Checkpoint是Flink實(shí)現(xiàn)容錯機(jī)制最核心的功能,它能夠根據(jù)配置周期性地基于Stream中各個Operator/task的狀態(tài)來生成快照,從而將這些狀態(tài)數(shù)據(jù)定期持久化存儲下來,當(dāng)Flink程序一旦意外崩潰時,重新運(yùn)行程序時可以有選擇地從這些快照進(jìn)行恢復(fù),從而修正因?yàn)楣收蠋淼某绦驍?shù)據(jù)異常。

快照的核心概念之一是barrier。 這些barrier被注入數(shù)據(jù)流并與記錄一起作為數(shù)據(jù)流的一部分向下流動。 barriers永遠(yuǎn)不會超過記錄,數(shù)據(jù)流嚴(yán)格有序,barrier將數(shù)據(jù)流中的記錄隔離成一系列的記錄集合,并將一些集合中的數(shù)據(jù)加入到當(dāng)前的快照中,而另一些數(shù)據(jù)加入到下一個快照中。

每個barrier都帶有快照的ID,并且barrier之前的記錄都進(jìn)入了該快照。 barriers不會中斷流處理,非常輕量級。 來自不同快照的多個barrier可以同時在流中出現(xiàn),這意味著多個快照可能并發(fā)地發(fā)生。

單流的barrier:

barrier在數(shù)據(jù)流源處被注入并行數(shù)據(jù)流中。快照n的barriers被插入的位置(記之為Sn)是快照所包含的數(shù)據(jù)在數(shù)據(jù)源中最大位置。例如,在Apache Kafka中,此位置將是分區(qū)中最后一條記錄的偏移量。 將該位置Sn報(bào)告給checkpoint協(xié)調(diào)器(Flink的JobManager)。

然后barriers向下游流動。當(dāng)一個中間操作算子從其所有輸入流中收到快照n的barriers時,它會為快照n發(fā)出barriers進(jìn)入其所有輸出流中。 一旦sink操作算子(流式DAG的末端)從其所有輸入流接收到barriers n,它就向checkpoint協(xié)調(diào)器確認(rèn)快照n完成。在所有sink確認(rèn)快照后,意味快照著已完成。

一旦完成快照n,job將永遠(yuǎn)不再向數(shù)據(jù)源請求Sn之前的記錄,因?yàn)榇藭r這些記錄(及其后續(xù)記錄)將已經(jīng)通過整個數(shù)據(jù)流拓?fù)?#xff0c;也即是已經(jīng)被處理結(jié)束。

多流的barrier:

接收多個輸入流的運(yùn)算符需要基于快照barriers上對齊(align)輸入流。 上圖說明了這一點(diǎn):

  • 一旦操作算子從一個輸入流接收到快照barriers n,它就不能處理來自該流的任何記錄,直到它從其他輸入接收到barriers n為止。 否則,它會搞混屬于快照n的記錄和屬于快照n + 1的記錄。
  • barriers n所屬的流暫時會被擱置。 從這些流接收的記錄不會被處理,而是放入輸入緩沖區(qū)。可以看到1,2,3會一直放在Input buffer,直到另一個輸入流的快照到達(dá)Operator。
  • 一旦從最后一個流接收到barriers n,操作算子就會發(fā)出所有掛起的向后傳送的記錄,然后自己發(fā)出快照n的barriers。
    之后,它恢復(fù)處理來自所有輸入流的記錄,在處理來自流的記錄之前優(yōu)先處理來自輸入緩沖區(qū)的記錄。

state一般指一個具體的task/operator的狀態(tài)。Flink中包含兩種基礎(chǔ)的狀態(tài):Keyed State和Operator State。

Keyed State,就是基于KeyedStream上的狀態(tài)。這個狀態(tài)是跟特定的key綁定的,對KeyedStream流上的每一個key,可能都對應(yīng)一個state。

Operator State與Keyed State不同,Operator State跟一個特定operator的一個并發(fā)實(shí)例綁定,整個operator只對應(yīng)一個state。相比較而言,在一個operator上,可能會有很多個key,從而對應(yīng)多個keyed state。

舉例來說,Flink中的Kafka Connector,就使用了operator state。它會在每個connector實(shí)例中,保存該實(shí)例中消費(fèi)topic的所有(partition, offset)映射。

Keyed State和Operator State,可以以兩種形式存在:原始狀態(tài)和托管狀態(tài)(Raw and Managed State)。托管狀態(tài)是由Flink框架管理的狀態(tài),如ValueState, ListState, MapState等。而raw state即原始狀態(tài),由用戶自行管理狀態(tài)具體的數(shù)據(jù)結(jié)構(gòu),框架在做checkpoint的時候,使用byte[]來讀寫狀態(tài)內(nèi)容。通常在DataStream上的狀態(tài)推薦使用托管的狀態(tài),當(dāng)實(shí)現(xiàn)一個用戶自定義operator時,會使用到原始狀態(tài)。

這里重點(diǎn)說說State-Keyed State,基于key/value的狀態(tài)接口,這些狀態(tài)只能用于keyedStream之上。keyedStream上的operator操作可以包含window或者map等算子操作。這個狀態(tài)是跟特定的key綁定的,對KeyedStream流上的每一個key,都對應(yīng)一個state。

key/value下可用的狀態(tài)接口:

ValueState: 狀態(tài)保存的是一個值,可以通過update()來更新,value()獲取。 ListState: 狀態(tài)保存的是一個列表,通過add()添加數(shù)據(jù),通過get()方法返回一個Iterable來遍歷狀態(tài)值。 ReducingState: 這種狀態(tài)通過用戶傳入的reduceFunction,每次調(diào)用add方法添加值的時候,會調(diào)用reduceFunction,最后合并到一個單一的狀態(tài)值。 MapState:即狀態(tài)值為一個map。用戶通過put或putAll方法添加元素。

以上所述的State對象,僅僅用于與狀態(tài)進(jìn)行交互(更新、刪除、清空等),而真正的狀態(tài)值,有可能是存在內(nèi)存、磁盤、或者其他分布式存儲系統(tǒng)中。實(shí)際上,這些狀態(tài)有三種存儲方式: HeapStateBackend、MemoryStateBackend、FsStateBackend、RockDBStateBackend。

  • MemoryStateBackend: state數(shù)據(jù)保存在java堆內(nèi)存中,執(zhí)行checkpoint的時候,會把state的快照數(shù)據(jù)保存到j(luò)obmanager的內(nèi)存中。
  • FsStateBackend: state數(shù)據(jù)保存在taskmanager的內(nèi)存中,執(zhí)行checkpoint的時候,會把state的快照數(shù)據(jù)保存到配置的文件系統(tǒng)中,可以使用hdfs等分布式文件系統(tǒng)。
  • RocksDBStateBackend: RocksDB跟上面的都略有不同,它會在本地文件系統(tǒng)中維護(hù)狀態(tài),state會直接寫入本地rocksdb中。同時RocksDB需要配置一個遠(yuǎn)端的filesystem。RocksDB克服了state受內(nèi)存限制的缺點(diǎn),同時又能夠持久化到遠(yuǎn)端文件系統(tǒng)中,比較適合在生產(chǎn)中使用。

通過創(chuàng)建一個StateDescriptor,可以得到一個包含特定名稱的狀態(tài)句柄,可以分別創(chuàng)建ValueStateDescriptor、 ListStateDescriptor或ReducingStateDescriptor狀態(tài)句柄。狀態(tài)是通過RuntimeContext來訪問的,因此只能在RichFunction中訪問狀態(tài)。這就要求UDF時要繼承Rich函數(shù),例如RichMapFunction、RichFlatMapFunction等。

Checkpoint的簡單設(shè)置
默認(rèn)情況下,checkpoint不會被保留,取消程序時即會刪除它們,但是可以通過配置保留定期檢查點(diǎn)。開啟Checkpoint功能,有兩種方式。其一是在conf/flink_conf.yaml中做系統(tǒng)設(shè)置;其二是針對任務(wù)再代碼里靈活配置。推薦第二種方式,針對當(dāng)前任務(wù)設(shè)置,設(shè)置代碼如下所示:

//獲取flink的運(yùn)行環(huán)境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//設(shè)置statebackend env.setStateBackend(new MemoryStateBackend());CheckpointConfig config = env.getCheckpointConfig();// 任務(wù)流取消和故障時會保留Checkpoint數(shù)據(jù),以便根據(jù)實(shí)際需要恢復(fù)到指定的Checkpoint config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 設(shè)置checkpoint的周期, 每隔1000 ms進(jìn)行啟動一個檢查點(diǎn) config.setCheckpointInterval(1000);// 設(shè)置模式為exactly-once config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 確保檢查點(diǎn)之間有至少500 ms的間隔【checkpoint最小間隔】 config.setMinPauseBetweenCheckpoints(500); // 檢查點(diǎn)必須在一分鐘內(nèi)完成,或者被丟棄【checkpoint的超時時間】 config.setCheckpointTimeout(60000); // 同一時間只允許進(jìn)行一個檢查點(diǎn) config.setMaxConcurrentCheckpoints(1);

上面調(diào)用enableExternalizedCheckpoints設(shè)置為ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink處理程序被cancel后,會保留Checkpoint數(shù)據(jù),以便根據(jù)實(shí)際需要恢復(fù)到指定的Checkpoint處理。

ExternalizedCheckpointCleanup 可選項(xiàng)如下:

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作業(yè)時保留檢查點(diǎn)。請注意,在這種情況下,您必須在取消后手動清理檢查點(diǎn)狀態(tài)。
  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作業(yè)時刪除檢查點(diǎn)。只有在作業(yè)失敗時,檢查點(diǎn)狀態(tài)才可用。

默認(rèn)情況下,如果設(shè)置了Checkpoint選項(xiàng),則Flink只保留最近成功生成的1個Checkpoint,而當(dāng)Flink程序失敗時,可以從最近的這個Checkpoint來進(jìn)行恢復(fù)。但是,如果希望保留多個Checkpoint,并能夠根據(jù)實(shí)際需要選擇其中一個進(jìn)行恢復(fù),這樣會更加靈活,比如,發(fā)現(xiàn)最近4個小時數(shù)據(jù)記錄處理有問題,希望將整個狀態(tài)還原到4小時之前。

Flink可以支持保留多個Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的個數(shù):

state.checkpoints.num-retained: 20

Flink checkpoint目錄分別對應(yīng)的是 jobId,flink提供了在啟動之時通過設(shè)置 -s 參數(shù)指定checkpoint目錄, 讓新的jobId 讀取該checkpoint元文件信息和狀態(tài)信息,從而達(dá)到指定時間節(jié)點(diǎn)啟動job。

Savepoint
說到Checkpoint不得不介紹Savepoint。Savepoint是通過Flink的檢查點(diǎn)機(jī)制創(chuàng)建的流作業(yè)執(zhí)行狀態(tài)的一致圖像。可以使用Savepoints來停止和恢復(fù),分叉或更新Flink作業(yè)。保存點(diǎn)由兩部分組成:穩(wěn)定存儲(例如HDFS,S3,…)上的(通常是大的)二進(jìn)制文件和(相對較小的)元數(shù)據(jù)文件的目錄。穩(wěn)定存儲上的文件表示作業(yè)執(zhí)行狀態(tài)圖像的凈數(shù)據(jù)。Savepoint的元數(shù)據(jù)文件以(絕對路徑)的形式包含(主要)指向作為Savepoint一部分的穩(wěn)定存儲上的所有文件的指針。

從概念上講,Flink的Savepoints與Checkpoints的不同之處在于備份與傳統(tǒng)數(shù)據(jù)庫系統(tǒng)中的恢復(fù)日志不同。檢查點(diǎn)的主要目的是在意外的作業(yè)失敗時提供恢復(fù)機(jī)制。

Checkpoint的生命周期由Flink管理,即Flink創(chuàng)建,擁有和發(fā)布Checkpoint,無需用戶交互。作為一種恢復(fù)和定期觸發(fā)的方法,Checkpoint實(shí)現(xiàn)的兩個主要設(shè)計(jì)目標(biāo)是:i)being as lightweight to create (輕量級),ii)fast restore (快速恢復(fù))。針對這些目標(biāo)的優(yōu)化可以利用某些屬性,例如,JobCode在執(zhí)行嘗試之間不會改變。

與此相反,Savepoints由用戶創(chuàng)建,擁有和刪除。它們的用例是planned (計(jì)劃) 的,manual backup( 手動備份 ) 和 resume(恢復(fù))。例如,這可能是Flink版本的更新,更改Job graph ,更改 parallelism ,分配第二個作業(yè),如紅色/藍(lán)色部署,等等。當(dāng)然,Savepoints必須在終止工作后繼續(xù)存在。從概念上講,保存點(diǎn)的生成和恢復(fù)成本可能更高,并且更多地關(guān)注可移植性和對前面提到的作業(yè)更改的支持。

為了能夠在將來升級程序,主要的必要更改是通過uid(String)方法手動指定operator ID 。這些ID用于確定每個運(yùn)算符的狀態(tài)。

DataStream<String> stream = env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid("source-id") // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid("mapper-id") // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID

如果未手動指定ID,則會自動生成這些ID。只要這些ID不變,就可以從保存點(diǎn)自動恢復(fù)。生成的ID取決于程序的結(jié)構(gòu),并且對程序更改很敏感。因此,強(qiáng)烈建議手動分配這些ID。
觸發(fā)保存點(diǎn)時,會創(chuàng)建一個新的保存點(diǎn)目錄,其中將存儲數(shù)據(jù)和元數(shù)據(jù)。可以通過配置默認(rèn)目標(biāo)目錄或使用觸發(fā)器命令指定自定義目標(biāo)目錄來控制此目錄的位置。


注意,checkpoint時的對齊步驟可能增加流式程序的等待時間。通常,這種額外的延遲大約為幾毫秒,但也會見到一些延遲顯著增加的情況。 對于要求所有記錄始終具有超低延遲(幾毫秒)的應(yīng)用程序,Flink可以在checkpoint期間跳過流對齊。一旦操作算子看到每個輸入流的checkpoint barriers,就會寫 checkpoint 快照。

當(dāng)跳過對齊時,即使在 checkpoint n的某些 checkpoint barriers 到達(dá)之后,操作算子仍繼續(xù)處理所有輸入。這樣,操作算子還可以在創(chuàng)建 checkpoint n 的狀態(tài)快照之前,繼續(xù)處理屬于checkpoint n + 1的數(shù)據(jù)。 在還原時,這些記錄將作為重復(fù)記錄出現(xiàn),因?yàn)樗鼈兌及?checkpoint n 的狀態(tài)快照中,并將作為 checkpoint n 之后數(shù)據(jù)的一部分進(jìn)行重復(fù)處理。

總結(jié)

以上是生活随笔為你收集整理的Flink状态管理与CheckPoint、Savepoint的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。