日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink的ProcessFunction API

發布時間:2024/7/5 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink的ProcessFunction API 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1 ProcessFunction

?? ProcessFunction是一個低階的流處理操作,可以訪問事件(event)(流元素),狀態(state)(容錯性,一致性,僅在keyed stream中),定時器(timers)(event time和processing time, 僅在keyed stream中)。也就是說可以訪問普通的轉換算子無法訪問事件的時間戳信息和Watermark的。

?? ProcessFunction可以看作是一個具有keyed state 鍵控狀態和 timers定時器訪問權的FlatMapFunction,通過對輸入流中接收的每個事件調用來處理事件。①通過RuntimeContext訪問keyed state②計時器允許應用程序對處理時間和事件時間中的更改作出響應。對processElement(…)函數的每次調用都獲得一個Context對象,該對象可以訪問元素的event time timestamp和TimerService;③TimerService可用于為將來的event/process time瞬間注冊回調。當到達計時器的特定時間時,將調用onTimer(…)方法。在該調用期間,所有狀態都再次限定在創建計時器時使用的鍵的范圍內,從而允許計時器操作鍵控狀態。總之ProcessFunction可以訪問時間戳、watermark以及注冊定時事件,輸出特定的一些事件等。Flink SQL就是使用Process Function實現的。

?? 如果要訪問鍵控狀態和計時器,則必須應用在keyedStream上

stream.keyBy(...).process(new MyProcessFunction())

?? Flink提供了8個Process Function:ProcessFunction,KeyedProcessFunction,CoProcessFunction,ProcessJoinFunction,BroadcastProcessFunction,KeyedBroadcastProcessFunction,ProcessWindowFunction,ProcessAllWindowFunction。

?? 所有的Process Function都繼承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法,還額外提供了兩個方法processElement和onTimer

?? processElement:每來一個元素都會調用這個方法,調用結果將會放在Collector數據類型中輸出。獲得的Context可以訪問元素的時間戳,元素的key,以及TimerService時間服務。Context還可以將結果輸出到別的流(side outputs)。

?? onTimer:是一個回調函數,當之前注冊的定時器到達觸發時間調用。參數timestamp為定時器所設定的觸發的時間戳。Collector為輸出結果的集合。OnTimerContext和processElement的Context參數一樣,提供了上下文的一些信息,例如定時器觸發的時間信息(事件時間或者處理時間)。

2 低階join

?? 要實現對兩個輸入的低級操作,應用程序可以使用CoProcessFunction或KeyedCoProcessFunction。

?? CoProcessFunction實現對兩個輸入的低階操作,它綁定到兩個不同的輸入流,分別調用processElement1(…)和processElement2(…)對兩個輸入流的數據進行處理

?? 實現低階join通常遵循以下模式:①為一個(或兩個)輸入創建一個狀態對象②當從輸入源收到元素時,更新狀態③從另一個輸入接收元素后,檢索狀態并生成連接的結果

3 KeyedProcessFunction

?? KeyedProcessFunction作為ProcessFunction的擴展,在其onTimer(…)方法中提供對定時器對應key的訪問。

?? KeyedProcessFunction用來操作KeyedStream。KeyedProcessFunction會處理流的每一個元素,輸出為0個、1個或者多個元素。

override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {var key = ctx.getCurrentKey// ... }

4 Timers

?? processing-time/event-time timer都由TimerService在內部維護并排隊等待執行,僅在keyed stream中有效。

?? 由于Flink對(每個key+timestamp)只維護一個計時器。如果為相同的timestamp注冊了多個timer ,則只調用onTimer()方法一次。

?? Flink保證同步調用onTimer()和processElement() 。因此用戶不必擔心狀態的并發修改。

?? 容錯:Timer具有容錯和checkpoint能力(基于flink app的狀態)。從故障恢復或從savepoint啟動應用程序時,Timer將被恢復。大量計時器會增加檢查點時間,因為計時器是檢查點狀態的一部分。

?? 定時器合并:由于Flink對每個鍵和時間戳只維護一個計時器,因此可以通過降低計時器頻率來合并計時器,從而減少計時器的數量。 event-time timer只會在watermarks到來時觸發。

//對于1秒的定時器分辨率(事件或處理時間),可以將目標時間舍入整秒。計時器的發射時間最多提前1秒,但不遲于要求的毫秒精度。因此,每鍵最多有一個定時器和第二個定時器。 val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000 ctx.timerService.registerProcessingTimeTimer(coalescedTime)//事件時間計時器只在水印進入的情況下觸發,您還可以使用當前Watermark調度這些計時器并將其與下一個Watermark合并: val coalescedTime = ctx.timerService.currentWatermark + 1 ctx.timerService.registerEventTimeTimer(coalescedTime)//停止處理時間計時器: val timestampOfTimerToStop = ... ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)//停止事件時間計時器: val timestampOfTimerToStop = ... ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)

5 官方案例

?? KeyedProcessFunction維護每個鍵的計數,并在沒有對該鍵進行更新的情況下,在一分鐘內(在事件發生時)發出一個鍵/計數對:

  • 計數、鍵和最后修改時間戳存儲在ValueState,這是由Key隱式限定范圍的。
  • 對于每個記錄,KeyedProcessFunction增加計數器并設置最后修改的時間戳。
  • 該函數還會在以后的一分鐘內安排一個回調(在事件發生時)。
  • 在每次回調時,它會檢查回調的事件時間戳和存儲計數的最后修改時間,如果它們匹配,則發出鍵/計數(也就是說,在這一分鐘內沒有發生進一步的更新)。
import org.apache.flink.api.common.state.ValueState import org.apache.flink.api.common.state.ValueStateDescriptor import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.util.Collector// the source data stream val stream: DataStream[Tuple2[String, String]] = ...// apply the process function onto a keyed stream val result: DataStream[Tuple2[String, Long]] = stream.keyBy(0).process(new CountWithTimeoutFunction())/*** The data type stored in the state*/ case class CountWithTimestamp(key: String, count: Long, lastModified: Long)/*** The implementation of the ProcessFunction that maintains the count and timeouts*/ class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String), (String, Long)] {/** The state that is maintained by this process function */lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))override def processElement(value: (String, String), ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#Context, out: Collector[(String, Long)]): Unit = {// initialize or retrieve/update the stateval current: CountWithTimestamp = state.value match {case null =>CountWithTimestamp(value._1, 1, ctx.timestamp)case CountWithTimestamp(key, count, lastModified) =>CountWithTimestamp(key, count + 1, ctx.timestamp)}// write the state backstate.update(current)// schedule the next timer 60 seconds from the current event timectx.timerService.registerEventTimeTimer(current.lastModified + 60000)}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#OnTimerContext, out: Collector[(String, Long)]): Unit = {state.value match {case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>out.collect((key, count))case _ =>}} }

總結

以上是生活随笔為你收集整理的Flink的ProcessFunction API的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。