日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

聊聊Flink必知必会(五)

發(fā)布時間:2023/11/18 windows 57 coder
生活随笔 收集整理的這篇文章主要介紹了 聊聊Flink必知必会(五) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
  1. 聊聊Flink的必知必會(三)
  2. 聊聊Flink必知必會(四)

從源碼中,根據(jù)關鍵的代碼,梳理一下Flink中的時間與窗口實現(xiàn)邏輯。

WindowedStream

對數(shù)據(jù)流執(zhí)行keyBy()操作后,再調用window()方法,就會返回WindowedStream,表示分區(qū)后又加窗的數(shù)據(jù)流。如果數(shù)據(jù)流沒有經過分區(qū),直接調用window()方法則會返回AllWindowedStream

如下:

// 構造函數(shù)
public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
    this.input = input;
    this.builder =
    new WindowOperatorBuilder<>(
    windowAssigner,
    windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),
    input.getExecutionConfig(),
    input.getType(),
    input.getKeySelector(),
    input.getKeyType());
}
        
// KeyedStream類型,表示被加窗的輸入流。
private final KeyedStream<T, K> input;

// 用于構建WindowOperator,最終會生成windowAssigner,Evictor,Trigger
private final WindowOperatorBuilder<T, K, W> builder;

在這里面還涉及到一些窗口的基本計算算子,比如reduce,aggregate,apply,process,sum等等.

窗口相關模型的實現(xiàn)

Window

Window類是Flink中對窗口的抽象。它是一個抽象類,包含抽象方法maxTimestamp(),用于獲取屬于該窗口的最大時間戳。

TimeWindow類是其子類。包含了窗口的start,end,offset等時間概念字段,這里會計算窗口的起始時間:

// 構造函數(shù)
public TimeWindow(long start, long end) {
    this.start = start;
    this.end = end;
}

// timestamp:獲取窗口啟動時的第一個時間戳epoch毫秒
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    final long remainder = (timestamp - offset) % windowSize;
    // handle both positive and negative cases
    if (remainder < 0) {
        return timestamp - (remainder + windowSize);
    } else {
        return timestamp - remainder;
    }
}

WindowAssigner

WindowAssigner表示窗口分配器,用來把元素分配到零個或多個窗口(Window對象)中。它是一個抽象類,其中重要的抽象方法為assignWindows()方法,用來給元素分配窗口。

Flink有多種類型的窗口,如Tumbling Window、Sliding Window等。各種類型的窗口又分為基于事件時間或處理時間的窗口。WindowAssigner的實現(xiàn)類就對應著具體類型的窗口。

SlidingEventTimeWindows是WindowAssigner的另一個實現(xiàn)類,表示基于事件時間的Sliding Window。它有3個long類型的字段size、slide和offset,分別表示窗口的大小、滑動的步長和窗口起始位置的偏移量。它對assignWindows()方法的實現(xiàn)如下:

@Override
public Collection<TimeWindow> assignWindows(
        Object element, long timestamp, WindowAssignerContext context) {
        // Long.MIN_VALUE is currently assigned when no timestamp is present
    if (timestamp > Long.MIN_VALUE) {
        if (staggerOffset == null) {
            staggerOffset =
                    windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
        }
        long start =
                TimeWindow.getWindowStartWithOffset(
                        timestamp, (globalOffset + staggerOffset) % size, size);
        // 返回構建好起止時間的TimeWindow
        return Collections.singletonList(new TimeWindow(start, start + size));
    } else {
        throw new RuntimeException(
                "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
                        + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
                        + "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}

設置窗口觸發(fā)器Trigger

@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
    return EventTimeTrigger.create();
}

WindowAssigner與其主要實現(xiàn)類的關系如下:

這些類的含義分別如下

  • GlobalWindows:將所有元素分配進同一個窗口的全局窗口分配器。
  • SlidingEventTimeWindows:基于事件時間的滑動窗口分配器。
  • SlidingProcessingTimeWindows:基于處理時間的滑動窗口分配器。
  • TumblingEventTimeWindows:基于事件時間的滾動窗口分配器。
  • TumblingProcessingTimeWindows:基于處理時間的滾動窗口分配器。
  • EventTimeSessionWindows:基于事件時間的會話窗口分配器。
  • ProcessingTimeSessionWindows:基于處理時間的會話窗口分配器。

Trigger

Trigger表示窗口觸發(fā)器。它是一個抽象類,主要定義了下面3個方法用于確定窗口何時觸發(fā)計算:

// 每個元素到來時觸發(fā)
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
// 處理時間的定時器觸發(fā)時
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
// 事件時間的定時器觸發(fā)時調用
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

這3個方法的返回結果為TriggerResult對象。TriggerResult是一個枚舉類,包含兩個boolean類型的字段fire和purge,分別表示窗口是否觸發(fā)計算和窗口內的元素是否需要清空。

CONTINUE(false, false),
FIRE_AND_PURGE(true, true),
FIRE(true, false),
PURGE(false, true);

TriggerResult(boolean fire, boolean purge) {
    this.purge = purge;
    this.fire = fire;
}

窗口觸發(fā)器的實現(xiàn)由用戶根據(jù)業(yè)務需求自定義。Flink默認基于事件時間的觸發(fā)器為EventTimeTrigger,其三個方法處理如下

@Override
public TriggerResult onElement(
        Object element, long timestamp, TimeWindow window, TriggerContext ctx)
        throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // 如果水印已經超過窗口,則立即觸發(fā)
        return TriggerResult.FIRE;
    } else {
        // 注冊事件時間定時器
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}

/*
 * 處理時間,窗口不觸發(fā)計算也不清空內部元素。
 */
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
        throws Exception {
    return TriggerResult.CONTINUE;
}

Trigger與其主要實現(xiàn)類的繼承關系

這些類的含義如下

  • CountTrigger:元素數(shù)達到設置的個數(shù)時觸發(fā)計算的觸發(fā)器。
  • DeltaTrigger:基于DeltaFunction和設置的閾值觸發(fā)計算的觸發(fā)器。
  • EventTimeTrigger:基于事件時間的觸發(fā)器。
  • ProcessingTimeTrigger:基于處理時間的觸發(fā)器。
  • PurgingTrigger:可包裝其他觸發(fā)器的清空觸發(fā)器。
  • ContinuousEventTimeTrigger:基于事件時間并按照一定的時間間隔連續(xù)觸發(fā)計算的觸發(fā)器。
  • ContinuousProcessingTimeTrigger:基于處理時間并按照一定的時間間隔連續(xù)觸發(fā)計算的觸發(fā)器。

windowOperator

WindowedStream的構造函數(shù)中,會生成WindowOperatorBuilder,該類可以返回WindowOperator,這兩個類負責窗口分配器、窗口觸發(fā)器和窗口剔除器這些組件在運行時的協(xié)同工作。

對于WindowOperator,除了窗口分配器和窗口觸發(fā)器的相關字段,可以先了解下面兩個字段。

// StateDescriptor類型,表示窗口狀態(tài)描述符。
private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;

// 表示窗口的狀態(tài),窗口內的元素都在其中維護。
private transient InternalAppendingState<K, W, IN, ACC, ACC> windowState;

窗口中的元素并沒有保存在Window對象中,而是維護在windowState中。windowStateDescriptor則是創(chuàng)建windowState所需用到的描述符。

當有元素到來時,會調用WindowOperator的processElement()方法:

public void processElement(StreamRecord<IN> element) throws Exception {
    // 分配窗口
    final Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(), element.getTimestamp(), windowAssignerContext);
            ...
        if (windowAssigner instanceof MergingWindowAssigner) { // Session Window的情況
            ...
        } else {
            for (W window: elementWindows) { // 非Session Window的情況
                ...
                // 將Window對象設置為namespace并添加元素到windowState中
                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());
                triggerContext.key = key;
                triggerContext.window = window;
                // 獲取TriggerResult,確定接下來是否需要觸發(fā)計算或清空窗口
                TriggerResult triggerResult = triggerContext.onElement(element);
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    // 觸發(fā)計算
                    emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    // 清空窗口
                    windowState.clear();
                }
                ...
            }
        }
    ...
}

在處理時間或事件時間的定時器觸發(fā)時,會調用WindowOperator的onProcessingTime()方法或onEventTime()方法,其中的邏輯與onElement()方法的大同小異。

Watermarks

水位線(watermark)是選用事件時間來進行數(shù)據(jù)處理時特有的概念。它的本質就是時間戳,從上游流向下游,表示系統(tǒng)認為數(shù)據(jù)中的事件時間在該時間戳之前的數(shù)據(jù)都已到達。

Flink中,Watermark類表示水位。

/** Creates a new watermark with the given timestamp in milliseconds. */
public Watermark(long timestamp) {
    this.timestamp = timestamp;
}

watermark的生成有兩種方式,這里不贅述,主要講述下基于配置的策略生成watermark的方式。如下的代碼是比較常見的配置:

// 分配事件時間與水印
.assignTimestampsAndWatermarks(
        // forBoundedOutOfOrderness 會根據(jù)事件的時間戳和允許的最大亂序時間生成水印。
        // Duration 設置了最大亂序時間為1秒。這意味著 Flink 將允許在這1秒的時間范圍內的事件不按照事件時間的順序到達,這個時間段內的事件會被認為是"有序的"。
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1))
        // 設置事件時間分配器,從Event對象中提取時間戳作為事件時間
        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));

在Flink內部,會根據(jù)配置的策略調用BoundedOutOfOrdernessWatermarks生成watermark。該類的代碼如下:

public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;

    /** The maximum out-of-orderness that this watermark generator assumes. */
    private final long outOfOrdernessMillis;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // ------------------------------------------------------------------------

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // 每條數(shù)據(jù)都會更新最大值
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 發(fā)送 watermark 邏輯
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

onEvent決定每次事件都會取得最大的事件時間更新;onPeriodicEmit則是周期性的更新并傳遞到下游。

AbstractStreamOperator

WatermarkGenerator接口的調用是在AbstractStreamOperator抽象類的子類TimestampsAndWatermarksOperator中。其生命周期open函數(shù)與每個數(shù)據(jù)到來的處理函數(shù)processElement,如下:

@Override
public void open() throws Exception {
    super.open();

    timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
    watermarkGenerator =
            emitProgressiveWatermarks
                    ? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
                    : new NoWatermarksGenerator<>();

    wmOutput = new WatermarkEmitter(output);

    watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
    if (watermarkInterval > 0 && emitProgressiveWatermarks) {
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }
}

@Override
public void processElement(final StreamRecord<T> element) throws Exception {
    final T event = element.getValue();
    final long previousTimestamp =
            element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
    // 從分配器中提取事件時間戳
    final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);

    element.setTimestamp(newTimestamp);
    output.collect(element);
    // 調用水印生成器
    watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
}

從方法的入?yún)⒖梢钥闯鰜?flink 算子間的數(shù)據(jù)流動是 StreamRecord 對象。它對數(shù)據(jù)的處理邏輯是什么都不做直接向下游發(fā)送,然后調用 onEvent 記錄最大時間戳,也就是說:flink 是先發(fā)送數(shù)據(jù)再生成 watermark,watermark 永遠在生成它的數(shù)據(jù)之后。

總結

上面的一系列相關代碼,只是冰山一角,暫時只是把關鍵涉及到的部分捋了一下。最后畫個圖,展示其大致思路。

參考:

Flink Watermark 源碼解析

總結

以上是生活随笔為你收集整理的聊聊Flink必知必会(五)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。