日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

flink中的WaterMark调研和具体实例

發(fā)布時(shí)間:2023/12/31 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flink中的WaterMark调研和具体实例 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一些基本概念介紹:

Event Time事件時(shí)間是每個(gè)事件在其生產(chǎn)設(shè)備上發(fā)生的時(shí)間
Ingestion Time攝取時(shí)間是數(shù)據(jù)進(jìn)入Flink的時(shí)間
Processing Time處理時(shí)間是是指正在執(zhí)行相應(yīng)算子操作的機(jī)器的系統(tǒng)時(shí)間,默認(rèn)的時(shí)間屬性就是Processing Time

[7]在Flink的流式處理中,絕大部分的業(yè)務(wù)都會(huì)使用EventTime,一般只在EventTime無法使用時(shí),才會(huì)被迫使用ProcessingTime或者IngestionTime。默認(rèn)情況下,Flink框架中處理的時(shí)間語義為ProcessingTime,如果要使用EventTime,那么需要引入EventTime的時(shí)間屬性,引入方式如下所示:

import org.apache.flink.streaming.api.TimeCharacteristic

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

上面的這個(gè)只是為了表示當(dāng)前的環(huán)境中有EventTIme屬性,講大白話就是輸入的數(shù)據(jù)源中會(huì)有時(shí)間戳,不用想太多。

--------------------------------------------和時(shí)間相關(guān)的一些概念--------------------------------------------------------------------

假設(shè)nc -lk傳入數(shù)據(jù),intelliji處理數(shù)據(jù),那么其實(shí)流處理的大概有這么幾個(gè)時(shí)間相關(guān)的數(shù)據(jù)。

①終端輸入數(shù)據(jù)的時(shí)間

②該數(shù)據(jù)中包含的時(shí)間

③intellij處理數(shù)據(jù)的時(shí)間

④window中的窗口寬度,slidingWindow還包括滑動(dòng)時(shí)間寬度

⑤水位線時(shí)間, 這個(gè)是從上一條數(shù)據(jù)中得到的時(shí)間戳,另外還有水位線延遲時(shí)間(用來對付遲到的數(shù)據(jù))

首先,①≠②,整個(gè)實(shí)驗(yàn)不關(guān)心①的時(shí)間,所以①我們不理會(huì)。

③我們一般也不關(guān)心(注意指的是一般情況下不關(guān)心,因?yàn)檫€是以數(shù)據(jù)源頭中的時(shí)間戳中的Event Time為主)

④和⑤是我們關(guān)心范疇

所以主要是②④⑤

---------------------------------------------------------WaterMark和各種參數(shù)之間的關(guān)系--------------------------------------------------------------

什么是WaterMark呢,其實(shí)翻譯過來就是水位線。

數(shù)據(jù)流好比水流(數(shù)據(jù))從瀑布頂端傾瀉而下,

而瀑布下方的水位線(數(shù)據(jù)生成的時(shí)間,Event Time)隨著瀑布的流下而不斷上升。

------------------------------------------------------------------------------------------------------------------------------------------------

Watermarks是基于已經(jīng)收集的消息來估算是否還有消息未到達(dá),本質(zhì)上是一個(gè)時(shí)間戳。時(shí)間戳反映的是事件發(fā)生的時(shí)間,而不是事件處理的時(shí)間

所謂的事件(Event),講人話其實(shí)就是一條數(shù)據(jù)。

1.設(shè)置StreamTime Characteristic為Event Time,即設(shè)置流式時(shí)間窗口

2.創(chuàng)建的DataStreamSource調(diào)用assignTimestampsAndWatermarks中設(shè)置WaterMarks種類(二選一):

①AssignerWithPeriodicWatermarks(周期性(一定時(shí)間間隔或者達(dá)到一定的記錄條數(shù))生成水印)

②AssignerWithPunctuatedWatermarks(數(shù)據(jù)流中每一個(gè)遞增的?EventTime?都會(huì)產(chǎn)生一個(gè)?Watermark)

?

數(shù)據(jù)示例:

實(shí)驗(yàn)類型(代碼)nc -lk 3456輸入實(shí)驗(yàn)結(jié)果實(shí)驗(yàn)解析
窗口的生命周期

?

tests
tests
tests

測試::7> (tests,3)這個(gè)實(shí)驗(yàn)必須輸入三個(gè)相同的字符串后,程序才會(huì)有反應(yīng)

滾動(dòng)窗口

later,6565windows:>>>:7> window:[1599362540000-1599362545000]:{ (later,6565,1) }

window后面的是實(shí)際處理時(shí)間。

later后面手動(dòng)輸入的時(shí)間戳被忽視

滑動(dòng)窗口later,32234windows:>>>:7> window:[1599364140000-1599364145000]:{ (later,32234,1) }
windows:>>>:7> window:[1599364143000-1599364148000]:{ (later,32234,1) }
一條數(shù)據(jù)出現(xiàn)在兩個(gè)窗口中
會(huì)話窗口test,12312
test,235233
?
windows:>>>:5> window:[1599366609562-1599366622057]:{ (test,12312,1),(test,235233,1) }processtime間隔時(shí)間超過10s,就會(huì)輸出上一個(gè)窗口(含10s所有輸入內(nèi)容)

這里的例如1599366609562單位是ms

我們根據(jù)業(yè)務(wù)的需求還判斷使用哪個(gè)時(shí)間類型,一般來說使用Event Time更多,比如:在統(tǒng)計(jì)最近5分鐘的訂單總金額時(shí),我們需要的是真實(shí)的訂單時(shí)間,而不是進(jìn)入flink的時(shí)間或者是處理時(shí)間。

另外,生產(chǎn)環(huán)境中大多數(shù)情況下數(shù)據(jù)源頭都會(huì)帶有時(shí)間戳,時(shí)間戳經(jīng)過flink處理提取得到Event Time。

上述三種窗口的事件事件寫法與處理時(shí)間寫法的對照(不包含窗口偏移設(shè)置)

?事件時(shí)間(Event Time)處理時(shí)間
TumblingWindow.window(TumblingEventTimeWindows.of(Time.seconds(5))).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
SlidingWindow.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
SessionWindow.window(EventTimeSessionWindows.withGap(Time.minutes(10))).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))

上述三種窗口的事件事件寫法與處理時(shí)間寫法的對照(包含窗口偏移設(shè)置)

?事件時(shí)間(Event Time)處理時(shí)間
TumblingWindow.window(TumblingEventTimeWindows.of(Time.seconds(5),Time.seconds(3))).window(TumblingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(3)))
SlidingWindow.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5),Time.seconds(3))).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5),Time.seconds(3)))
SessionWindow.window(EventTimeSessionWindows.withGap(Time.minutes(10),Time.seconds(3))).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10),Time.seconds(3)))

系統(tǒng)時(shí)間可以理解為是intellij處理數(shù)據(jù)的時(shí)間。

而EventTimeWindows那就是讓窗口以事件中的時(shí)間戳(Event TIme)作為時(shí)間標(biāo)尺了。

-----------------------------------------------下面是兩個(gè)看似矛盾的說法--------------------------------------------------------------------------

[1]WaterMark為上一條數(shù)據(jù)的Event Time,并非當(dāng)前的WaterMark

[2]WaterMark為當(dāng)前數(shù)據(jù)的Event Time(根據(jù)實(shí)驗(yàn)輸出來看)

為什么會(huì)有這么兩種截然不同的說法呢?

這是因?yàn)?

[1]的WaterMark是在getCurrentWatermark()函數(shù)中輸出的。

[2]的WaterMark是在extractTimestamp()函數(shù)中輸出的。

所以其實(shí)不矛盾

-----------------------------------------------------對于包含WaterMark的程序的觸發(fā)條件--------------------------------------
①并行度大于1的時(shí)候,需要各個(gè)節(jié)點(diǎn)都滿足②③。
?在多并行度的情況下,Watermark 會(huì)有一個(gè)對齊機(jī)制,這個(gè)對齊機(jī)制會(huì)取所有 Channel中最小的 Watermark。
②window條件要滿足(例如每5條數(shù)據(jù)觸發(fā))、
③算子和定時(shí)器相關(guān),需要waterMark高于定時(shí)器設(shè)置的時(shí)間。例如ctx.timerService().registerEventTimeTimer(value.getBizTime());

---------------------------------------------------------------------------------------------------------------------------------------

接收到水位線以前的的消息是不可避免的,這就是所謂的遲到事件。實(shí)際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預(yù)計(jì),導(dǎo)致窗口在它們到達(dá)之前已經(jīng)關(guān)閉。[8]

遲到事件出現(xiàn)時(shí)窗口已經(jīng)關(guān)閉并產(chǎn)出了計(jì)算結(jié)果,因此處理的方法有3種:[8]

  • 重新激活已經(jīng)關(guān)閉的窗口并重新計(jì)算以修正結(jié)果(Side Output)。
  • 將遲到事件收集起來另外處理(Allowed Lateness)。
  • 將遲到事件視為錯(cuò)誤消息并丟棄。

Flink 默認(rèn)的處理方式是第3種直接丟棄,其他兩種方式分別使用Side Output和Allowed Lateness。[8]

接收到水位線以前的的消息是不可避免的,這就是所謂的遲到事件。實(shí)際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預(yù)計(jì),導(dǎo)致窗口在它們到達(dá)之前已經(jīng)關(guān)閉。[8]

上面都是一些概念,實(shí)驗(yàn)方面的記錄在[9]

Reference:

[1]Flink WaterMark實(shí)例(閱讀完畢)

[2]Flink系列之Time和WaterMark(代碼不完整)

[5]Flink EventTime和Watermarks案例分析(閱讀完畢)
[3]老板讓阿粉學(xué)習(xí) flink 中的 Watermark,現(xiàn)在他出教程了(還存在些問題)

[4]Flink的WaterMark,及demo實(shí)例(閱讀完畢)

[7]Flink的window、時(shí)間語義,Watermark機(jī)制,多代ctx.timerService().registerEventTimeTimer(value.getBizTime());碼案例詳解,Flink學(xué)習(xí)入門(三)(閱讀完畢)

[8][白話解析] Flink的Watermark機(jī)制?(判斷遲到數(shù)據(jù),閱讀完畢)

[9]Flink EventTime和Watermarks原理結(jié)合代碼分析(轉(zhuǎn)載+解決+精簡記錄)
?

總結(jié)

以上是生活随笔為你收集整理的flink中的WaterMark调研和具体实例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。