日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

flink 自定义 窗口_Flink入门实战 (下)

發(fā)布時間:2023/11/27 生活经验 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flink 自定义 窗口_Flink入门实战 (下) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、 時間語義與 Wartermark

1、 Flink 中的時間語義

在 Flink 的流式處理中,會涉及到時間的不同概念,如下圖所示:

Event Time:是事件創(chuàng)建的時間。它通常由事件中的時間戳描述,例如采集的
日志數(shù)據(jù)中,每一條日志都會記錄自己的生成時間,Flink 通過時間戳分配器訪問事
件時間戳。 Ingestion Time:是數(shù)據(jù)進(jìn)入 Flink 的時間。 Processing Time:是每一個執(zhí)行基于時間操作的算子的本地系統(tǒng)時間,與機(jī)器
相關(guān),默認(rèn)的時間屬性就是 Processing Time。

一個例子——電影《星球大戰(zhàn)》:

例如,一條日志進(jìn)入 Flink 的時間為 2017-11-12 10:00:00.123,到達(dá) Window 的

系統(tǒng)時間為 2017-11-12 10:00:01.234,日志的內(nèi)容如下:

2017

對于業(yè)務(wù)來說,要統(tǒng)計(jì) 1min 內(nèi)的故障日志個數(shù),哪個時間是最有意義的?——

eventTime,因?yàn)槲覀円鶕?jù)日志的生成時間進(jìn)行統(tǒng)計(jì)。

2、 EventTime 的引入

在 Flink 的流式處理中,絕大部分的業(yè)務(wù)都會使用 eventTime,一般只在
eventTime 無法使用時,才會被迫使用 ProcessingTime 或者 IngestionTime。

如果要使用 EventTime,那么需要引入 EventTime 的時間屬性,引入方式如下所

示:

val 

3、Watermark

3.1、基本概念

我們知道,流處理從事件產(chǎn)生,到流經(jīng) source,再到 operator,中間是有一個過
程和時間的,雖然大部分情況下,流到 operator 的數(shù)據(jù)都是按照事件產(chǎn)生的時間順
序來的,但是也不排除由于網(wǎng)絡(luò)、分布式等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就
是指 Flink 接收到的事件的先后順序不是嚴(yán)格按照事件的 Event Time 順序排列的。

那么此時出現(xiàn)一個問題,一旦出現(xiàn)亂序,如果只根據(jù) eventTime 決定 window 的
運(yùn)行,我們不能明確數(shù)據(jù)是否全部到位,但又不能無限期的等下去,此時必須要有
個機(jī)制來保證一個特定的時間后,必須觸發(fā) window 去進(jìn)行計(jì)算了,這個特別的機(jī)
制,就是 Watermark。
  • Watermark 是一種衡量 Event Time 進(jìn)展的機(jī)制。
  • Watermark 是用于處理亂序事件的,而正確的處理亂序事件,通常用

Watermark 機(jī)制結(jié)合 window 來實(shí)現(xiàn)。

  • 數(shù)據(jù)流中的 Watermark 用于表示 timestamp 小于 Watermark 的數(shù)據(jù),都已經(jīng)

到達(dá)了,因此,window 的執(zhí)行也是由 Watermark 觸發(fā)的。

  • Watermark 可以理解成一個延遲觸發(fā)機(jī)制,我們可以設(shè)置 Watermark 的延時

時長 t,每次系統(tǒng)會校驗(yàn)已經(jīng)到達(dá)的數(shù)據(jù)中最大的 maxEventTime,然后認(rèn)定 eventTime

小于 maxEventTime - t 的所有數(shù)據(jù)都已經(jīng)到達(dá),如果有窗口的停止時間等于

maxEventTime – t,那么這個窗口被觸發(fā)執(zhí)行。

有序流的 Watermarker 如下圖所示:(Watermark 設(shè)置為 0)

亂序流的 Watermarker 如下圖所示:(Watermark 設(shè)置為 2)

當(dāng) Flink 接收到數(shù)據(jù)時,會按照一定的規(guī)則去生成 Watermark,這條 Watermark
就等于當(dāng)前所有到達(dá)數(shù)據(jù)中的 maxEventTime - 延遲時長,也就是說,Watermark 是
由數(shù)據(jù)攜帶的,一旦數(shù)據(jù)攜帶的 Watermark 比當(dāng)前未觸發(fā)的窗口的停止時間要晚,
那么就會觸發(fā)相應(yīng)窗口的執(zhí)行。由于 Watermark 是由數(shù)據(jù)攜帶的,因此,如果運(yùn)行
過程中無法獲取新的數(shù)據(jù),那么沒有被觸發(fā)的窗口將永遠(yuǎn)都不被觸發(fā)。
上圖中,我們設(shè)置的允許最大延遲到達(dá)時間為 2s,所以時間戳為 7s 的事件對應(yīng)
的 Watermark 是 5s,時間戳為 12s 的事件的 Watermark 是 10s,如果我們的窗口 1
是 1s~5s,窗口 2 是 6s~10s,那么時間戳為 7s 的事件到達(dá)時的 Watermarker 恰好觸
發(fā)窗口 1,時間戳為 12s 的事件到達(dá)時的 Watermark 恰好觸發(fā)窗口 2。

Watermark 就是觸發(fā)前一窗口的“關(guān)窗時間”,一旦觸發(fā)關(guān)門那么以當(dāng)前時刻

為準(zhǔn)在窗口范圍內(nèi)的所有所有數(shù)據(jù)都會收入窗中。

只要沒有達(dá)到水位那么不管現(xiàn)實(shí)中的時間推進(jìn)了多久都不會觸發(fā)關(guān)窗。

3.2、Watermark 的引入

watermark 的引入很簡單,對于亂序數(shù)據(jù),最常見的引用方式如下:

dataStream
Event Time 的使用一定要指定數(shù)據(jù)源中的時間戳。否則程序無法知道事件的事
件時間是什么(數(shù)據(jù)源里的數(shù)據(jù)沒有時間戳的話,就只能使用 Processing Time 了)。
我們看到上面的例子中創(chuàng)建了一個看起來有點(diǎn)復(fù)雜的類,這個類實(shí)現(xiàn)的其實(shí)就
是分配時間戳的接口。Flink 暴露了 TimestampAssigner 接口供我們實(shí)現(xiàn),使我們可
以自定義如何從事件數(shù)據(jù)中抽取時間戳。
val 

MyAssigner 有兩種類型

  • AssignerWithPeriodicWatermarks
  • AssignerWithPunctuatedWatermarks

以上兩個接口都繼承自 TimestampAssigner。

Assigner with periodic watermarks

周期性的生成 watermark:系統(tǒng)會周期性的將 watermark 插入到流中(水位線也
是一種特殊的事件!)。默認(rèn)周期是 200 毫秒。可以使用
ExecutionConfig.setAutoWatermarkInterval()方法進(jìn)行設(shè)置。
val 

產(chǎn)生 watermark 的邏輯:每隔 5 秒鐘,Flink 會調(diào)用

AssignerWithPeriodicWatermarks 的 getCurrentWatermark()方法。如果方法返回一個
時間戳大于之前水位的時間戳,新的 watermark 會被插入到流中。這個檢查保證了
水位線是單調(diào)遞增的。如果方法返回的時間戳小于等于之前水位的時間戳,則不會
產(chǎn)生新的 watermark。

例子,自定義一個周期性的時間戳抽取:

class 

一種簡單的特殊情況是,如果我們事先得知數(shù)據(jù)流的時間戳是單調(diào)遞增的,也

就是說沒有亂序,那我們可以使用 assignAscendingTimestamps,這個方法會直接使

用數(shù)據(jù)的時間戳生成 watermark。

val 

而對于亂序數(shù)據(jù)流,如果我們能大致估算出數(shù)據(jù)流中的事件的最大延遲時間,

就可以使用如下代碼:

val 

Assigner with punctuated watermarks

間斷式地生成 watermark。和周期性生成的方式不同,這種方式不是固定時間的,

而是可以根據(jù)需要對每條數(shù)據(jù)進(jìn)行篩選和處理。直接上代碼來舉個例子,我們只給

sensor_1 的傳感器的數(shù)據(jù)流插入 watermark:

class 

4、 EvnetTime 在 window 中的使用

案例一:Flink窗口操作之簡單測試

4.1、滾動窗口(TumblingEventTimeWindows)

代碼具體實(shí)現(xiàn):

package 

啟動程序后,視頻演示:表示10秒之內(nèi)統(tǒng)計(jì)數(shù)據(jù)

Flink的滾動窗口實(shí)現(xiàn)方式https://www.zhihu.com/video/1241477585970135040

案例二:Flink窗口操作之事件時間測試

代碼具體實(shí)現(xiàn):

package 

啟動程序后,視頻演示:事件時間測試表示執(zhí)行多少個才能把窗口關(guān)閉,由于這里簡單測試沒遇到窗口關(guān)閉

事件時間測試 https://www.zhihu.com/video/1241492597937111040

案例三:Flink窗口操作之Window起始點(diǎn)

視頻演示:

Window起始點(diǎn)https://www.zhihu.com/video/1241685277611372544

二、ProcessFunction API(底層 API)

我們之前學(xué)習(xí)的轉(zhuǎn)換算子是無法訪問事件的時間戳信息和水位線信息的。而這
在一些應(yīng)用場景下,極為重要。例如 MapFunction 這樣的 map 轉(zhuǎn)換算子就無法訪問
時間戳或者當(dāng)前事件的事件時間。
基于此,DataStream API 提供了一系列的 Low-Level 轉(zhuǎn)換算子??梢?b>訪問時間 戳、watermark 以及注冊定時事件。還可以輸出特定的一些事件,例如超時事件等。
Process Function 用來構(gòu)建事件驅(qū)動的應(yīng)用以及實(shí)現(xiàn)自定義的業(yè)務(wù)邏輯(使用之前的
window 函數(shù)和轉(zhuǎn)換算子無法實(shí)現(xiàn))。例如,Flink SQL 就是使用 Process Function 實(shí)
現(xiàn)的。

Flink 提供了 8 個 Process Function:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessJoinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

1、KeyedProcessFunction

這里我們重點(diǎn)介紹 KeyedProcessFunction。

KeyedProcessFunction 用來操作 KeyedStream。KeyedProcessFunction 會處理流
的每一個元素,輸出為 0 個、1 個或者多個元素。所有的 Process Function 都繼承自
RichFunction 接口,所以都有 open()、close()和 getRuntimeContext()等方法。

而 KeyedProcessFunction[KEY, IN, OUT]還額外提供了兩個方法:

  • processElement(v: IN, ctx: Context, out: Collector[OUT]),
流中的每一個元素 都會調(diào)用這個方法,調(diào)用結(jié)果將會放在 Collector 數(shù)據(jù)類型中輸出。Context 可以訪問元素的時間戳,元素的 key,以及 TimerService 時間服務(wù)。Context
還可以將結(jié)果輸出到別的流(side outputs)。
  • onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])
是一個回 調(diào)函數(shù)。當(dāng)之前注冊的定時器觸發(fā)時調(diào)用。參數(shù) timestamp 為定時器所設(shè)定
的觸發(fā)的時間戳。Collector 為輸出結(jié)果的集合。OnTimerContext 和
processElement 的 Context 參數(shù)一樣,提供了上下文的一些信息,例如定時器
觸發(fā)的時間信息(事件時間或者處理時間)。

2、TimerService 和 定時器(Timers)

Context 和 OnTimerContext 所持有的 TimerService 對象擁有以下方法:

  • currentProcessingTime(): Long 返回當(dāng)前處理時間
  • ? currentWatermark(): Long 返回當(dāng)前 watermark 的時間戳
  • ? registerProcessingTimeTimer(timestamp: Long): Unit 會注冊當(dāng)前 key 的

processing time 的定時器。當(dāng) processing time 到達(dá)定時時間時,觸發(fā) timer。

  • registerEventTimeTimer(timestamp: Long): Unit 會注冊當(dāng)前 key 的 event time

定時器。當(dāng)水位線大于等于定時器注冊的時間時,觸發(fā)定時器執(zhí)行回調(diào)函數(shù)。

  • deleteProcessingTimeTimer(timestamp: Long): Unit 刪除之前注冊處理時間定

時器。如果沒有這個時間戳的定時器,則不執(zhí)行。

  • deleteEventTimeTimer(timestamp: Long): Unit 刪除之前注冊的事件時間定時

器,如果沒有此時間戳的定時器,則不執(zhí)行。

當(dāng)定時器 timer 觸發(fā)時,會執(zhí)行回調(diào)函數(shù) onTimer()。注意定時器 timer 只能在

keyed streams 上面使用。

下面舉個例子說明 KeyedProcessFunction 如何操作 KeyedStream。

需求:監(jiān)控溫度傳感器的溫度值,如果溫度值在一秒鐘之內(nèi)(processing time)連

續(xù)上升,則報(bào)警。

val 

看一下 TempIncreaseAlertFunction 如何實(shí)現(xiàn), 程序中使用了 ValueState 這樣一個

狀態(tài)變量。

具體代碼實(shí)現(xiàn):

package 

啟動程序,控制臺打印數(shù)據(jù)

3、側(cè)輸出流(SideOutput)

大部分的 DataStream API 的算子的輸出是單一輸出,也就是某種數(shù)據(jù)類型的流。
除了 split 算子,可以將一條流分成多條流,這些流的數(shù)據(jù)類型也都相同。process
function 的 side outputs 功能可以產(chǎn)生多條流,并且這些流的數(shù)據(jù)類型可以不一樣。
一個 side output 可以定義為 OutputTag[X]對象,X 是輸出流的數(shù)據(jù)類型。process
function 可以通過 Context 對象發(fā)射一個事件到一個或者多個 side outputs。

下面是一個示例程序:

val 
接下來我們實(shí)現(xiàn) FreezingMonitor 函數(shù),用來監(jiān)控傳感器溫度值,將溫度值低于
32F 的溫度輸出到 side output。

具體代碼實(shí)現(xiàn):

package 

啟動程序,控制臺打印數(shù)據(jù)

冰點(diǎn)低溫輸出流https://www.zhihu.com/video/1241752113551646720

4、CoProcessFunction

對于兩條輸入流,DataStream API 提供了 CoProcessFunction 這樣的 low-level
操作。CoProcessFunction 提供了操作每一個輸入流的方法: processElement1()和
processElement2()。
類似于 ProcessFunction,這兩種方法都通過 Context 對象來調(diào)用。這個 Context
對象可以訪問事件數(shù)據(jù),定時器時間戳,TimerService,以及 side outputs。
CoProcessFunction 也提供了 onTimer()回調(diào)函數(shù)。

總結(jié)

以上是生活随笔為你收集整理的flink 自定义 窗口_Flink入门实战 (下)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。