日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

KafkaStream之时间窗口WindowBy

發(fā)布時(shí)間:2023/12/20 编程问答 53 豆豆
生活随笔 收集整理的這篇文章主要介紹了 KafkaStream之时间窗口WindowBy 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Kafka Stream的大部分API還是比較容易理解和使用的,但是,其中的時(shí)間窗口聚合即windowBy方法還是需要仔細(xì)研究下,否則很容易使用錯(cuò)誤。

本文先引入Kafka Stream,然后主要針對(duì)時(shí)間窗口聚合API即windowBy()做詳細(xì)分析。

引言

Kafka Streams是一個(gè)用于構(gòu)建應(yīng)用程序和微服務(wù)的客戶端庫(kù),其中的輸入和輸出數(shù)據(jù)存儲(chǔ)在Kafka集群中。它結(jié)合了在客戶端編寫和部署Java/Scala應(yīng)用程序的簡(jiǎn)單性,以及Kafka服務(wù)器集群的優(yōu)點(diǎn)。

Kafka Stream為我們屏蔽了直接使用Kafka Consumer的復(fù)雜性,不用手動(dòng)進(jìn)行輪詢poll(),不必關(guān)心commit()。而且,使用Kafka Stream,可以方便的進(jìn)行實(shí)時(shí)計(jì)算、實(shí)時(shí)分析。

比如官方Demo,統(tǒng)計(jì)topic中不同單詞的出現(xiàn)次數(shù):

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class WordCountApplication {public static void main(final String[] args) throws InterruptedException {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500");// 默認(rèn)30s commit一次props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());StreamsBuilder builder = new StreamsBuilder();// 從名為“TextLinesTopic”的topic創(chuàng)建流。KStream<String, String> textLines = builder.stream("TextLinesTopic");KTable<String, Long> wordCounts =textLines.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+"))).groupBy((key, word) -> word).count();// 官方文檔實(shí)例中是 wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long())); 直接寫回kafka// 我們這里為了方便觀察,直接打印到控制臺(tái)wordCounts.toStream().print(Printed.toSysOut());KafkaStreams streams = new KafkaStreams(builder.build(), props);streams.start();Thread.currentThread().join();}}

啟動(dòng)程序、kafka服務(wù)端。

啟動(dòng)kafka-console-producer, 創(chuàng)建主題TextLinesTopic0,并發(fā)送消息。

1 2 3 .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic TextLinesTopic.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TextLinesTopic0

可以看到,每次向kafka寫入一條消息后,我們的demo程序在控制臺(tái)會(huì)立即輸出產(chǎn)生變化的數(shù)據(jù)統(tǒng)計(jì)。

這其中的簡(jiǎn)單原理可以參考http://kafka.apache.org/23/documentation/streams/quickstart#quickstart_streams_process。我們的流計(jì)算應(yīng)用保存一個(gè)KTable<String, Long>?用來(lái)記錄統(tǒng)計(jì)條目,隨著流中元素的到來(lái),KTable中的統(tǒng)計(jì)條目發(fā)生變化,這些變化回發(fā)送到下游流中(本文中的下游流就是控制臺(tái))。

借助KafkaStream的API,我們可以方便的編寫實(shí)時(shí)計(jì)算應(yīng)用。比如上面的groupBy、count方法,再比如接下來(lái)的windowBy方法,如果不使用KafakStream,直接使用Kafka Consumer自行實(shí)現(xiàn),則比較麻煩。

Kafka Stream的大部分API還是比較容易理解和使用的,但是,其中的時(shí)間窗口聚合即windowBy方法還是需要仔細(xì)研究下,否則很容易使用錯(cuò)誤。

WindowBy

根據(jù)時(shí)間窗口做聚合,是在實(shí)時(shí)計(jì)算中非常重要的功能。比如我們經(jīng)常需要統(tǒng)計(jì)最近一段時(shí)間內(nèi)的count、sum、avg等統(tǒng)計(jì)數(shù)據(jù)。

Kafka中有這樣四種時(shí)間窗口。

Window nameBehaviorShort description
Tumbling time windowTime-basedFixed-size, non-overlapping, gap-less windows
Hopping time windowTime-basedFixed-size, overlapping windows
Sliding time windowTime-basedFixed-size, overlapping windows that work on differences between record timestamps
Session windowSession-basedDynamically-sized, non-overlapping, data-driven windows

Tumbling time windows

翻滾時(shí)間窗口Tumbling time windows是跳躍時(shí)間窗口hopping time windows的一種特殊情況,與后者一樣,翻滾時(shí)間窗也是基于時(shí)間間隔的。但它是固定大小、不重疊、無(wú)間隙的窗口。翻滾窗口只由一個(gè)屬性定義:size。翻滾窗口實(shí)際上是一種跳躍窗口,其窗口大小與其前進(jìn)間隔相等。由于翻滾窗口從不重疊,數(shù)據(jù)記錄將只屬于一個(gè)窗口。

Tumbling time windows are?aligned to the epoch, with the lower interval bound being inclusive and the upper bound being exclusive. “Aligned to the epoch” means that the first window starts at timestamp zero. For example, tumbling windows with a size of 5000ms have predictable window boundaries?[0;5000),[5000;10000),...?— and?not?[1000;6000),[6000;11000),...?or even something “random” like?[1452;6452),[6452;11452),....

看個(gè)翻滾窗口的例子:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private static final String BOOT_STRAP_SERVERS = "localhost:9092"; private static final String TEST_TOPIC = "test_topic"; private static final long TIME_WINDOW_SECONDS = 5L; //時(shí)間窗口大小@Test public void testTumblingTimeWindows() throws InterruptedException {Properties props = configStreamProperties();StreamsBuilder builder = new StreamsBuilder();KStream<String, String> data = builder.stream(TEST_TOPIC);Instant initTime = Instant.now();data.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(TIME_WINDOW_SECONDS))).count(Materialized.with(Serdes.String(), Serdes.Long())).toStream().filterNot(((windowedKey, value) -> this.isOldWindow(windowedKey, value, initTime))) //剔除太舊的時(shí)間窗口,程序二次啟動(dòng)時(shí),會(huì)重新讀取歷史數(shù)據(jù)進(jìn)行整套流處理,為了不影響觀察,這里過(guò)濾掉歷史數(shù)據(jù).foreach(this::dealWithTimeWindowAggrValue);Topology topology = builder.build();KafkaStreams streams = new KafkaStreams(topology, props);streams.start();Thread.currentThread().join(); }

Test啟動(dòng)前啟動(dòng)一個(gè)KafkaProducer,每1秒產(chǎn)生一條數(shù)據(jù),數(shù)據(jù)的key為“service_1”,value為“key@當(dāng)前時(shí)間”。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @BeforeClass public static void generateValue() {Properties props = new Properties();props.put("bootstrap.servers", BOOT_STRAP_SERVERS);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("request.required.acks", "0");new Thread(() -> {Producer<String, String> producer = new KafkaProducer<>(props);try {while (true) {TimeUnit.SECONDS.sleep(1L);Instant now = Instant.now();String key = "service_1";String value = key + "@" + toLocalTimeStr(now);producer.send(new ProducerRecord<>(TEST_TOPIC, key, value));}} catch (Exception e) {e.printStackTrace();producer.close();}}).start(); } private static String toLocalTimeStr(Instant i) {return i.atZone(ZoneId.systemDefault()).toLocalDateTime().toString(); }

下面是些公共代碼,之后的例子也有會(huì)用到 :

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private Properties configStreamProperties() {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-ljf-test");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOT_STRAP_SERVERS);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500");//todo 默認(rèn)值為30s,會(huì)導(dǎo)致30s才提交一次數(shù)據(jù)。return props; }private boolean isOldWindow(Windowed<String> windowKey, Long value, Instant initTime) {Instant windowEnd = windowKey.window().endTime();return windowEnd.isBefore(initTime); }private void dealWithTimeWindowAggrValue(Windowed<String> key, Long value) {Windowed<String> windowed = getReadableWindowed(key);System.out.println("處理聚合結(jié)果:key=" + windowed + ",value=" + value); }private Windowed<String> getReadableWindowed(Windowed<String> key) {return new Windowed<String>(key.key(), key.window()) {@Overridepublic String toString() {String startTimeStr = toLocalTimeStr(Instant.ofEpochMilli(window().start()));String endTimeStr = toLocalTimeStr(Instant.ofEpochMilli(window().end()));return "[" + key() + "@" + startTimeStr + "/" + endTimeStr + "]";}}; }

上面的testTumblingTimeWindows()中,創(chuàng)建了一個(gè)流任務(wù),先groupByKey(),再調(diào)用count()計(jì)算每個(gè)時(shí)間窗口的消息個(gè)數(shù)。我們創(chuàng)建了一個(gè)size為5秒的翻滾時(shí)間窗口。而且generateValue()方法中啟動(dòng)了一個(gè)Producer,每隔一秒發(fā)送一條消息。使用JUnit運(yùn)行testTumblingTimeWindows(),控制臺(tái)輸出如下(在創(chuàng)建流計(jì)算邏輯時(shí),我們最后使用foreach(this::dealWithTimeWindowAggrValue)將上游流(這里是filterNot方法的結(jié)果)傳來(lái)的元素打印到控制臺(tái)):

可以看到,每個(gè)時(shí)間窗口統(tǒng)計(jì)到5的時(shí)候,重新從1開始count。這也印證了翻滾窗口的特性。

這里我們?cè)倏聪耮roupByKey的特性。

如果將generateValue()方法改為,模擬另一個(gè)服務(wù)也在發(fā)送消息:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @BeforeClass public static void generateValue() {Properties props = new Properties();// ...配置不變,此處省略new Thread(() -> {Producer<String, String> producer = new KafkaProducer<>(props);try {while (true) {TimeUnit.SECONDS.sleep(1L);Instant now = Instant.now();String key = "service_1";String value = key + "@" + toLocalTimeStr(now);producer.send(new ProducerRecord<>(TEST_TOPIC, key, value));String key2 = "service_2"; // 模擬另一個(gè)服務(wù)也在發(fā)送消息producer.send(new ProducerRecord<>(TEST_TOPIC, key2, value));}} catch (Exception e) {e.printStackTrace();producer.close();}}).start(); }

重新運(yùn)行testTumblingTimeWindows():

可以看到,我們的流任務(wù)根據(jù)key的不同先做group,在進(jìn)行時(shí)間窗口的聚合。

PS:類似groupByKey,還有g(shù)roupBy,前者可以看做后者的特化,后者可以根據(jù)消Message的key、value自定義分組邏輯。關(guān)于此,可以參考API官方文檔Stateless transformations

Sliding time windows

Sliding windows are actually quite different from hopping and tumbling windows. In Kafka Streams, sliding windows are used only for?join operations, and can be specified through the?JoinWindows?class.

A sliding window models a fixed-size window that slides continuously over the time axis; here, two data records are said to be included in the same window if (in the case of symmetric windows) the difference of their timestamps is within the window size. Thus, sliding windows are?not aligned to the epoch, but to the data record timestamps. In contrast to hopping and tumbling windows, the lower and upper window time interval bounds of sliding windows are?both inclusive.

Session Windows

Session windows are used to aggregate key-based events into so-called?sessions, the process of which is referred to as?sessionization. Sessions represent a?period of activity?separated by a defined?gap of inactivity?(or “idleness”). Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions. If an event falls outside of the session gap, then a new session will be created.

Hopping time windows

我們口中的“滑動(dòng)窗口”,在Kafka這里叫做跳躍窗口。

Note?Hopping windows vs. sliding windows:?Hopping windows are sometimes called “sliding windows” in other stream processing tools. Kafka Streams follows the terminology in academic literature, where the semantics of sliding windows are different to those of hopping windows.

Hopping time windows are?aligned to the epoch, with the lower interval bound being inclusive and the upper bound being exclusive. “Aligned to the epoch” means that the first window starts at timestamp zero. For example, hopping windows with a size of 5000ms and an advance interval (“hop”) of 3000ms have predictable window boundaries?[0;5000),[3000;8000),...?— and?not?[1000;6000),[4000;9000),...?or even something “random” like?[1452;6452),[4452;9452),....

跳躍時(shí)間窗口Hopping time windows是基于時(shí)間間隔的窗口。它們?yōu)楣潭ù笮?可能)重疊的窗口建模。跳躍窗口由兩個(gè)屬性定義:窗口的size及其前進(jìn)間隔advance interval?(也稱為hop)。前進(jìn)間隔指定一個(gè)窗口相對(duì)于前一個(gè)窗口向前移動(dòng)多少。例如,您可以配置一個(gè)size為5分鐘、advance為1分鐘的跳轉(zhuǎn)窗口。由于跳躍窗口可以重疊(通常情況下確實(shí)如此),數(shù)據(jù)記錄可能屬于多個(gè)這樣的窗口。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private static final long TIME_WINDOW_SECONDS = 5L; //窗口大小設(shè)為5秒 private static final long ADVANCED_BY_SECONDS = 1L; //前進(jìn)間隔1秒 @Test public void testHoppingTimeWindowWithSuppress() throws InterruptedException {Properties props = configStreamProperties();StreamsBuilder builder = new StreamsBuilder();KStream<String, String> data = builder.stream(TEST_TOPIC);Instant initTime = Instant.now();data.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(TIME_WINDOW_SECONDS)).advanceBy(Duration.ofSeconds(ADVANCED_BY_SECONDS)).grace(Duration.ZERO)).count(Materialized.with(Serdes.String(), Serdes.Long())).suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())).toStream().filterNot(((key, value) -> this.isOldWindow(key, value, initTime))) //剔除太舊的時(shí)間窗口.foreach(this::dealWithTimeWindowAggrValue);Topology topology = builder.build();KafkaStreams streams = new KafkaStreams(topology, props);streams.start();Thread.currentThread().join(); }

注意到上面的代碼中,我們還用到了grace(Duration.ZERO)和suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))

后者的意思是:抑制住上游流的輸出,直到當(dāng)前時(shí)間窗口關(guān)閉后,才向下游發(fā)送數(shù)據(jù)。前面我們說(shuō)過(guò),每當(dāng)統(tǒng)計(jì)值產(chǎn)生變化時(shí),統(tǒng)計(jì)的結(jié)果會(huì)立即發(fā)送給下游。但是有些情況下,比如我們從kafka中的消息記錄了應(yīng)用程序的每次gc時(shí)間,我們的流任務(wù)需要統(tǒng)計(jì)每個(gè)時(shí)間窗口內(nèi)的平均gc時(shí)間,然后發(fā)送給下游(下游可能是直接輸出到控制臺(tái),也可能是另一個(gè)kafka topic或者一段報(bào)警邏輯)。那么,只要當(dāng)這個(gè)時(shí)間窗口關(guān)閉時(shí),向下游發(fā)送一個(gè)最終結(jié)果就夠了。而且有的情況下,如果窗口還沒(méi)關(guān)閉就發(fā)送到下游,可能導(dǎo)致錯(cuò)誤的邏輯(比如數(shù)據(jù)抖動(dòng)產(chǎn)生誤報(bào)警)。

grace的意思是,設(shè)立一個(gè)數(shù)據(jù)晚到的期限,這個(gè)期限過(guò)了之后時(shí)間窗口才關(guān)閉。比如窗口大小為5,當(dāng)15:20的時(shí)候,15:15-15:20的窗口應(yīng)當(dāng)關(guān)閉了,但是為了防止網(wǎng)絡(luò)延時(shí)導(dǎo)致數(shù)據(jù)晚到,比如15點(diǎn)22分的時(shí)候,有可能才接收時(shí)間戳是15點(diǎn)20分的數(shù)據(jù)。所以我們可以把這個(gè)晚到時(shí)間設(shè)為2分鐘,那么知道15點(diǎn)22的時(shí)候,15:15-15:20的窗口才關(guān)閉。

注意一個(gè)坑:**如果使用Suppressed.untilWindowCloses,那么窗口必須要指定grace。因?yàn)槟J(rèn)的grace時(shí)間是24小時(shí)。所以24小時(shí)之內(nèi)窗口是一直不關(guān)閉的,而且由于被suppress住了,所以下游會(huì)一直收不到結(jié)果。**另外也可以使用Suppressed.untilTimeLimit來(lái)指定上游聚合計(jì)算的值在多久后發(fā)往下游,它與窗口是否關(guān)閉無(wú)關(guān),所以可以不使用grace

上面的代碼中,為了方便,我們令grace為0,也就是當(dāng)窗口的截止時(shí)間到了后立即關(guān)閉窗口。

另外我們還使用suppress,抑制住中間的計(jì)算結(jié)果。所以可以看到,每個(gè)窗口關(guān)閉后,向下游(這里就是控制臺(tái))發(fā)送了一個(gè)最終結(jié)果“5”。

為了驗(yàn)證,我們?nèi)サ魋uppress方法試一下。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Test public void testHoppingTimeWindow() throws InterruptedException {Properties props = configStreamProperties();StreamsBuilder builder = new StreamsBuilder();KStream<String, String> data = builder.stream(TEST_TOPIC);Instant initTime = Instant.now();data.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(TIME_WINDOW_SECONDS)).advanceBy(Duration.ofSeconds(ADVANCED_BY_SECONDS)).grace(Duration.ZERO)).count(Materialized.with(Serdes.String(), Serdes.Long())).toStream().filterNot(((key, value) -> this.isOldWindow(key, value, initTime))).foreach(this::dealWithTimeWindowAggrValue);Topology topology = builder.build();KafkaStreams streams = new KafkaStreams(topology, props);streams.start();Thread.currentThread().join(); }

運(yùn)行結(jié)果如下:

如果不仔細(xì)觀察,可能會(huì)覺(jué)得結(jié)果很亂。所以我用方框做了區(qū)分:

51秒時(shí)第一個(gè)消息到達(dá),使得所在的5個(gè)窗口都進(jìn)行聚合計(jì)算count后,結(jié)果立即發(fā)往下游,所以是1,1,1,1,1。

52秒時(shí)第二個(gè)消息到達(dá),所在的5個(gè)窗口都進(jìn)行count累加計(jì)算后,結(jié)果立即發(fā)往下游,所以是2,2,2,2,1。注意到,最后的“1”是新的窗口(51秒-56秒窗口)的累加計(jì)算,所以值為1。而“46秒-51秒”這個(gè)窗口由于已經(jīng)關(guān)閉,就不會(huì)再進(jìn)行累加計(jì)算,從而不會(huì)有新的結(jié)果發(fā)送給下游輸出。

53秒第三個(gè)消息到達(dá),之前的2,2,2,2,1的第一個(gè)“2”所在窗口關(guān)閉了,然后剩下的三個(gè)分別加1,變成了3,3,3,2。另外還有一個(gè)新的時(shí)間窗口打開。所以最后得到3,3,3,2,1。

時(shí)間窗口上聚合計(jì)算的坑

上面我特意強(qiáng)調(diào)了兩點(diǎn),一是所在的窗口進(jìn)行聚合計(jì)算,二是聚合計(jì)算的結(jié)果立即發(fā)往下游。第二點(diǎn)我們已經(jīng)驗(yàn)證了。我們將最開始Tumbling time window的程序加上suppres進(jìn)一步驗(yàn)證一下。

聚合計(jì)算結(jié)果何時(shí)到達(dá)下游

之前的代碼會(huì)輸出123451234512345…,而且每個(gè)12345都是同一個(gè)窗口輸出的??梢娋酆辖Y(jié)果計(jì)算后,默認(rèn)會(huì)立即發(fā)給下游。

改變代碼如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Test public void testTumblingTimeWindowWithSuppress() throws InterruptedException {Properties props = configStreamProperties();StreamsBuilder builder = new StreamsBuilder();KStream<String, String> data = builder.stream(TEST_TOPIC);Instant initTime = Instant.now();data.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(TIME_WINDOW_SECONDS)).grace(Duration.ZERO)).count(Materialized.with(Serdes.String(), Serdes.Long())).suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())).toStream().filterNot(((key, value) -> this.isOldWindow(key, value, initTime))) .foreach(this::dealWithTimeWindowAggrValue);Topology topology = builder.build();KafkaStreams streams = new KafkaStreams(topology, props);streams.start();Thread.currentThread().join(); }

可以看到,只有當(dāng)窗口關(guān)閉后,窗口的聚合結(jié)果才會(huì)發(fā)送到下游。所以最終下游只得到了555555…

何時(shí)進(jìn)行聚合計(jì)算

我們?cè)賮?lái)看下第一點(diǎn):當(dāng)新的數(shù)據(jù)到來(lái)時(shí),所在的時(shí)間窗口都會(huì)進(jìn)行聚合計(jì)算。

有的人可能會(huì)誤解,如果使用了Suppressed.untilWindowCloses,是不是只用在窗口關(guān)閉時(shí)進(jìn)行一次求和計(jì)算就好了。其實(shí)不是這樣的,只要一個(gè)數(shù)據(jù)落到了某個(gè)窗口內(nèi)(同一數(shù)據(jù)可以落到多個(gè)窗口),窗口便會(huì)立即進(jìn)行聚合計(jì)算。

我們繼續(xù)使用testTumblingTimeWindowWithSuppress()的例子,改動(dòng)如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Test public void testTumblingTimeWindowWithSuppress() throws InterruptedException {Properties props = configStreamProperties();StreamsBuilder builder = new StreamsBuilder();KStream<String, String> data = builder.stream(TEST_TOPIC);Instant initTime = Instant.now();data.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(TIME_WINDOW_SECONDS)).grace(Duration.ZERO)).aggregate(() -> 0L, this::aggrDataInTimeWindow, Materialized.with(Serdes.String(), Serdes.Long())) // 使用自定義aggregator.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())).toStream().filterNot(((key, value) -> this.isOldWindow(key, value, initTime))) .foreach(this::dealWithTimeWindowAggrValue);Topology topology = builder.build();KafkaStreams streams = new KafkaStreams(topology, props);streams.start();Thread.currentThread().join(); }private Long aggrDataInTimeWindow(String key, String value, Long curValue) {curValue++;System.out.println("聚合計(jì)算:key=" + key + ",value=" + value + "\nafter aggr, curValue=" + curValue);return curValue; }

之前我們使用count()方法,現(xiàn)在我們使用aggregate()方法來(lái)達(dá)到count的同樣功能,另外打印一行日志,這樣我們就可以知道何時(shí)進(jìn)行的聚合計(jì)算。

PS:aggregate方法接收三個(gè)參數(shù),第一個(gè)指明聚合計(jì)算的初始值,第二個(gè)指明如何將流中當(dāng)前元素累加到歷史的聚合值上,第三個(gè)指明聚合計(jì)算后key和value的數(shù)據(jù)類型:

1 2 3 <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,final Aggregator<? super K, ? super V, VR> aggregator,final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);

運(yùn)行后:

可以看到,雖然被suppress了,但是聚合函數(shù)會(huì)在每次數(shù)據(jù)到來(lái)時(shí)被調(diào)用。

進(jìn)一步地,我們?cè)谑褂胔opping time windows 進(jìn)行驗(yàn)證:到達(dá)的數(shù)據(jù)落到的每個(gè)窗口上,都會(huì)立即、分別調(diào)用該窗口的聚合函數(shù)。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Test public void testHoppingTimeWindowWithSuppress() throws InterruptedException {Properties props = configStreamProperties();StreamsBuilder builder = new StreamsBuilder();KStream<String, String> data = builder.stream(TEST_TOPIC);Instant initTime = Instant.now();data.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(TIME_WINDOW_SECONDS)).advanceBy(Duration.ofSeconds(ADVANCED_BY_SECONDS)).grace(Duration.ZERO)).aggregate(() -> 0L, this::aggrDataInTimeWindow, Materialized.with(Serdes.String(), Serdes.Long())) // 使用自定義aggregator.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())).toStream().filterNot(((key, value) -> this.isOldWindow(key, value, initTime))) .foreach(this::dealWithTimeWindowAggrValue);Topology topology = builder.build();KafkaStreams streams = new KafkaStreams(topology, props);streams.start();Thread.currentThread().join(); }

結(jié)果如下:

可以看到,由于我們?cè)O(shè)置的時(shí)間窗口size=5s,前進(jìn)間隔hop=1s,所以每個(gè)數(shù)據(jù)可以同時(shí)落到5個(gè)窗口內(nèi)(見圖)。

小結(jié)

明白了事件窗口的聚合計(jì)算邏輯,我們?cè)诰幊淌蔷涂梢员苊庖恍╁e(cuò)誤。比如自定義聚合函數(shù)時(shí),Aggregator內(nèi)應(yīng)當(dāng)只負(fù)責(zé)聚合計(jì)算,不應(yīng)把其他的邏輯(比如將計(jì)算結(jié)果保存到db)寫到Aggreagator里面。如果這樣做了,一旦修改了時(shí)間窗口的配置,修改了時(shí)間窗口類型、grace、suppress等,會(huì)導(dǎo)致混亂的結(jié)果。Aggreagator應(yīng)當(dāng)只封裝聚合算法,而其他的邏輯如filter、map等應(yīng)當(dāng)單獨(dú)封裝。

Time

最后我們研究下Kafka Stream中的時(shí)間概念。

上面我們利用時(shí)間窗口進(jìn)行了實(shí)時(shí)計(jì)算,用起來(lái)很方便。但是你有沒(méi)有想過(guò),當(dāng)我們的流任務(wù)收到一條消息時(shí),是如何定義這條消息的時(shí)間戳呢?

這個(gè)問(wèn)題其實(shí)不光是Kafka Stream的問(wèn)題,也牽扯到Kafka基本生產(chǎn)者消費(fèi)者模型。但是由于實(shí)時(shí)計(jì)算的特點(diǎn),在Kafka Stream中需要格外關(guān)注。

Kafka有這樣幾個(gè)時(shí)間概念:?http://kafka.apache.org/23/documentation/streams/core-concepts#streams_time

  • Event time - 事件時(shí)間:事件真正發(fā)生的時(shí)間點(diǎn),比如一個(gè)GPS設(shè)備在某刻捕獲到了位置變化,產(chǎn)生了一個(gè)記錄,這就是事件時(shí)間。(也就是說(shuō),事件時(shí)間與Kafka無(wú)關(guān))
  • Processing time - 處理時(shí)間:KafkaStream應(yīng)用處理數(shù)據(jù)的時(shí)間點(diǎn),即消息被應(yīng)用消費(fèi)時(shí)的時(shí)間點(diǎn)。此時(shí)間點(diǎn)比EventTime晚,有可能是毫秒、小時(shí)甚至幾天。
  • Ingestion time - 攝入時(shí)間:消息被存入到Kafka的時(shí)間點(diǎn)。(準(zhǔn)確地說(shuō)是存入到Topic分區(qū)的時(shí)間點(diǎn))。

攝入時(shí)間與事件時(shí)間的區(qū)別:前者是消息存入到topic的時(shí)間,后者是事件發(fā)生的事件。?攝入時(shí)間與處理時(shí)間的去表:后者是被KafkaStream應(yīng)用消費(fèi)到的時(shí)間點(diǎn)。如果一個(gè)記錄從未被消費(fèi),則它擁有攝入時(shí)間而沒(méi)有處理時(shí)間。

The choice between event-time and ingestion-time is actually done through the configuration of Kafka (not Kafka Streams): From Kafka 0.10.x onwards, timestamps are automatically embedded into Kafka messages. Depending on Kafka’s configuration these timestamps represent event-time or ingestion-time. The respective Kafka configuration setting can be specified on the broker level or per topic.?The default timestamp extractor in Kafka Streams will retrieve these embedded timestamps as-is.?Hence, the effective time semantics of your application depend on the effective Kafka configuration for these embedded timestamps.

指定事件時(shí)間

應(yīng)用可以自行將事件時(shí)間信息保存到消息內(nèi)容里,然后將消息發(fā)送到kafka。在KafkaStream應(yīng)用中,繼承TimeStampExtractor,在重載的extract方法中定義如何從消息中抽取時(shí)間時(shí)間。并在構(gòu)造KafkaStream的props里配置上該自定義的時(shí)間提取器。

比如我們自定義一個(gè)TimeStampExtractor,它可以從消息體中抽取我們?cè)诎l(fā)送時(shí)寫入的時(shí)間信息。

1 2 3 4 5 6 7 8 9 10 11 12 public class MyTimestampExtractor implements TimestampExtractor {@Overridepublic long extract(ConsumerRecord<Object, Object> record, long timeMill) {String value = record.value().toString();String eventTimeStr = value.split("@")[1]; //發(fā)送消息時(shí) value = key + "@" + timeStrLocalDateTime eventTime = LocalDateTime.parse(eventTimeStr);Instant instant = eventTime.toInstant(ZoneOffset.ofHours(8));return instant.toEpochMilli();}}

我們?cè)诎l(fā)送消息的時(shí)候,將時(shí)間信息放到消息內(nèi)容里,但是我們做個(gè)小把戲,將時(shí)間對(duì)齊到每分鐘的0秒。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @BeforeClass public static void generateValue() {Properties props = new Properties();props.put("bootstrap.servers", BOOT_STRAP_SERVERS);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("request.required.acks", "0");new Thread(() -> {Producer<String, String> producer = new KafkaProducer<>(props);try {while (true) {TimeUnit.SECONDS.sleep(1L);Instant now = Instant.now();String key = "service_1";// 將時(shí)間信息放到消息內(nèi)容里,但是我們做個(gè)小把戲,將時(shí)間對(duì)齊到每分鐘的0秒String value = key + "@" + alignToMinute(now);producer.send(new ProducerRecord<>(TEST_TOPIC, key, value));}} catch (Exception e) {e.printStackTrace();producer.close();}}).start(); }

然后需要指定使用我們自定義的時(shí)間提取器。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private static final long TIME_WINDOW_SECONDS = 5L; @Test public void testEventTime() throws InterruptedException {Properties props = configStreamProperties();// 指定使用自定義的時(shí)間提取器props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyTimestampExtractor.class);StreamsBuilder builder = new StreamsBuilder();KStream<String, String> data = builder.stream(TEST_TOPIC);Instant initTime = Instant.now();data.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(TIME_WINDOW_SECONDS))) // 使用翻滾窗口.count(Materialized.with(Serdes.String(), Serdes.Long())).toStream().filterNot(((key, value) -> this.isOldWindow(key, value, initTime))).foreach(this::dealWithTimeWindowAggrValue);Topology topology = builder.build();KafkaStreams streams = new KafkaStreams(topology, props);streams.start();Thread.currentThread().join(); }

我們的窗口大小仍然是5秒,使用翻滾窗口,聚合計(jì)算的值立即輸出到下游(控制臺(tái))。

還記的在Tumbling time windows小節(jié)里的例子嗎,當(dāng)時(shí)的輸出是123451234512345…。但是我們現(xiàn)在使用自定義時(shí)間提取器,從消息內(nèi)容里提取時(shí)間信息,而在發(fā)送時(shí)做了點(diǎn)小把戲,所以在同一分鐘內(nèi)接收到的消息,提出來(lái)的時(shí)間都是0秒的,也就是都會(huì)落到第一個(gè)時(shí)間窗口內(nèi)(0秒-5秒窗口)。

使用內(nèi)嵌的時(shí)間戳

如果不制定自定義的時(shí)間提取器,時(shí)間又是哪里來(lái)的呢? kafka每條消息中其實(shí)自帶了時(shí)間戳,作為CreateTime?我們?cè)诎l(fā)送消息時(shí),一般時(shí)這樣寫的

1 producer.send(new ProducerRecord<>(TOPIC, key, value)

看線ProducerRecord的這個(gè)構(gòu)造方法

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public ProducerRecord(String topic, K key, V value) {this(topic, null, null, key, value, null); }/*** Creates a record with a specified timestamp to be sent to a specified topic and partition* * @param topic The topic the record will be appended to* @param partition The partition to which the record should be sent* @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign* the timestamp using System.currentTimeMillis().* @param key The key that will be included in the record* @param value The record contents* @param headers the headers that will be included in the record*/ public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {//... }

我們注意到第三個(gè)參數(shù),如果傳入的是null,則會(huì)使用System.currentTimeMillis()

KafkaStream在不配置自定義TimeStampExtractor時(shí),會(huì)使用這個(gè)消息中內(nèi)嵌的時(shí)間戳,而這個(gè)時(shí)間戳可能是Producer程序中ProducerRecord生成的時(shí)候的時(shí)刻,也可能是消息寫入到topic的log文件中的時(shí)刻。

相關(guān)配置:message.timestamp.type。

namedesctypedefaultVALID VALUES
message.timestamp.typeDefine whether the timestamp in the message is message create time or log append timestringCreateTime[CreateTime, LogAppendTime]

該配置在broker和topic維度上可分別配置。

我們?cè)龠M(jìn)行實(shí)驗(yàn),這次不配置自定義的TimestampExtractor了。這時(shí)默認(rèn)的TimeStampExtractor會(huì)使用消息中內(nèi)嵌的時(shí)間戳。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Test public void testEventTime() throws InterruptedException {Properties props = configStreamProperties();// 指定使用自定義的時(shí)間提取器// props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyTimestampExtractor.class);StreamsBuilder builder = new StreamsBuilder();KStream<String, String> data = builder.stream(TEST_TOPIC);Instant initTime = Instant.now();data.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(TIME_WINDOW_SECONDS))).count(Materialized.with(Serdes.String(), Serdes.Long())).toStream().filterNot(((key, value) -> this.isOldWindow(key, value, initTime))).foreach(this::dealWithTimeWindowAggrValue);Topology topology = builder.build();KafkaStreams streams = new KafkaStreams(topology, props);streams.start();Thread.currentThread().join(); }

在發(fā)送的時(shí)候,傳入內(nèi)嵌的時(shí)間戳的值,但是我們做個(gè)小把戲,將時(shí)間對(duì)齊到每分鐘的30秒。這時(shí)默認(rèn)的TimeStampExtractor從內(nèi)嵌的時(shí)間戳提取出來(lái)后,會(huì)發(fā)現(xiàn)他們都落在“30秒-35秒”這個(gè)窗口內(nèi)。

上面講的是流任務(wù)面對(duì)收到的消息時(shí),如何獲取時(shí)間信息。

而當(dāng)流任務(wù)如果要將處理過(guò)的內(nèi)容打回Kafka時(shí),是如何添加時(shí)間信息的呢?

Whenever a Kafka Streams application writes records to Kafka, then it will also assign timestamps to these new records. The way the timestamps are assigned depends on the context:

  • When new output records are generated via processing some input record, for example,?context.forward()triggered in the?process()function call, output record timestamps are inherited from input record timestamps directly.
  • When new output records are generated via periodic functions such as?Punctuator#punctuate(), the output record timestamp is defined as the current internal time (obtained through?context.timestamp()) of the stream task.
  • For aggregations, the timestamp of a resulting aggregate update record will be that of the latest arrived input record that triggered the update.

Note, that the describe default behavior can be changed in the Processor API by assigning timestamps to output records explicitly when calling?#forward().

總結(jié)

  • Kafka Stream中有4種時(shí)間窗口:Tumbling time window、Hopping time window、sliding time window、session time window
  • 可以使用supress方法不讓每次新的數(shù)據(jù)落到窗口內(nèi)時(shí),都立即向下游發(fā)送新的統(tǒng)計(jì)值。
  • 如果使用Suppressed.untilWindowCloses,那么窗口必須要指定grace。因?yàn)槟J(rèn)的grace時(shí)間是24小時(shí)。所以24小時(shí)之內(nèi)窗口是一直不關(guān)閉的,而且由于被suppress住了,所以下游會(huì)一直收不到結(jié)果。
  • 可以使用Suppressed.untilTimeLimit來(lái)指定上游聚合計(jì)算的值在多久后發(fā)往下游,它與時(shí)間窗口是否關(guān)閉無(wú)關(guān),所以可以不使用grace。
  • 到達(dá)的數(shù)據(jù)落到的每個(gè)窗口上,都會(huì)立即、分別調(diào)用該窗口的聚合函數(shù),計(jì)算結(jié)果默認(rèn)情況下立即發(fā)送到下游,除非使用了suppress()。
  • Aggregator內(nèi)應(yīng)當(dāng)只負(fù)責(zé)聚合計(jì)算,不應(yīng)把其他的邏輯(比如將計(jì)算結(jié)果保存到db)寫到Aggreagator里面。如果這樣做了,一旦修改了時(shí)間窗口的配置,修改了時(shí)間窗口類型、grace、suppress等,會(huì)導(dǎo)致混亂的結(jié)果。
  • KafkaStream的默認(rèn)TimeStampExtractor,會(huì)提取消息中內(nèi)嵌的時(shí)間戳,供依賴于時(shí)間的操作(如windowBy)使用。這個(gè)時(shí)間戳可能是Producer程序中ProducerRecord生成的時(shí)刻,也可能是消息寫入到topic的log文件中的時(shí)刻,取決于message.timestamp.type配置。
  • 如果要使用事件時(shí)間,發(fā)送消息時(shí)可將事件時(shí)間信息保存到消息內(nèi)容里,然后將消息發(fā)送到kafka。在KafkaStream應(yīng)用中,繼承TimeStampExtractor,在重載的extract方法中定義如何從消息中抽取時(shí)間時(shí)間。并在構(gòu)造KafkaStream的props里配置上該自定義的時(shí)間提取器。

參考文檔

Kafka Stream 官方文檔

總結(jié)

以上是生活随笔為你收集整理的KafkaStream之时间窗口WindowBy的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。