日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka 发布订阅_在Kafka中发布订阅模型

發(fā)布時間:2023/12/3 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka 发布订阅_在Kafka中发布订阅模型 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

kafka 發(fā)布訂閱

這是第四個柱中的一系列關(guān)于同步客戶端集成與異步系統(tǒng)( 1, 2, 3 )。 在這里,我們將嘗試了解Kafka的工作方式,以便正確利用其發(fā)布-訂閱實現(xiàn)。

卡夫卡概念

根據(jù)官方文件 :

Kafka是一種分布式的,分區(qū)的,復(fù)制的提交日志服務(wù)。 它提供消息傳遞系統(tǒng)的功能,但具有獨特的設(shè)計。

Kafka作為集群運行,這些節(jié)點稱為代理。 代理可以是領(lǐng)導(dǎo)者或副本,以提供高可用性和容錯能力。 代理負(fù)責(zé)分區(qū),分區(qū)是存儲消息的分發(fā)單元。 這些消息是有序的,可以通過名為offset的索引進行訪問。 一組分區(qū)構(gòu)成一個主題,是消息的提要。 分區(qū)可以具有不同的使用者,并且它們使用自己的偏移量訪問消息。 生產(chǎn)者將消息發(fā)布到Kafka主題中。 Kafka文檔中的以下圖表可以幫助您理解這一點:

排隊與發(fā)布-訂閱

消費者群體是另一個關(guān)鍵概念,有助于解釋為什么Kafka比RabbitMQ等其他消息傳遞解決方案更靈活,功能更強大。 消費者與消費者群體相關(guān)聯(lián)。 如果每個使用者都屬于同一個使用者組,則主題的消息將在各個使用者之間平均負(fù)載均衡; 這就是所謂的“排隊模型”。 相反,如果每個使用者都屬于不同的使用者組,則所有消息都將在每個客戶端中使用。 這就是所謂的“發(fā)布-訂閱”模型。

您可以混合使用這兩種方法,分別針對不同的需求使用不同的邏輯使用者組,并在每個組中有多個使用者以通過并行提高吞吐量。 同樣, Kafka文檔中的另一個圖表:

了解我們的需求

正如我們在以前的文章(見1, 2, 3 )該項目服務(wù)發(fā)布消息到卡夫卡的話題叫item_deleted 。 此消息將位于該主題的一個分區(qū)中。 為了定義消息將駐留在哪個分區(qū),Kafka提供了三種選擇 :

  • 如果記錄中指定了分區(qū),請使用它
  • 如果未指定分區(qū)但存在密鑰,則根據(jù)密鑰的哈希值選擇一個分區(qū)
  • 如果不存在分區(qū)或密鑰,則以循環(huán)方式選擇一個分區(qū)

我們將使用item_id作為密鑰。 執(zhí)法服務(wù)的不同實例中包含的消費者僅對特定分區(qū)感興趣,因為他們保留某些項目的內(nèi)部狀態(tài)。 讓我們檢查不同的Kafka使用者實現(xiàn),以了解哪種使用最方便。

卡夫卡消費者

卡夫卡共有三個消費者: 高級消費者 , 簡單消費者和新消費者

在這三個消費者中, 簡單消費者在最低級別上運行。 它滿足我們的要求,因為它允許消費者“在流程中僅使用主題中分區(qū)的子集”。 但是,如文檔所述:

SimpleConsumer確實需要使用者組中不需要的大量工作:

  • 您必須跟蹤應(yīng)用程序中的偏移量,才能知道從何處停止消費
  • 您必須確定哪個Broker是主題和分區(qū)的主要Broker。
  • 您必須處理經(jīng)紀(jì)人負(fù)責(zé)人變更

如果您閱讀了建議的用于處理這些問題的代碼,則將不鼓勵您使用此使用者。

新使用者提供正確的抽象級別,并允許我們訂閱特定的分區(qū)。 他們在文檔中建議以下用例:

第一種情況是,如果進程正在維護與該分區(qū)相關(guān)聯(lián)的某種本地狀態(tài)(例如本地磁盤上的鍵值存儲),因此該進程應(yīng)僅獲取其在磁盤上維護的分區(qū)的記錄。

不幸的是,我們的系統(tǒng)使用的是Kafka 0.8,而該使用者僅從0.9開始可用。 我們沒有足夠的資源來遷移到該版本,因此我們需要堅持使用高級消費者 。

該使用者提供了一個不錯的API,但不允許我們訂閱特定的分區(qū)。 這意味著,執(zhí)法服務(wù)的每個實例都將使用每條消息,甚至是無關(guān)的消息。 我們可以通過為每個實例定義不同的消費者組來實現(xiàn)這一目標(biāo)。

利用Akka Event Bus

在上一篇文章中,我們定義了一些等待ItemDeleted消息的有限狀態(tài)機ItemDeleted 。

when(Active) {case Event(ItemDeleted(item), currentItemsToBeDeleted@ItemsToBeDeleted(items)) =>val newItemsToBeDeleted = items.filterNot(_ == item)newItemsToBeDeleted.size match {case 0 => finishWorkWith(CensorResult(Right()))case _ => stay using currentItemsToBeDeleted.copy(items = newItemsToBeDeleted)}}

我們的卡夫卡消費者可以將所有消息轉(zhuǎn)發(fā)給那些演員,并讓他們丟棄/過濾不相關(guān)的物品。 但是,我們不想讓演員浪費很多多余的工作,因此我們將添加一層抽象,讓他們以真正有效的方式丟棄適當(dāng)?shù)南ⅰ?

final case class MsgEnvelope(partitionKey: String, payload: ItemDeleted)class ItemDeletedBus extends EventBus with LookupClassification {override type Event = MsgEnvelopeoverride type Classifier = Stringoverride type Subscriber = ActorRefoverride protected def mapSize(): Int = 128override protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber ! event.payloadoverride protected def classify(event: Event): Classifier = event.partitionKeyoverride protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a.compareTo(b) }

Akka Event Bus按分區(qū)為我們提供訂閱,而我們的Kafka高級消費者中缺少該分區(qū)。 我們將從卡夫卡消費者處發(fā)布每條消息到公交車上:

itemDeletedBus.publish(MsgEnvelope(item.partitionKey, ItemDeleted(item)))

在上一篇文章中,我們展示了如何使用該分區(qū)鍵訂閱消息:

itemDeletedBus.subscribe(self, item.partitionKey)

LookupClassification將過濾不需要的消息,因此我們的參與者不會過載。

摘要

得益于Kafka提供的靈活性,我們能夠設(shè)計我們的系統(tǒng)以了解不同的折衷方案。 在接下來的文章中,我們將看到如何協(xié)調(diào)這些FSM的結(jié)果以向客戶端提供同步響應(yīng)。

第一部分 | 第2部分 | 第三部分

翻譯自: https://www.javacodegeeks.com/2016/05/publish-subscribe-model-kafka.html

kafka 發(fā)布訂閱

總結(jié)

以上是生活随笔為你收集整理的kafka 发布订阅_在Kafka中发布订阅模型的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。