日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

流式计算新贵Kafka Stream设计详解--转

發(fā)布時(shí)間:2025/4/5 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 流式计算新贵Kafka Stream设计详解--转 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

原文地址:https://mp.weixin.qq.com/s?__biz=MzA5NzkxMzg1Nw==&mid=2653162822&idx=1&sn=8c46114360b98b621b166d41d8e01d74&chksm=8b493028bc3eb93e8376d85c7d1f9b2a699888b7f0f52e4556bb8543ebebd5e102e91ea23355#rd

本文介紹了 Kafka Stream 的背景,如 Kafka Stream 是什么,什么是流式計(jì)算,以及為什么要有 Kafka Stream。接著介紹了 Kafka Stream 的整體架構(gòu)、并行模型、狀態(tài)存儲(chǔ)以及主要的兩種數(shù)據(jù)集 KStream 和 KTable。然后分析了 Kafka Stream 如何解決流式系統(tǒng)中的關(guān)鍵問(wèn)題,如時(shí)間定義、窗口操作、Join 操作、聚合操作,以及如何處理亂序和提供容錯(cuò)能力。最后結(jié)合示例講解了如何使用 Kafka Stream。Kafka Stream 背景??Kafka Stream 是什么??

Kafka Stream 是 Apache Kafka 從 0.10 版本引入的一個(gè)新 Feature。它提供了對(duì)存儲(chǔ)于 Kafka 內(nèi)的數(shù)據(jù)進(jìn)行流式處理和分析的功能。

Kafka Stream 的特點(diǎn)如下:

  • Kafka Stream 提供了一個(gè)非常簡(jiǎn)單而輕量的 Library,它可以非常方便地嵌入任意 Java 應(yīng)用中,也可以任意方式打包和部署

  • 除了 Kafka 外,無(wú)任何外部依賴

  • 充分利用 Kafka 分區(qū)機(jī)制實(shí)現(xiàn)水平擴(kuò)展和順序性保證

  • 通過(guò)可容錯(cuò)的 state store 實(shí)現(xiàn)高效的狀態(tài)操作(如 windowed join 和 aggregation)

  • 支持正好一次處理語(yǔ)義

  • 提供記錄級(jí)的處理能力,從而實(shí)現(xiàn)毫秒級(jí)的低延遲

  • 支持基于事件時(shí)間的窗口操作,并且可處理晚到的數(shù)據(jù)(late arrival of records)

  • 同時(shí)提供底層的處理原語(yǔ) Processor(類似于 Storm 的 spout 和 bolt),以及高層抽象的 DSL(類似于 Spark 的 map/group/reduce)

什么是流式計(jì)算??

一般流式計(jì)算會(huì)與批量計(jì)算相比較。在流式計(jì)算模型中,輸入是持續(xù)的,可以認(rèn)為在時(shí)間上是無(wú)界的,也就意味著,永遠(yuǎn)拿不到全量數(shù)據(jù)去做計(jì)算。同時(shí),計(jì)算結(jié)果是持續(xù)輸出的,也即計(jì)算結(jié)果在時(shí)間上也是無(wú)界的。流式計(jì)算一般對(duì)實(shí)時(shí)性要求較高,同時(shí)一般是先定義目標(biāo)計(jì)算,然后數(shù)據(jù)到來(lái)之后將計(jì)算邏輯應(yīng)用于數(shù)據(jù)。同時(shí)為了提高計(jì)算效率,往往盡可能采用增量計(jì)算代替全量計(jì)算。

批量處理模型中,一般先有全量數(shù)據(jù)集,然后定義計(jì)算邏輯,并將計(jì)算應(yīng)用于全量數(shù)據(jù)。特點(diǎn)是全量計(jì)算,并且計(jì)算結(jié)果一次性全量輸出。

?

為什么要有 Kafka Stream??

當(dāng)前已經(jīng)有非常多的流式處理系統(tǒng),最知名且應(yīng)用最多的開源流式處理系統(tǒng)有 Spark Streaming 和 Apache Storm。Apache Storm 發(fā)展多年,應(yīng)用廣泛,提供記錄級(jí)別的處理能力,當(dāng)前也支持 SQL on Stream。而 Spark Streaming 基于 Apache Spark,可以非常方便與圖計(jì)算,SQL 處理等集成,功能強(qiáng)大,對(duì)于熟悉其它 Spark 應(yīng)用開發(fā)的用戶而言使用門檻低。另外,目前主流的 Hadoop 發(fā)行版,如 MapR,Cloudera 和 Hortonworks,都集成了 Apache Storm 和 Apache Spark,使得部署更容易。

既然 Apache Spark 與 Apache Storm 擁用如此多的優(yōu)勢(shì),那為何還需要 Kafka Stream 呢?筆者認(rèn)為主要有如下原因。

第一,Spark 和 Storm 都是流式處理框架,而 Kafka Stream 提供的是一個(gè)基于 Kafka 的流式處理類庫(kù)。框架要求開發(fā)者按照特定的方式去開發(fā)邏輯部分,供框架調(diào)用。開發(fā)者很難了解框架的具體運(yùn)行方式,從而使得調(diào)試成本高,并且使用受限。而 Kafka Stream 作為流式處理類庫(kù),直接提供具體的類給開發(fā)者調(diào)用,整個(gè)應(yīng)用的運(yùn)行方式主要由開發(fā)者控制,方便使用和調(diào)試。

第二,雖然 Cloudera 與 Hortonworks 方便了 Storm 和 Spark 的部署,但是這些框架的部署仍然相對(duì)復(fù)雜。而 Kafka Stream 作為類庫(kù),可以非常方便的嵌入應(yīng)用程序中,它對(duì)應(yīng)用的打包和部署基本沒有任何要求。更為重要的是,Kafka Stream 充分利用了 Kafka 的分區(qū)機(jī)制和 Consumer 的 Rebalance 機(jī)制,使得 Kafka Stream 可以非常方便的水平擴(kuò)展,并且各個(gè)實(shí)例可以使用不同的部署方式。

具體來(lái)說(shuō),每個(gè)運(yùn)行 Kafka Stream 的應(yīng)用程序?qū)嵗及?Kafka Consumer 實(shí)例,多個(gè)同一應(yīng)用的實(shí)例之間并行處理數(shù)據(jù)集。而不同實(shí)例之間的部署方式并不要求一致,比如部分實(shí)例可以運(yùn)行在 Web 容器中,部分實(shí)例可運(yùn)行在 Docker 或 Kubernetes 中。

第三,就流式處理系統(tǒng)而言,基本都支持 Kafka 作為數(shù)據(jù)源。例如 Storm 具有專門的 kafka-spout,而 Spark 也提供專門的 spark-streaming-kafka 模塊。事實(shí)上,Kafka 基本上是主流的流式處理系統(tǒng)的標(biāo)準(zhǔn)數(shù)據(jù)源。換言之,大部分流式系統(tǒng)中都已部署了 Kafka,此時(shí)使用 Kafka Stream 的成本非常低。

第四,使用 Storm 或 Spark Streaming 時(shí),需要為框架本身的進(jìn)程預(yù)留資源,如 Storm 的 supervisor 和 Spark on YARN 的 node manager。即使對(duì)于應(yīng)用實(shí)例而言,框架本身也會(huì)占用部分資源,如 Spark Streaming 需要為 shuffle 和 storage 預(yù)留內(nèi)存。

第五,由于 Kafka 本身提供數(shù)據(jù)持久化,因此 Kafka Stream 提供滾動(dòng)部署和滾動(dòng)升級(jí)以及重新計(jì)算的能力。

第六,由于 Kafka Consumer Rebalance 機(jī)制,Kafka Stream 可以在線動(dòng)態(tài)調(diào)整并行度。

Kafka Stream 架構(gòu)??

?

Kafka Stream 整體架構(gòu)??

Kafka Stream 的整體架構(gòu)圖如下。

?

目前(Kafka 0.11.0.0)Kafka Stream 的數(shù)據(jù)源只能如上圖所示是 Kafka。但是處理結(jié)果并不一定要如上圖所示輸出到 Kafka。實(shí)際上 KStream 和 Ktable 的實(shí)例化都需要指定 Topic。

KStream<String, String> stream = builder.stream("words-stream");KTable<String, String> table = builder.table("words-table", "words-store");

另外,上圖中的 Consumer 和 Producer 并不需要開發(fā)者在應(yīng)用中顯示實(shí)例化,而是由 Kafka Stream 根據(jù)參數(shù)隱式實(shí)例化和管理,從而降低了使用門檻。開發(fā)者只需要專注于開發(fā)核心業(yè)務(wù)邏輯,也即上圖中 Task 內(nèi)的部分。

Processor Topology??

基于 Kafka Stream 的流式應(yīng)用的業(yè)務(wù)邏輯全部通過(guò)一個(gè)被稱為 Processor Topology 的地方執(zhí)行。它與 Storm 的 Topology 和 Spark 的 DAG 類似,都定義了數(shù)據(jù)在各個(gè)處理單元(在 Kafka Stream 中被稱作 Processor)間的流動(dòng)方式,或者說(shuō)定義了數(shù)據(jù)的處理邏輯。

下面是一個(gè) Processor 的示例,它實(shí)現(xiàn)了 Word Count 功能,并且每秒輸出一次結(jié)果。

public class WordCountProcessor implements Processor<String, String> {private ProcessorContext context;private KeyValueStore<String, Integer> kvStore;@SuppressWarnings("unchecked")@Overridepublic void init(ProcessorContext context) {this.context = context;this.context.schedule(1000);this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");}@Overridepublic void process(String key, String value) {Stream.of(value.toLowerCase().split(" ")).forEach((String word) -> {Optional<Integer> counts = Optional.ofNullable(kvStore.get(word));int count = counts.map(wordcount -> wordcount + 1).orElse(1);kvStore.put(word, count);});}@Overridepublic void punctuate(long timestamp) {KeyValueIterator<String, Integer> iterator = this.kvStore.all();iterator.forEachRemaining(entry -> {context.forward(entry.key, entry.value);this.kvStore.delete(entry.key);});context.commit();}@Overridepublic void close() {this.kvStore.close();}}

從上述代碼中可見

  • process 定義了對(duì)每條記錄的處理邏輯,也印證了 Kafka 可具有記錄級(jí)的數(shù)據(jù)處理能力。

  • context.scheduler 定義了 punctuate 被執(zhí)行的周期,從而提供了實(shí)現(xiàn)窗口操作的能力。

  • context.getStateStore 提供的狀態(tài)存儲(chǔ)為有狀態(tài)計(jì)算(如窗口,聚合)提供了可能。

Kafka Stream 并行模型??

Kafka Stream 的并行模型中,最小粒度為 Task,而每個(gè) Task 包含一個(gè)特定子 Topology 的所有 Processor。因此每個(gè) Task 所執(zhí)行的代碼完全一樣,唯一的不同在于所處理的數(shù)據(jù)集互補(bǔ)。這一點(diǎn)跟 Storm 的 Topology 完全不一樣。Storm 的 Topology 的每一個(gè) Task 只包含一個(gè) Spout 或 Bolt 的實(shí)例。因此 Storm 的一個(gè) Topology 內(nèi)的不同 Task 之間需要通過(guò)網(wǎng)絡(luò)通信傳遞數(shù)據(jù),而 Kafka Stream 的 Task 包含了完整的子 Topology,所以 Task 之間不需要傳遞數(shù)據(jù),也就不需要網(wǎng)絡(luò)通信。這一點(diǎn)降低了系統(tǒng)復(fù)雜度,也提高了處理效率。

如果某個(gè) Stream 的輸入 Topic 有多個(gè) (比如 2 個(gè) Topic,1 個(gè) Partition 數(shù)為 4,另一個(gè) Partition 數(shù)為 3),則總的 Task 數(shù)等于 Partition 數(shù)最多的那個(gè) Topic 的 Partition 數(shù)(max(4,3)=4)。這是因?yàn)?Kafka Stream 使用了 Consumer 的 Rebalance 機(jī)制,每個(gè) Partition 對(duì)應(yīng)一個(gè) Task。

下圖展示了在一個(gè)進(jìn)程(Instance)中以 2 個(gè) Topic(Partition 數(shù)均為 4)為數(shù)據(jù)源的 Kafka Stream 應(yīng)用的并行模型。從圖中可以看到,由于 Kafka Stream 應(yīng)用的默認(rèn)線程數(shù)為 1,所以 4 個(gè) Task 全部在一個(gè)線程中運(yùn)行。

為了充分利用多線程的優(yōu)勢(shì),可以設(shè)置 Kafka Stream 的線程數(shù)。下圖展示了線程數(shù)為 2 時(shí)的并行模型。

前文有提到,Kafka Stream 可被嵌入任意 Java 應(yīng)用(理論上基于 JVM 的應(yīng)用都可以)中,下圖展示了在同一臺(tái)機(jī)器的不同進(jìn)程中同時(shí)啟動(dòng)同一 Kafka Stream 應(yīng)用時(shí)的并行模型。注意,這里要保證兩個(gè)進(jìn)程的 StreamsConfig.APPLICATION_ID_CONFIG 完全一樣。因?yàn)?Kafka Stream 將 APPLICATION_ID_CONFI 作為隱式啟動(dòng)的 Consumer 的 Group ID。只有保證 APPLICATION_ID_CONFI 相同,才能保證這兩個(gè)進(jìn)程的 Consumer 屬于同一個(gè) Group,從而可以通過(guò) Consumer Rebalance 機(jī)制拿到互補(bǔ)的數(shù)據(jù)集。

既然實(shí)現(xiàn)了多進(jìn)程部署,可以以同樣的方式實(shí)現(xiàn)多機(jī)器部署。該部署方式也要求所有進(jìn)程的 APPLICATION_ID_CONFIG 完全一樣。從圖上也可以看到,每個(gè)實(shí)例中的線程數(shù)并不要求一樣。但是無(wú)論如何部署,Task 總數(shù)總會(huì)保證一致。

注意:Kafka Stream 的并行模型,非常依賴于《Kafka 設(shè)計(jì)解析(一)- Kafka 背景及架構(gòu)介紹》一文中介紹的 Kafka 分區(qū)機(jī)制和《Kafka 設(shè)計(jì)解析(四)- Kafka Consumer 設(shè)計(jì)解析》中介紹的 Consumer 的 Rebalance 機(jī)制。強(qiáng)烈建議不太熟悉這兩種機(jī)制的朋友,先行閱讀這兩篇文章。

這里對(duì)比一下 Kafka Stream 的 Processor Topology 與 Storm 的 Topology。

  • Storm 的 Topology 由 Spout 和 Bolt 組成,Spout 提供數(shù)據(jù)源,而 Bolt 提供計(jì)算和數(shù)據(jù)導(dǎo)出。Kafka Stream 的 Processor Topology 完全由 Processor 組成,因?yàn)樗臄?shù)據(jù)固定由 Kafka 的 Topic 提供。

  • Storm 的不同 Bolt 運(yùn)行在不同的 Executor 中,很可能位于不同的機(jī)器,需要通過(guò)網(wǎng)絡(luò)通信傳輸數(shù)據(jù)。而 Kafka Stream 的 Processor Topology 的不同 Processor 完全運(yùn)行于同一個(gè) Task 中,也就完全處于同一個(gè)線程,無(wú)需網(wǎng)絡(luò)通信。

  • Storm 的 Topology 可以同時(shí)包含 Shuffle 部分和非 Shuffle 部分,并且往往一個(gè) Topology 就是一個(gè)完整的應(yīng)用。而 Kafka Stream 的一個(gè)物理 Topology 只包含非 Shuffle 部分,而 Shuffle 部分需要通過(guò) through 操作顯示完成,該操作將一個(gè)大的 Topology 分成了 2 個(gè)子 Topology。

  • Storm 的 Topology 內(nèi),不同 Bolt/Spout 的并行度可以不一樣,而 Kafka Stream 的子 Topology 內(nèi),所有 Processor 的并行度完全一樣。

  • Storm 的一個(gè) Task 只包含一個(gè) Spout 或者 Bolt 的實(shí)例,而 Kafka Stream 的一個(gè) Task 包含了一個(gè)子 Topology 的所有 Processor。

KTable vs. KStream??

KTable 和 KStream 是 Kafka Stream 中非常重要的兩個(gè)概念,它們是 Kafka 實(shí)現(xiàn)各種語(yǔ)義的基礎(chǔ)。因此這里有必要分析下二者的區(qū)別。

KStream 是一個(gè)數(shù)據(jù)流,可以認(rèn)為所有記錄都通過(guò) Insert only 的方式插入進(jìn)這個(gè)數(shù)據(jù)流里。而 KTable 代表一個(gè)完整的數(shù)據(jù)集,可以理解為數(shù)據(jù)庫(kù)中的表。

由于每條記錄都是 Key-Value 對(duì),這里可以將 Key 理解為數(shù)據(jù)庫(kù)中的 Primary Key,而 Value 可以理解為一行記錄。可以認(rèn)為 KTable 中的數(shù)據(jù)都是通過(guò) Update only 的方式進(jìn)入的。也就意味著,如果 KTable 對(duì)應(yīng)的 Topic 中新進(jìn)入的數(shù)據(jù)的 Key 已經(jīng)存在,那么從 KTable 只會(huì)取出同一 Key 對(duì)應(yīng)的最后一條數(shù)據(jù),相當(dāng)于新的數(shù)據(jù)更新了舊的數(shù)據(jù)。

以下圖為例,假設(shè)有一個(gè) KStream 和 KTable,基于同一個(gè) Topic 創(chuàng)建,并且該 Topic 中包含如下圖所示 5 條數(shù)據(jù)。此時(shí)遍歷 KStream 將得到與 Topic 內(nèi)數(shù)據(jù)完全一樣的所有 5 條數(shù)據(jù),且順序不變。而此時(shí)遍歷 KTable 時(shí),因?yàn)檫@ 5 條記錄中有 3 個(gè)不同的 Key,所以將得到 3 條記錄,每個(gè) Key 對(duì)應(yīng)最新的值,并且這三條數(shù)據(jù)之間的順序與原來(lái)在 Topic 中的順序保持一致。這一點(diǎn)與 Kafka 的日志 compact 相同。

此時(shí)如果對(duì)該 KStream 和 KTable 分別基于 key 做 Group,對(duì) Value 進(jìn)行 Sum,得到的結(jié)果將會(huì)不同。對(duì) KStream 的計(jì)算結(jié)果是<jack,4><Jack,4>,<Lily,7>,<Mike,4><lily,7><mike,4>。而對(duì) Ktable 的計(jì)算結(jié)果是<mike,4><Mike,4>,<Jack,3>,<Lily,5><jack,3><lily,5>。

State store??

流式處理中,部分操作是無(wú)狀態(tài)的,例如過(guò)濾操作(Kafka Stream DSL 中用 filer 方法實(shí)現(xiàn))。而部分操作是有狀態(tài)的,需要記錄中間狀態(tài),如 Window 操作和聚合計(jì)算。State store 被用來(lái)存儲(chǔ)中間狀態(tài)。它可以是一個(gè)持久化的 Key-Value 存儲(chǔ),也可以是內(nèi)存中的 HashMap,或者是數(shù)據(jù)庫(kù)。Kafka 提供了基于 Topic 的狀態(tài)存儲(chǔ)。

Topic 中存儲(chǔ)的數(shù)據(jù)記錄本身是 Key-Value 形式的,同時(shí) Kafka 的 log compaction 機(jī)制可對(duì)歷史數(shù)據(jù)做 compact 操作,保留每個(gè) Key 對(duì)應(yīng)的最后一個(gè) Value,從而在保證 Key 不丟失的前提下,減少總數(shù)據(jù)量,從而提高查詢效率。

構(gòu)造 KTable 時(shí),需要指定其 state store name。默認(rèn)情況下,該名字也即用于存儲(chǔ)該 KTable 的狀態(tài)的 Topic 的名字,遍歷 KTable 的過(guò)程,實(shí)際就是遍歷它對(duì)應(yīng)的 state store,或者說(shuō)遍歷 Topic 的所有 key,并取每個(gè) Key 最新值的過(guò)程。為了使得該過(guò)程更加高效,默認(rèn)情況下會(huì)對(duì)該 Topic 進(jìn)行 compact 操作。

另外,除了 KTable,所有狀態(tài)計(jì)算,都需要指定 state store name,從而記錄中間狀態(tài)。

Kafka Stream 如何解決流式系統(tǒng)中關(guān)鍵問(wèn)題??時(shí)間??

在流式數(shù)據(jù)處理中,時(shí)間是數(shù)據(jù)的一個(gè)非常重要的屬性。從 Kafka 0.10 開始,每條記錄除了 Key 和 Value 外,還增加了 timestamp 屬性。目前 Kafka Stream 支持三種時(shí)間

  • 事件發(fā)生時(shí)間。事件發(fā)生的時(shí)間,包含在數(shù)據(jù)記錄中。發(fā)生時(shí)間由 Producer 在構(gòu)造 ProducerRecord 時(shí)指定。并且需要 Broker 或者 Topic 將 message.timestamp.type 設(shè)置為 CreateTime(默認(rèn)值)才能生效。

  • 消息接收時(shí)間,也即消息存入 Broker 的時(shí)間。當(dāng) Broker 或 Topic 將 message.timestamp.type 設(shè)置為 LogAppendTime 時(shí)生效。此時(shí) Broker 會(huì)在接收到消息后,存入磁盤前,將其 timestamp 屬性值設(shè)置為當(dāng)前機(jī)器時(shí)間。一般消息接收時(shí)間比較接近于事件發(fā)生時(shí)間,部分場(chǎng)景下可代替事件發(fā)生時(shí)間。

  • 消息處理時(shí)間,也即 Kafka Stream 處理消息時(shí)的時(shí)間。

注:Kafka Stream 允許通過(guò)實(shí)現(xiàn) org.apache.kafka.streams.processor.TimestampExtractor 接口自定義記錄時(shí)間。

窗口??

前文提到,流式數(shù)據(jù)是在時(shí)間上無(wú)界的數(shù)據(jù)。而聚合操作只能作用在特定的數(shù)據(jù)集,也即有界的數(shù)據(jù)集上。因此需要通過(guò)某種方式從無(wú)界的數(shù)據(jù)集上按特定的語(yǔ)義選取出有界的數(shù)據(jù)。窗口是一種非常常用的設(shè)定計(jì)算邊界的方式。不同的流式處理系統(tǒng)支持的窗口類似,但不盡相同。

Kafka Stream 支持的窗口如下。

1、Hopping Time Window 該窗口定義如下圖所示。它有兩個(gè)屬性,一個(gè)是 Window size,一個(gè)是 Advance interval。Window size 指定了窗口的大小,也即每次計(jì)算的數(shù)據(jù)集的大小。而 Advance interval 定義輸出的時(shí)間間隔。一個(gè)典型的應(yīng)用場(chǎng)景是,每隔 5 秒鐘輸出一次過(guò)去 1 個(gè)小時(shí)內(nèi)網(wǎng)站的 PV 或者 UV。

2、Tumbling Time Window 該窗口定義如下圖所示。可以認(rèn)為它是 Hopping Time Window 的一種特例,也即 Window size 和 Advance interval 相等。它的特點(diǎn)是各個(gè) Window 之間完全不相交。

3、Sliding Window 該窗口只用于 2 個(gè) KStream 進(jìn)行 Join 計(jì)算時(shí)。該窗口的大小定義了 Join 兩側(cè) KStream 的數(shù)據(jù)記錄被認(rèn)為在同一個(gè)窗口的最大時(shí)間差。假設(shè)該窗口的大小為 5 秒,則參與 Join 的 2 個(gè) KStream 中,記錄時(shí)間差小于 5 的記錄被認(rèn)為在同一個(gè)窗口中,可以進(jìn)行 Join 計(jì)算。

4、Session Window 該窗口用于對(duì) Key 做 Group 后的聚合操作中。它需要對(duì) Key 做分組,然后對(duì)組內(nèi)的數(shù)據(jù)根據(jù)業(yè)務(wù)需求定義一個(gè)窗口的起始點(diǎn)和結(jié)束點(diǎn)。一個(gè)典型的案例是,希望通過(guò) Session Window 計(jì)算某個(gè)用戶訪問(wèn)網(wǎng)站的時(shí)間。對(duì)于一個(gè)特定的用戶(用 Key 表示)而言,當(dāng)發(fā)生登錄操作時(shí),該用戶(Key)的窗口即開始,當(dāng)發(fā)生退出操作或者超時(shí)時(shí),該用戶(Key)的窗口即結(jié)束。窗口結(jié)束時(shí),可計(jì)算該用戶的訪問(wèn)時(shí)間或者點(diǎn)擊次數(shù)等。

Join??

Kafka Stream 由于包含 KStream 和 Ktable 兩種數(shù)據(jù)集,因此提供如下 Join 計(jì)算

  • KTable Join KTable 結(jié)果仍為 KTable。任意一邊有更新,結(jié)果 KTable 都會(huì)更新。

  • KStream Join KStream 結(jié)果為 KStream。必須帶窗口操作,否則會(huì)造成 Join 操作一直不結(jié)束。

  • KStream Join KTable / GlobakKTable 結(jié)果為 KStream。只有當(dāng) KStream 中有新數(shù)據(jù)時(shí),才會(huì)觸發(fā) Join 計(jì)算并輸出結(jié)果。KStream 無(wú)新數(shù)據(jù)時(shí),KTable 的更新并不會(huì)觸發(fā) Join 計(jì)算,也不會(huì)輸出數(shù)據(jù)。并且該更新只對(duì)下次 Join 生效。一個(gè)典型的使用場(chǎng)景是,KStream 中的訂單信息與 KTable 中的用戶信息做關(guān)聯(lián)計(jì)算。

對(duì)于 Join 操作,如果要得到正確的計(jì)算結(jié)果,需要保證參與 Join 的 KTable 或 KStream 中 Key 相同的數(shù)據(jù)被分配到同一個(gè) Task。具體方法是

  • 參與 Join 的 KTable 或 KStream 的 Key 類型相同(實(shí)際上,業(yè)務(wù)含意也應(yīng)該相同)

  • 參與 Join 的 KTable 或 KStream 對(duì)應(yīng)的 Topic 的 Partition 數(shù)相同Partitioner 策略的最終結(jié)果等效(實(shí)現(xiàn)不需要完全一樣,只要效果一樣即可),也即 Key 相同的情況下,被分配到 ID 相同的 Partition 內(nèi)

  • 如果上述條件不滿足,可通過(guò)調(diào)用如下方法使得它滿足上述條件。

    KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic)

聚合與亂序處理??

聚合操作可應(yīng)用于 KStream 和 KTable。當(dāng)聚合發(fā)生在 KStream 上時(shí)必須指定窗口,從而限定計(jì)算的目標(biāo)數(shù)據(jù)集。

需要說(shuō)明的是,聚合操作的結(jié)果肯定是 KTable。因?yàn)?KTable 是可更新的,可以在晚到的數(shù)據(jù)到來(lái)時(shí)(也即發(fā)生數(shù)據(jù)亂序時(shí))更新結(jié)果 KTable。

這里舉例說(shuō)明。假設(shè)對(duì) KStream 以 5 秒為窗口大小,進(jìn)行 Tumbling Time Window 上的 Count 操作。并且 KStream 先后出現(xiàn)時(shí)間為 1 秒, 3 秒, 5 秒的數(shù)據(jù),此時(shí) 5 秒的窗口已達(dá)上限,Kafka Stream 關(guān)閉該窗口,觸發(fā) Count 操作并將結(jié)果 3 輸出到 KTable 中(假設(shè)該結(jié)果表示為<1-5,3>)。若 1 秒后,又收到了時(shí)間為 2 秒的記錄,由于 1-5 秒的窗口已關(guān)閉,若直接拋棄該數(shù)據(jù),則可認(rèn)為之前的結(jié)果<1-5,3>不準(zhǔn)確。

而如果直接將完整的結(jié)果<1-5,4>輸出到 KStream 中,則 KStream 中將會(huì)包含該窗口的 2 條記錄,<1-5,3>, <1-5,4>,也會(huì)存在骯數(shù)據(jù)。因此 Kafka Stream 選擇將聚合結(jié)果存于 KTable 中,此時(shí)新的結(jié)果<1-5,4>會(huì)替代舊的結(jié)果<1-5,3>。用戶可得到完整的正確的結(jié)果。

這種方式保證了數(shù)據(jù)準(zhǔn)確性,同時(shí)也提高了容錯(cuò)性。

但需要說(shuō)明的是,Kafka Stream 并不會(huì)對(duì)所有晚到的數(shù)據(jù)都重新計(jì)算并更新結(jié)果集,而是讓用戶設(shè)置一個(gè) retention period,將每個(gè)窗口的結(jié)果集在內(nèi)存中保留一定時(shí)間,該窗口內(nèi)的數(shù)據(jù)晚到時(shí),直接合并計(jì)算,并更新結(jié)果 KTable。超過(guò) retention period 后,該窗口結(jié)果將從內(nèi)存中刪除,并且晚到的數(shù)據(jù)即使落入窗口,也會(huì)被直接丟棄。

容錯(cuò)??

Kafka Stream 從如下幾個(gè)方面進(jìn)行容錯(cuò)

  • 高可用的 Partition 保證無(wú)數(shù)據(jù)丟失。每個(gè) Task 計(jì)算一個(gè) Partition,而 Kafka 數(shù)據(jù)復(fù)制機(jī)制保證了 Partition 內(nèi)數(shù)據(jù)的高可用性,故無(wú)數(shù)據(jù)丟失風(fēng)險(xiǎn)。同時(shí)由于數(shù)據(jù)是持久化的,即使任務(wù)失敗,依然可以重新計(jì)算。

  • 狀態(tài)存儲(chǔ)實(shí)現(xiàn)快速故障恢復(fù)和從故障點(diǎn)繼續(xù)處理。對(duì)于 Join 和聚合及窗口等有狀態(tài)計(jì)算,狀態(tài)存儲(chǔ)可保存中間狀態(tài)。即使發(fā)生 Failover 或 Consumer Rebalance,仍然可以通過(guò)狀態(tài)存儲(chǔ)恢復(fù)中間狀態(tài),從而可以繼續(xù)從 Failover 或 Consumer Rebalance 前的點(diǎn)繼續(xù)計(jì)算。

  • KTable 與 retention period 提供了對(duì)亂序數(shù)據(jù)的處理能力。

Kafka Stream 應(yīng)用示例??

?

下面結(jié)合一個(gè)案例來(lái)講解如何開發(fā) Kafka Stream 應(yīng)用。本例完整代碼可從作者 Github 獲取。

訂單 KStream(名為 orderStream),底層 Topic 的 Partition 數(shù)為 3,Key 為用戶名,Value 包含用戶名,商品名,訂單時(shí)間,數(shù)量。用戶 KTable(名為 userTable),底層 Topic 的 Partition 數(shù)為 3,Key 為用戶名,Value 包含性別,地址和年齡。商品 KTable(名為 itemTable),底層 Topic 的 Partition 數(shù)為 6,Key 為商品名,價(jià)格,種類和產(chǎn)地。現(xiàn)在希望計(jì)算每小時(shí)購(gòu)買產(chǎn)地與自己所在地相同的用戶總數(shù)。

首先由于希望使用訂單時(shí)間,而它包含在 orderStream 的 Value 中,需要通過(guò)提供一個(gè)實(shí)現(xiàn) TimestampExtractor 接口的類從 orderStream 對(duì)應(yīng)的 Topic 中抽取出訂單時(shí)間。

public class OrderTimestampExtractor implements TimestampExtractor {@Overridepublic long extract(ConsumerRecord<Object, Object> record) {if(record instanceof Order) {return ((Order)record).getTS();} else {return 0;}} }

接著通過(guò)將 orderStream 與 userTable 進(jìn)行 Join,來(lái)獲取訂單用戶所在地。由于二者對(duì)應(yīng)的 Topic 的 Partition 數(shù)相同,且 Key 都為用戶名,再假設(shè) Producer 往這兩個(gè) Topic 寫數(shù)據(jù)時(shí)所用的 Partitioner 實(shí)現(xiàn)相同,則此時(shí)上文所述 Join 條件滿足,可直接進(jìn)行 Join。

orderUserStream = orderStream.leftJoin(userTable, // 該 lamda 表達(dá)式定義了如何從 orderStream 與 userTable 生成結(jié)果集的 Value(Order order, User user) -> OrderUser.fromOrderUser(order, user), // 結(jié)果集 Key 序列化方式Serdes.String(),// 結(jié)果集 Value 序列化方式SerdesFactory.serdFrom(Order.class)).filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)

從上述代碼中,可以看到,Join 時(shí)需要指定如何從參與 Join 雙方的記錄生成結(jié)果記錄的 Value。Key 不需要指定,因?yàn)榻Y(jié)果記錄的 Key 與 Join Key 相同,故無(wú)須指定。Join 結(jié)果存于名為 orderUserStream 的 KStream 中。

接下來(lái)需要將 orderUserStream 與 itemTable 進(jìn)行 Join,從而獲取商品產(chǎn)地。此時(shí) orderUserStream 的 Key 仍為用戶名,而 itemTable 對(duì)應(yīng)的 Topic 的 Key 為產(chǎn)品名,并且二者的 Partition 數(shù)不一樣,因此無(wú)法直接 Join。此時(shí)需要通過(guò) through 方法,對(duì)其中一方或雙方進(jìn)行重新分區(qū),使得二者滿足 Join 條件。這一過(guò)程相當(dāng)于 Spark 的 Shuffle 過(guò)程和 Storm 的 FieldGrouping。

orderUserStrea.through(// Key 的序列化方式Serdes.String(),// Value 的序列化方式 SerdesFactory.serdFrom(OrderUser.class), // 重新按照商品名進(jìn)行分區(qū),具體取商品名的哈希值,然后對(duì)分區(qū)數(shù)取模(String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions, "orderuser-repartition-by-item").leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serdes.String(), SerdesFactory.serdFrom(OrderUser.class))

從上述代碼可見,through 時(shí)需要指定 Key 的序列化器,Value 的序列化器,以及分區(qū)方式和結(jié)果集所在的 Topic。這里要注意,該 Topic(orderuser-repartition-by-item)的 Partition 數(shù)必須與 itemTable 對(duì)應(yīng) Topic 的 Partition 數(shù)相同,并且 through 使用的分區(qū)方法必須與 iteamTable 對(duì)應(yīng) Topic 的分區(qū)方式一樣。經(jīng)過(guò)這種 through 操作,orderUserStream 與 itemTable 滿足了 Join 條件,可直接進(jìn)行 Join。

總結(jié)??

?

  • Kafka Stream 的并行模型完全基于 Kafka 的分區(qū)機(jī)制和 Rebalance 機(jī)制,實(shí)現(xiàn)了在線動(dòng)態(tài)調(diào)整并行度

  • 同一 Task 包含了一個(gè)子 Topology 的所有 Processor,使得所有處理邏輯都在同一線程內(nèi)完成,避免了不必的網(wǎng)絡(luò)通信開銷,從而提高了效率。

  • through 方法提供了類似 Spark 的 Shuffle 機(jī)制,為使用不同分區(qū)策略的數(shù)據(jù)提供了 Join 的可能

  • log compact 提高了基于 Kafka 的 state store 的加載效率

  • state store 為狀態(tài)計(jì)算提供了可能

  • 基于 offset 的計(jì)算進(jìn)度管理以及基于 state store 的中間狀態(tài)管理為發(fā)生 Consumer rebalance 或 Failover 時(shí)從斷點(diǎn)處繼續(xù)處理提供了可能,并為系統(tǒng)容錯(cuò)性提供了保障

  • KTable 的引入,使得聚合計(jì)算擁用了處理亂序問(wèn)題的能力

轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/7449471.html

總結(jié)

以上是生活随笔為你收集整理的流式计算新贵Kafka Stream设计详解--转的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 999国产| 裸体一区二区 | 8050午夜一级毛片久久亚洲欧 | 加勒比hezyo黑人专区 | 日韩精品二区在线观看 | 久久亚洲精品视频 | av大全免费| 一区二区在线观看免费视频 | 中文字幕在线免费播放 | 一级 黄 色 片69 | 日本在线加勒比 | 在线看h网站 | 91av爱爱| 中文字幕在线观看欧美 | 亚洲色图在线观看 | 放几个免费的毛片出来看 | 黄网址在线 | 午夜神器在线观看 | 日韩性xx| 五月婷婷丁香久久 | 伊人久久影院 | 麻豆网页| 亚洲国产日韩在线 | 精品麻豆 | 一区二区三区在线观看 | 熟女一区二区三区四区 | 99热.com| 国产精品久久国产精品 | 天堂av在线免费 | 黄色av中文字幕 | 69国产精品 | 深夜视频在线播放 | 国产后入又长又硬 | www.在线观看视频 | 新天堂在线 | 中国国产黄色片 | 黄色免费91 | 久久福利网站 | 俺去操| 小蝌蚪av| 精品久久久无码中文字幕 | 裸体视频软件 | 911福利视频| www日韩| 欧美色亚洲色 | 一对一色视频聊天a | 成人一级黄色 | 杨幂国产精品一区二区 | 91久久爽久久爽爽久久片 | 亚洲日本久久 | 天天干天天操天天摸 | 成人国产精品免费观看 | 日本成人免费视频 | 男女在线观看视频 | 欧美精品亚洲一区 | 亚洲欧美日韩一区 | 久久99久久99精品蜜柚传媒 | 亚洲av成人精品一区二区三区在线播放 | av官网在线观看 | 亚洲自拍偷拍视频 | 麻豆黄色片 | 午夜天堂视频 | 黄a在线观看 | 色网站在线播放 | 九九视频免费 | h小视频在线观看 | 国产福利免费观看 | 免费精品 | 丰满少妇在线观看网站 | 日韩极品少妇 | 日本中文字幕在线 | 欧美大片18| 四色成人av永久网址 | 国产日韩第一页 | 特级西西人体wwwww | 黄色网址哪里有 | 成人在线a | 国产精品九九九九九 | 在线中文字幕亚洲 | 亚洲一卡一卡 | 国产女主播喷水高潮网红在线 | 风间由美在线观看 | 久久情趣视频 | 婷婷综合精品 | www.五月天com | 四虎免费观看 | 亚洲爱爱网站 | 天堂一区 | 91精品国产综合久久精品图片 | 国产夫绿帽单男3p精品视频 | 国产成人一区二区三区小说 | 国产色无码精品视频国产 | 骚虎tv| 91网页版 | 中文字幕巨乳 | 久久亚洲日本 | 99久久精品日本一区二区免费 | 青青青在线视频 | 欧美福利视频在线观看 |