flink中的WaterMark调研和具体实例
一些基本概念介紹:
| Event Time | 事件時(shí)間是每個(gè)事件在其生產(chǎn)設(shè)備上發(fā)生的時(shí)間 |
| Ingestion Time | 攝取時(shí)間是數(shù)據(jù)進(jìn)入Flink的時(shí)間 |
| Processing Time | 處理時(shí)間是是指正在執(zhí)行相應(yīng)算子操作的機(jī)器的系統(tǒng)時(shí)間,默認(rèn)的時(shí)間屬性就是Processing Time |
[7]在Flink的流式處理中,絕大部分的業(yè)務(wù)都會(huì)使用EventTime,一般只在EventTime無法使用時(shí),才會(huì)被迫使用ProcessingTime或者IngestionTime。默認(rèn)情況下,Flink框架中處理的時(shí)間語義為ProcessingTime,如果要使用EventTime,那么需要引入EventTime的時(shí)間屬性,引入方式如下所示:
import org.apache.flink.streaming.api.TimeCharacteristic
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
上面的這個(gè)只是為了表示當(dāng)前的環(huán)境中有EventTIme屬性,講大白話就是輸入的數(shù)據(jù)源中會(huì)有時(shí)間戳,不用想太多。
--------------------------------------------和時(shí)間相關(guān)的一些概念--------------------------------------------------------------------
假設(shè)nc -lk傳入數(shù)據(jù),intelliji處理數(shù)據(jù),那么其實(shí)流處理的大概有這么幾個(gè)時(shí)間相關(guān)的數(shù)據(jù)。
①終端輸入數(shù)據(jù)的時(shí)間
②該數(shù)據(jù)中包含的時(shí)間
③intellij處理數(shù)據(jù)的時(shí)間
④window中的窗口寬度,slidingWindow還包括滑動(dòng)時(shí)間寬度
⑤水位線時(shí)間, 這個(gè)是從上一條數(shù)據(jù)中得到的時(shí)間戳,另外還有水位線延遲時(shí)間(用來對付遲到的數(shù)據(jù))
首先,①≠②,整個(gè)實(shí)驗(yàn)不關(guān)心①的時(shí)間,所以①我們不理會(huì)。
③我們一般也不關(guān)心(注意指的是一般情況下不關(guān)心,因?yàn)檫€是以數(shù)據(jù)源頭中的時(shí)間戳中的Event Time為主)
④和⑤是我們關(guān)心范疇
所以主要是②④⑤
---------------------------------------------------------WaterMark和各種參數(shù)之間的關(guān)系--------------------------------------------------------------
什么是WaterMark呢,其實(shí)翻譯過來就是水位線。
數(shù)據(jù)流好比水流(數(shù)據(jù))從瀑布頂端傾瀉而下,
而瀑布下方的水位線(數(shù)據(jù)生成的時(shí)間,Event Time)隨著瀑布的流下而不斷上升。
------------------------------------------------------------------------------------------------------------------------------------------------
Watermarks是基于已經(jīng)收集的消息來估算是否還有消息未到達(dá),本質(zhì)上是一個(gè)時(shí)間戳。時(shí)間戳反映的是事件發(fā)生的時(shí)間,而不是事件處理的時(shí)間
所謂的事件(Event),講人話其實(shí)就是一條數(shù)據(jù)。
1.設(shè)置StreamTime Characteristic為Event Time,即設(shè)置流式時(shí)間窗口
2.創(chuàng)建的DataStreamSource調(diào)用assignTimestampsAndWatermarks中設(shè)置WaterMarks種類(二選一):
①AssignerWithPeriodicWatermarks(周期性(一定時(shí)間間隔或者達(dá)到一定的記錄條數(shù))生成水印)
②AssignerWithPunctuatedWatermarks(數(shù)據(jù)流中每一個(gè)遞增的?EventTime?都會(huì)產(chǎn)生一個(gè)?Watermark)
?
數(shù)據(jù)示例:
| 實(shí)驗(yàn)類型(代碼) | nc -lk 3456輸入 | 實(shí)驗(yàn)結(jié)果 | 實(shí)驗(yàn)解析 |
| 窗口的生命周期 | ? tests | 測試::7> (tests,3) | 這個(gè)實(shí)驗(yàn)必須輸入三個(gè)相同的字符串后,程序才會(huì)有反應(yīng) |
| 滾動(dòng)窗口 | later,6565 | windows:>>>:7> window:[1599362540000-1599362545000]:{ (later,6565,1) } | window后面的是實(shí)際處理時(shí)間。 later后面手動(dòng)輸入的時(shí)間戳被忽視 |
| 滑動(dòng)窗口 | later,32234 | windows:>>>:7> window:[1599364140000-1599364145000]:{ (later,32234,1) } windows:>>>:7> window:[1599364143000-1599364148000]:{ (later,32234,1) } | 一條數(shù)據(jù)出現(xiàn)在兩個(gè)窗口中 |
| 會(huì)話窗口 | test,12312 test,235233 ? | windows:>>>:5> window:[1599366609562-1599366622057]:{ (test,12312,1),(test,235233,1) } | processtime間隔時(shí)間超過10s,就會(huì)輸出上一個(gè)窗口(含10s所有輸入內(nèi)容) |
這里的例如1599366609562單位是ms
我們根據(jù)業(yè)務(wù)的需求還判斷使用哪個(gè)時(shí)間類型,一般來說使用Event Time更多,比如:在統(tǒng)計(jì)最近5分鐘的訂單總金額時(shí),我們需要的是真實(shí)的訂單時(shí)間,而不是進(jìn)入flink的時(shí)間或者是處理時(shí)間。
另外,生產(chǎn)環(huán)境中大多數(shù)情況下數(shù)據(jù)源頭都會(huì)帶有時(shí)間戳,時(shí)間戳經(jīng)過flink處理提取得到Event Time。
上述三種窗口的事件事件寫法與處理時(shí)間寫法的對照(不包含窗口偏移設(shè)置)
| ? | 事件時(shí)間(Event Time) | 處理時(shí)間 |
| TumblingWindow | .window(TumblingEventTimeWindows.of(Time.seconds(5))) | .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) |
| SlidingWindow | .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) | .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) |
| SessionWindow | .window(EventTimeSessionWindows.withGap(Time.minutes(10))) | .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) |
上述三種窗口的事件事件寫法與處理時(shí)間寫法的對照(包含窗口偏移設(shè)置)
| ? | 事件時(shí)間(Event Time) | 處理時(shí)間 |
| TumblingWindow | .window(TumblingEventTimeWindows.of(Time.seconds(5),Time.seconds(3))) | .window(TumblingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(3))) |
| SlidingWindow | .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5),Time.seconds(3))) | .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5),Time.seconds(3))) |
| SessionWindow | .window(EventTimeSessionWindows.withGap(Time.minutes(10),Time.seconds(3))) | .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10),Time.seconds(3))) |
系統(tǒng)時(shí)間可以理解為是intellij處理數(shù)據(jù)的時(shí)間。
而EventTimeWindows那就是讓窗口以事件中的時(shí)間戳(Event TIme)作為時(shí)間標(biāo)尺了。
-----------------------------------------------下面是兩個(gè)看似矛盾的說法--------------------------------------------------------------------------
[1]WaterMark為上一條數(shù)據(jù)的Event Time,并非當(dāng)前的WaterMark
[2]WaterMark為當(dāng)前數(shù)據(jù)的Event Time(根據(jù)實(shí)驗(yàn)輸出來看)
為什么會(huì)有這么兩種截然不同的說法呢?
這是因?yàn)?
[1]的WaterMark是在getCurrentWatermark()函數(shù)中輸出的。
[2]的WaterMark是在extractTimestamp()函數(shù)中輸出的。
所以其實(shí)不矛盾
-----------------------------------------------------對于包含WaterMark的程序的觸發(fā)條件--------------------------------------
①并行度大于1的時(shí)候,需要各個(gè)節(jié)點(diǎn)都滿足②③。
?在多并行度的情況下,Watermark 會(huì)有一個(gè)對齊機(jī)制,這個(gè)對齊機(jī)制會(huì)取所有 Channel中最小的 Watermark。
②window條件要滿足(例如每5條數(shù)據(jù)觸發(fā))、
③算子和定時(shí)器相關(guān),需要waterMark高于定時(shí)器設(shè)置的時(shí)間。例如ctx.timerService().registerEventTimeTimer(value.getBizTime());
---------------------------------------------------------------------------------------------------------------------------------------
接收到水位線以前的的消息是不可避免的,這就是所謂的遲到事件。實(shí)際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預(yù)計(jì),導(dǎo)致窗口在它們到達(dá)之前已經(jīng)關(guān)閉。[8]
遲到事件出現(xiàn)時(shí)窗口已經(jīng)關(guān)閉并產(chǎn)出了計(jì)算結(jié)果,因此處理的方法有3種:[8]
- 重新激活已經(jīng)關(guān)閉的窗口并重新計(jì)算以修正結(jié)果(Side Output)。
- 將遲到事件收集起來另外處理(Allowed Lateness)。
- 將遲到事件視為錯(cuò)誤消息并丟棄。
Flink 默認(rèn)的處理方式是第3種直接丟棄,其他兩種方式分別使用Side Output和Allowed Lateness。[8]
接收到水位線以前的的消息是不可避免的,這就是所謂的遲到事件。實(shí)際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預(yù)計(jì),導(dǎo)致窗口在它們到達(dá)之前已經(jīng)關(guān)閉。[8]
上面都是一些概念,實(shí)驗(yàn)方面的記錄在[9]
Reference:
[1]Flink WaterMark實(shí)例(閱讀完畢)
[2]Flink系列之Time和WaterMark(代碼不完整)
[5]Flink EventTime和Watermarks案例分析(閱讀完畢)
[3]老板讓阿粉學(xué)習(xí) flink 中的 Watermark,現(xiàn)在他出教程了(還存在些問題)
[4]Flink的WaterMark,及demo實(shí)例(閱讀完畢)
[7]Flink的window、時(shí)間語義,Watermark機(jī)制,多代ctx.timerService().registerEventTimeTimer(value.getBizTime());碼案例詳解,Flink學(xué)習(xí)入門(三)(閱讀完畢)
[8][白話解析] Flink的Watermark機(jī)制?(判斷遲到數(shù)據(jù),閱讀完畢)
[9]Flink EventTime和Watermarks原理結(jié)合代碼分析(轉(zhuǎn)載+解決+精簡記錄)
?
總結(jié)
以上是生活随笔為你收集整理的flink中的WaterMark调研和具体实例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cad工作界面怎么设置为经典模式
- 下一篇: 正余弦函数的Talor近似