Flink – window operator

Published: 2024/1/17

References:

http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/

http://wuchong.me/blog/2016/06/06/flink-internals-session-window/

WindowOperator

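The window operator implements its logic through a WindowAssigner and a Trigger.

When an element arrives, it is first assigned a key via the KeySelector and then assigned to zero or more windows via the WindowAssigner, so the element ends up in one pane per assigned window.

A pane holds all elements that share the same key and the same window.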
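The pane idea can be sketched without Flink. The following is a minimal illustration only: `PaneSketch` and its methods are hypothetical names, windows are identified by their start timestamp alone, and a `HashMap` stands in for the keyed state that Flink actually uses.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink code: a pane is the bucket of elements
// that share both a key and a window.
class PaneSketch {
    // Pane id = key + "@" + window start (an assumption made for brevity).
    private final Map<String, List<String>> panes = new HashMap<>();

    // Put one element into every pane implied by its key and its assigned windows.
    void add(String key, long[] windowStarts, String element) {
        for (long start : windowStarts) {
            panes.computeIfAbsent(key + "@" + start, k -> new ArrayList<>()).add(element);
        }
    }

    // Contents of a single pane; empty if nothing was assigned to it.
    List<String> pane(String key, long windowStart) {
        return panes.getOrDefault(key + "@" + windowStart, new ArrayList<>());
    }

    int paneCount() {
        return panes.size();
    }
}
```

An element assigned to two windows shows up in two panes of the same key, which is what happens with sliding windows.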

/**
 * An operator that implements the logic for windowing based on a {@link WindowAssigner} and
 * {@link Trigger}.
 *
 * <p>
 * When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
 * assigned to zero or more windows using a {@link WindowAssigner}. Based on this, the element
 * is put into panes. A pane is the bucket of elements that have the same key and same
 * {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by the
 * {@code WindowAssigner}.
 *
 * <p>
 * Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
 * the contents of the pane should be processed to emit results. When a trigger fires,
 * the given {@link InternalWindowFunction} is invoked to produce the results that are emitted for
 * the pane to which the {@code Trigger} belongs.
 *
 * @param <K> The type of key returned by the {@code KeySelector}.
 * @param <IN> The type of the incoming elements.
 * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
 */
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {

    // ------------------------------------------------------------------------
    // Configuration values and user functions
    // ------------------------------------------------------------------------

    protected final WindowAssigner<? super IN, W> windowAssigner;

    protected final KeySelector<IN, K> keySelector;

    protected final Trigger<? super IN, ? super W> trigger;

    protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;

    /**
     * The allowed lateness for elements. This is used for:
     * <ul>
     * <li>Deciding if an element should be dropped from a window due to lateness.
     * <li>Clearing the state of a window if the system time passes the
     * {@code window.maxTimestamp + allowedLateness} landmark.
     * </ul>
     */
    protected final long allowedLateness; // how late an element may be, i.e. after the watermark has already fired the window

    /**
     * To keep track of the current watermark so that we can immediately fire if a trigger
     * registers an event time callback for a timestamp that lies in the past.
     */
    protected transient long currentWatermark = Long.MIN_VALUE;

    protected transient Context context = new Context(null, null); // the Trigger context

    protected transient WindowAssigner.WindowAssignerContext windowAssignerContext; // only used to obtain getCurrentProcessingTime

    // ------------------------------------------------------------------------
    // State that needs to be checkpointed
    // ------------------------------------------------------------------------

    /**
     * Processing time timers that are currently in-flight.
     */
    protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue; // a Timer stores timestamp, key, and window; the queue is ordered by time

    /**
     * Current waiting watermark callbacks.
     */
    protected transient Set<Timer<K, W>> watermarkTimers;
    protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue;

    // protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey; // records the mapping between windows and their merged state windows

    // ...
}

For the window operator, the two most important pieces are the WindowAssigner and the Trigger.

WindowAssigner

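A WindowAssigner decides which windows a tuple should be assigned to.

A diagram borrowed from the referenced posts (image not preserved here) shows how many kinds of WindowAssigner there are.

The most important method of a WindowAssigner is assignWindows.

It assigns a set of windows, a Collection<W>, to an element.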

@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Returns a {@code Collection} of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     * @param context The {@link WindowAssignerContext} in which the assigner operates.
     */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

    /**
     * Returns the default trigger associated with this {@code WindowAssigner}.
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a {@link TypeSerializer} for serializing windows that are assigned by
     * this {@code WindowAssigner}.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    // ...
}

Now look at some concrete WindowAssigner implementations:

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        final long now = context.getCurrentProcessingTime();
        long start = now - (now % size);
        return Collections.singletonList(new TimeWindow(start, start + size)); // simple: assign exactly one TimeWindow
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create(); // the default is ProcessingTimeTrigger, as the name suggests
    }

    // ...
}

public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;
    private final long slide;

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            long lastStart = timestamp - timestamp % slide;
            for (long start = lastStart; start > timestamp - size; start -= slide) {
                windows.add(new TimeWindow(start, start + size)); // several TimeWindows are assigned here, because the windows slide
            }
            return windows;
        } else {
            // (branch body elided in the original excerpt)
        }
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    // ...
}
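To make the window arithmetic above concrete, here is a small sketch that uses the same formulas but returns plain start timestamps instead of TimeWindow objects. `WindowStartsSketch` is a hypothetical helper, not part of Flink, and it assumes non-negative timestamps and `slide <= size`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: the start-timestamp arithmetic of the
// tumbling and sliding assigners, on plain long values.
class WindowStartsSketch {
    // Tumbling: exactly one window, aligned to a multiple of size.
    static long tumblingStart(long now, long size) {
        return now - (now % size);
    }

    // Sliding: the start of every window [start, start + size) that contains the timestamp.
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>((int) (size / slide));
        long lastStart = timestamp - timestamp % slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

With `size = 10` and `slide = 5`, timestamp 7 falls into the windows starting at 5 and 0, so the element is buffered in two panes.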


Trigger, Evictor

See: Flink – Trigger, Evictor

Next, look at the three main entry points, which invoke onElement, onEventTime, and onProcessingTime respectively.

processElement

Handles the arrival of an element and invokes onElement.

public void processElement(StreamRecord<IN> element) throws Exception {
    Collection<W> elementWindows = windowAssigner.assignWindows( // the WindowAssigner assigns a set of windows to the element
        element.getValue(), element.getTimestamp(), windowAssignerContext);

    final K key = (K) getStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) { // merging windows
        // ....... (elided in the original)
    } else { // regular windows
        for (W window: elementWindows) {

            // drop if the window is already late
            if (isLate(window)) { // late data handling: dropped by default
                continue;
            }

            AppendingState<IN, ACC> windowState = getPartitionedState( // fetch this window's state from the backend, i.e. the buffered elements
                window, windowSerializer, windowStateDescriptor);
            windowState.add(element.getValue()); // add the current element to the buffer state

            context.key = key;
            context.window = window; // the design of context is rather tricky and obscure
            TriggerResult triggerResult = context.onElement(element); // invoke onElement and obtain the TriggerResult

            if (triggerResult.isFire()) { // act on the TriggerResult
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                fire(window, contents); // on fire, actually evaluate the elements in the window
            }

            if (triggerResult.isPurge()) {
                cleanup(window, windowState, null); // purge: clean up the elements
            } else {
                registerCleanupTimer(window);
            }
        }
    }
}
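The way processElement reacts to a trigger result can be summarized in a standalone sketch. The `Result` enum below is a hypothetical stand-in for Flink's TriggerResult (assumed to have the values CONTINUE, FIRE, PURGE, and FIRE_AND_PURGE), and the returned strings merely name the methods that the code above would call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: which actions processElement takes
// for each trigger result in the non-merging branch.
class TriggerResultSketch {
    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    // Returns the names of the actions taken, in order.
    static List<String> actions(Result result, boolean hasContents) {
        List<String> taken = new ArrayList<>();
        if (result.fire) {
            if (!hasContents) {
                return taken; // mirrors the `continue` when the window state is empty
            }
            taken.add("fire");
        }
        // A purge clears the pane now; otherwise a cleanup timer is registered for later.
        taken.add(result.purge ? "cleanup" : "registerCleanupTimer");
        return taken;
    }
}
```

Note that even a CONTINUE result registers a cleanup timer, which is what eventually frees the window state once the allowed lateness has passed.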

The logic that decides whether data is late:

protected boolean isLate(W window) {
    return (windowAssigner.isEventTime() && (cleanupTime(window) <= currentWatermark));
}

private long cleanupTime(W window) {
    long cleanupTime = window.maxTimestamp() + allowedLateness; // allowedLateness
    // guard against overflow of the addition above
    return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
}
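The lateness check is plain arithmetic, so it can be tried in isolation. The sketch below is a hypothetical helper that assumes event time (so the `isEventTime()` condition is always true) and takes the window's max timestamp, the allowed lateness, and the current watermark as plain long values.

```java
// Hypothetical sketch, not Flink code: the lateness check on plain long values,
// assuming an event-time window assigner.
class LatenessSketch {
    // Time at which the window's state may be dropped; saturates instead of overflowing.
    static long cleanupTime(long maxTimestamp, long allowedLateness) {
        long cleanupTime = maxTimestamp + allowedLateness;
        return cleanupTime >= maxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    // A window is late once the watermark has reached its cleanup time.
    static boolean isLate(long maxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(maxTimestamp, allowedLateness) <= currentWatermark;
    }
}
```

For a window with max timestamp 9 and an allowed lateness of 5, elements are still accepted at watermark 10 and dropped once the watermark reaches 14.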

The fire logic:

private void fire(W window, ACC contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
    userFunction.apply(context.key, context.window, contents, timestampedCollector);
}


processWatermark

Handles a watermark and invokes onEventTime.

@Override
public void processWatermark(Watermark mark) throws Exception {
    boolean fire;
    do {
        Timer<K, W> timer = watermarkTimersQueue.peek(); // the name watermarkTimersQueue is somewhat ambiguous; eventTimerQueue would be easier to understand
        if (timer != null && timer.timestamp <= mark.getTimestamp()) {
            fire = true;

            watermarkTimers.remove(timer);
            watermarkTimersQueue.remove();

            context.key = timer.key;
            context.window = timer.window;
            setKeyContext(timer.key); // stateBackend.setCurrentKey(key);

            AppendingState<IN, ACC> windowState;
            MergingWindowSet<W> mergingWindows = null;

            if (windowAssigner instanceof MergingWindowAssigner) { // merging windows
                mergingWindows = getMergingWindowSet();
                W stateWindow = mergingWindows.getStateWindow(context.window);
                if (stateWindow == null) {
                    // then the window is already purged and this is a cleanup
                    // timer set due to allowed lateness that has nothing to clean,
                    // so it is safe to just ignore
                    continue;
                }
                windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
            } else { // regular windows
                windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); // fetch the window's state
            }

            ACC contents = windowState.get();
            if (contents == null) {
                // if we have no state, there is nothing to do
                continue;
            }

            TriggerResult triggerResult = context.onEventTime(timer.timestamp); // invoke onEventTime
            if (triggerResult.isFire()) {
                fire(context.window, contents);
            }

            if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
                cleanup(context.window, windowState, mergingWindows);
            }
        } else {
            fire = false;
        }
    } while (fire); // while fire is true, check whether the next watermark timer also needs to fire

    output.emitWatermark(mark); // forward the watermark downstream

    this.currentWatermark = mark.getTimestamp(); // update currentWatermark
}
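The core of processWatermark is a loop that drains every timer whose timestamp is at or below the watermark. A minimal sketch of just that loop follows, assuming a hypothetical `TimerDrainSketch` class in which a timer is a bare timestamp rather than Flink's (timestamp, key, window) Timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch, not Flink code: drain all event-time timers that are
// due at the given watermark, in timestamp order.
class TimerDrainSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void register(long timestamp) {
        timers.add(timestamp);
    }

    // Returns the timestamps of the timers fired by this watermark.
    List<Long> advanceTo(long watermark) {
        List<Long> fired = new ArrayList<>();
        // Same condition as above: head of the queue exists and is <= the watermark.
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
        // The watermark is recorded only after all due timers have been handled.
        currentWatermark = watermark;
        return fired;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

Because the queue is ordered by timestamp, the loop can stop at the first timer that is not yet due.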


trigger

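First, this method is poorly named: why does it not match the process… methods above?

It invokes onProcessingTime and is driven by a timer based on system time. The logic is essentially the same as processWatermark; only the firing condition differs.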

@Override
public void trigger(long time) throws Exception {
    boolean fire;

    // Remove information about the triggering task
    processingTimeTimerFutures.remove(time);
    processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time));

    do {
        Timer<K, W> timer = processingTimeTimersQueue.peek();
        if (timer != null && timer.timestamp <= time) {
            fire = true;

            processingTimeTimers.remove(timer);
            processingTimeTimersQueue.remove();

            context.key = timer.key;
            context.window = timer.window;
            setKeyContext(timer.key);

            AppendingState<IN, ACC> windowState;
            MergingWindowSet<W> mergingWindows = null;

            if (windowAssigner instanceof MergingWindowAssigner) {
                mergingWindows = getMergingWindowSet();
                W stateWindow = mergingWindows.getStateWindow(context.window);
                if (stateWindow == null) {
                    // then the window is already purged and this is a cleanup
                    // timer set due to allowed lateness that has nothing to clean,
                    // so it is safe to just ignore
                    continue;
                }
                windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
            } else {
                windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
            }

            ACC contents = windowState.get();
            if (contents == null) {
                // if we have no state, there is nothing to do
                continue;
            }

            TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
            if (triggerResult.isFire()) {
                fire(context.window, contents);
            }

            if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
                cleanup(context.window, windowState, mergingWindows);
            }
        } else {
            fire = false;
        }
    } while (fire);
}


EvictingWindowOperator

Compared with WindowOperator, EvictingWindowOperator simply adds an Evictor.

private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

    // Work around type system restrictions...
    int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window); // run the evictor

    FluentIterable<IN> projectedContents = FluentIterable
        .from(contents)
        .skip(toEvict)
        .transform(new Function<StreamRecord<IN>, IN>() {
            @Override
            public IN apply(StreamRecord<IN> input) {
                return input.getValue();
            }
        });
    userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
}

The key point is that on fire, before the window function is applied, the elements selected for eviction are removed first.
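The evict-then-apply step can be illustrated with plain lists. In this sketch, `EvictSketch` is hypothetical: `toEvict` is an assumed count-based policy that keeps at most `maxCount` elements, and `afterEviction` mirrors the `skip(toEvict)` call by dropping elements from the front of the buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: evict from the front of the buffered
// elements before the window function sees them.
class EvictSketch {
    // Assumed count-based policy: how many leading elements to drop
    // so that at most maxCount remain.
    static int toEvict(int size, int maxCount) {
        return Math.max(0, size - maxCount);
    }

    // Equivalent of skip(toEvict): the window function only sees the remainder.
    static <T> List<T> afterEviction(List<T> contents, int toEvict) {
        return new ArrayList<>(contents.subList(toEvict, contents.size()));
    }
}
```

With five buffered elements and `maxCount = 3`, the two oldest elements are skipped and the window function is applied to the last three.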

Reposted from: https://www.cnblogs.com/fxjwind/p/6137608.html
