日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka的消费者概念

發(fā)布時(shí)間:2024/5/14 编程问答 53 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka的消费者概念 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

應(yīng)用程序使用 KafkaConsumer向 Kafka 訂閱主題,并從訂閱的主題上接收消息 。 從 Kafka 讀取數(shù)據(jù)不同于從其他悄息系統(tǒng)讀取數(shù)據(jù),它涉及一些獨(dú)特的概念和想法。如果不先理解 這些概念,就難以理解如何使用消費(fèi)者 API。所以我們接下來先解釋這些重要的概念,然 后再舉幾個(gè)例子,橫示如何使用消費(fèi)者 API 實(shí)現(xiàn)不同的應(yīng)用程序。

消費(fèi)者和消費(fèi)者群組

假設(shè)我們有一個(gè)應(yīng)用程序需要從-個(gè) Kafka主題讀取消息井驗(yàn)證這些消息,然后再把它們 保存起來。應(yīng)用程序需要?jiǎng)?chuàng)建一個(gè)消費(fèi)者對象,訂閱主題并開始接收消息,然后驗(yàn)證消息 井保存結(jié)果。過了 一陣子,生產(chǎn)者往主題寫入消息的速度超過了應(yīng)用程序驗(yàn)證數(shù)據(jù)的速 度,這個(gè)時(shí)候該怎么辦?如果只使用單個(gè)消費(fèi)者處理消息,應(yīng)用程序會(huì)遠(yuǎn)跟不上消息生成 的速度。顯然,此時(shí)很有必要對消費(fèi)者進(jìn)行橫向伸縮。就像多個(gè)生產(chǎn)者可以向相同的 主題 寫入消息一樣,我們也可以使用多個(gè)消費(fèi)者從同一個(gè)主題讀取消息,對消息進(jìn)行分流。

Kafka 消費(fèi)者從屬于消費(fèi)者群組。一個(gè)群組里的消費(fèi)者訂閱的是同一個(gè)主題,每個(gè)消費(fèi)者 接收主題一部分分區(qū)的消息。

假設(shè)主題 T1 有 4 個(gè)分區(qū),我們創(chuàng)建了消費(fèi)者 C1 ,它是群組 G1 里唯 一 的消費(fèi)者,我們用 它訂閱主題 T1。消費(fèi)者 Cl1將收到主題 T1全部 4個(gè)分區(qū)的消息,如圖 4-1 所示。

如果在群組 G1 里新增一個(gè)消費(fèi)者 C2,那么每個(gè)消費(fèi)者將分別從兩個(gè)分區(qū)接收消息。我 假設(shè)消費(fèi)者 C1接收分區(qū) 0 和分區(qū) 2 的消息,消費(fèi)者 C2 接收分區(qū) 1 和分區(qū) 3 的消息,如圖 4-2 所示。

如果群組 G1 有 4 個(gè)消費(fèi)者,那么每個(gè)消費(fèi)者可以分配到 一個(gè)分區(qū),如圖 4-3 所示。

如果我們往群組里添加更多的消費(fèi)者,超過主題的分區(qū)數(shù)量,那么多出的消費(fèi)者就會(huì)被閑置,不會(huì)接收到任何消息。

往群組里增加消費(fèi)者是橫向伸縮消費(fèi)能力的主要方式。 Kafka 消費(fèi)者經(jīng)常會(huì)做一些高延遲的操作,比如把數(shù)據(jù)寫到數(shù)據(jù)庫或 HDFS,或者使用數(shù)據(jù)進(jìn)行比較耗時(shí)的計(jì)算。在這些情況下,單個(gè)消費(fèi)者無法跟上數(shù)據(jù)生成的速度,所以可以增加更多的消費(fèi)者,讓它們分擔(dān)負(fù)載,每個(gè)消費(fèi)者只處理部分分區(qū)的消息,這就是橫向伸縮的主要手段。我們有必要為主題創(chuàng)建大量的分區(qū),在負(fù)載增長時(shí)可以加入更多的消費(fèi)者。不過要性意,不要讓消費(fèi)者的數(shù)量超過主題分區(qū)的數(shù)量,多余的消費(fèi)者只會(huì)被閑置。

除了通過增加消費(fèi)者來橫向伸縮單個(gè)應(yīng)用程序外,還經(jīng)常出現(xiàn)多個(gè)應(yīng)用程序從同一個(gè)主題讀取數(shù)據(jù)的情況。實(shí)際上, Kafka 設(shè)計(jì)的主要目標(biāo)之一 ,就是要讓 Kafka 主題里的數(shù)據(jù)能夠滿足企業(yè)各種應(yīng)用場景的需求。在這些場景里,每個(gè)應(yīng)用程序可以獲取到所有的消息, 而不只是其中的 一部分。只要保證每個(gè)應(yīng)用程序有自己的消費(fèi)者群組,就可以讓它們獲取到主題所有的消息。不同于傳統(tǒng)的消息系統(tǒng),橫向伸縮 Kafka消費(fèi)者和消費(fèi)者群組并不會(huì)對性能造成負(fù)面影響。

在上面的例子里,如果新增一個(gè)只包含一個(gè)消費(fèi)者的群組 G2,那么這個(gè)消費(fèi)者將從主題 T1 上接收所有的消息,與群組 G1 之間互不影響。群組 G2 可以增加更多的消費(fèi)者,每個(gè)消費(fèi)者可以消費(fèi)若干個(gè)分區(qū),就像群組 G1 那樣,如圖 4-5 所示。總的來說,群組 G2 還是會(huì)接收到所有消息,不管有沒有其他群組存在。

簡而言之,為每一個(gè)需要獲取一個(gè)或多個(gè)主題全部消息的應(yīng)用程序創(chuàng)建一個(gè)消費(fèi)者群組, 然后往群組里添加消費(fèi)者來伸縮讀取能力和處理能力,群組里的每個(gè)消費(fèi)者只處理一部分消息。

消費(fèi)者群組和分區(qū)再均衡

我們已經(jīng)從上一個(gè)小節(jié)了解到,群組里的消費(fèi)者共同讀取主題的分區(qū)。一個(gè)新的消費(fèi)者加 入群組時(shí),它讀取的是原本由其他消費(fèi)者讀取的消息。當(dāng)一個(gè)消費(fèi)者被關(guān)閉或發(fā)生崩潰時(shí),它就離開群組,原本由它讀取的分區(qū)將由群組里的其他消費(fèi)者來讀取。在主題發(fā)生變化時(shí) , 比如管理員添加了新的分區(qū),會(huì)發(fā)生分區(qū)重分配。

分區(qū)的所有權(quán)從一個(gè)消費(fèi)者轉(zhuǎn)移到另一個(gè)消費(fèi)者,這樣的行為被稱為再均衡。再均衡非常重要, 它為消費(fèi)者群組帶來了高可用性和伸縮性(我們可以放心地添加或移除消費(fèi)者), 不過在正常情況下,我們并不希望發(fā)生這樣的行為。在再均衡期間,消費(fèi)者無法讀取消息,造成整個(gè)群組一小段時(shí)間的不可用。另外,當(dāng)分區(qū)被重新分配給另 一個(gè)消費(fèi)者時(shí),消費(fèi)者當(dāng)前的讀取狀態(tài)會(huì)丟失,它有可能還需要去刷新緩存 ,在它重新恢復(fù)狀態(tài)之前會(huì)拖慢應(yīng)用程序。我們將在本章討論如何進(jìn)行安全的再均衡,以及如何避免不必要的再均衡。

消費(fèi)者通過向被指派為 群組協(xié)調(diào)器的 broker (不同的群組可以有不同的協(xié)調(diào)器)發(fā)送 心跳 來維持它們和群組的從屬關(guān)系以及它們對分區(qū)的所有權(quán)關(guān)系。只要消費(fèi)者以正常的時(shí)間間隔發(fā)送心跳,就被認(rèn)為是活躍的,說明它還在讀取分區(qū)里的消息。消費(fèi)者會(huì)在輪詢消息 (為了獲取消息)或提交偏移量時(shí)發(fā)送心跳。如果消費(fèi)者停止發(fā)送心跳的時(shí)間足夠長,會(huì)話就會(huì)過期,群組協(xié)調(diào)器認(rèn)為它已經(jīng)死亡,就會(huì)觸發(fā)一次再均衡。

如果一個(gè)消費(fèi)者發(fā)生崩潰,井停止讀取消息,群組協(xié)調(diào)器(broker)會(huì)等待幾秒鐘,確認(rèn)它死亡了才會(huì)觸發(fā)再均衡。在這幾秒鐘時(shí)間里,死掉的消費(fèi)者不會(huì)讀取分區(qū)里的消息。在清理消費(fèi)者時(shí),消費(fèi)者會(huì)通知協(xié)調(diào)器它將要離開群組,協(xié)調(diào)器會(huì)立即觸發(fā)一次再均衡,盡量降低處理停頓。在本章的后續(xù)部分,我們將討論一些用于控制發(fā)送心跳頻率和會(huì)話過期時(shí)間的配置參數(shù),以及如何根據(jù)實(shí)際需要來配置這些參數(shù) 。

分配分區(qū)是怎樣的一個(gè)過程

當(dāng)消費(fèi)者要加入群組時(shí),它會(huì)向群組協(xié)調(diào)器發(fā)送 一 個(gè) JoinGroup 請求。第 一 個(gè)加入群組的消費(fèi)者將成為“群主”。群主從協(xié)調(diào)器那里獲得群組的成員列 表(列表中包含了所有最近發(fā)送過心跳的消費(fèi)者,它們被認(rèn)為是活躍的), 并負(fù)責(zé)給每一個(gè)消費(fèi)者分配分區(qū)。它使用 一個(gè)實(shí)現(xiàn)了 PartitionAssignor接口的類來決定哪些分 區(qū)應(yīng)該被分配給哪個(gè)消費(fèi)者 。

Kafka 內(nèi)置了兩種分配策略,在后面的配置參數(shù)小節(jié)我們將深入討論。分配完畢之后,群主把分配情況列表發(fā)送給群組協(xié)調(diào)器,協(xié)調(diào)器再把這些信息發(fā)送給所有消費(fèi)者。每個(gè)消費(fèi)者只能看到自己的分配信息,只有群 主知道群組 里所有消費(fèi)者的分配信息。這個(gè)過程會(huì)在每次再均衡時(shí)重復(fù)發(fā)生。

創(chuàng)建 Kafka消費(fèi)者

在讀取消息之前,需要先創(chuàng)建 一個(gè) KafkaConsumer對象 。 創(chuàng)建 KafkaConsumer 對象與創(chuàng)建 KafkaProducer對象非常相似——把想要傳給消費(fèi)者的屬性放在 Properties 對象里。本章 后續(xù)部分會(huì)深入討論所有的屬性。在這里,我們只需要使用 3個(gè)必要的屬性: bootstrap.servers、 key.deserializer、 value.deserializer。

下面代碼演示了如何創(chuàng)建一個(gè)KafkaConsumer對象:

Properties props = new Properties();props.put("bootstrap.servers", "broker1:9092, broker2:9092");props.put("group.id", "CountryCounter");props.put("key.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

deserializer使用指定的類(反序列化器)把字節(jié)數(shù)組轉(zhuǎn)成 Java對象。

group.id指定了KafkaConsumer 屬于哪一個(gè)消費(fèi)者群組。
group.id不是必需的,不過我們現(xiàn)在姑且認(rèn)為它是必需的。它指定了 KafkaConsumer 屬于哪一個(gè)消費(fèi)者群組。創(chuàng)建不屬于任何一個(gè)群組的消費(fèi)者也是可以的,只是這樣做不太常見。

訂閱主題

創(chuàng)建好消費(fèi)者之后,下一步可以開始訂閱主題了。subscribe()方法接受一個(gè)主題列表作為參數(shù)

consumer.subscribe(Collections.singletonList("customerCountries"));

在這里我們創(chuàng)建了一個(gè)包含單個(gè)元素的列表,主題的名字叫作“customerCountries”,我們也可以在調(diào)用subscribe()方法時(shí)傳入一個(gè)正則表達(dá)式,正則表達(dá)式可以匹配多個(gè)主題如果有人創(chuàng)建了新的主題,并且主題名與正則表達(dá)式匹配,那么會(huì)立即觸發(fā)一次再均衡,消費(fèi)者就可以讀取新添加的主題。如果應(yīng)用程序需要讀取多個(gè)主題,并且可以處理不同類型的數(shù)據(jù),那么這種訂閱方式就很管用。在Kafka和其他系統(tǒng)之間復(fù)制數(shù)據(jù)時(shí),使用正則表達(dá)式的方式訂閱多個(gè)主題時(shí)很常見的做法。

要訂閱所有test相關(guān)的主題,可以這樣做:consumer.subscribe("test.*");

輪詢

消息輪詢是消費(fèi)者 API 的核心,通過一個(gè)簡單的輪詢向服務(wù)器請求數(shù)據(jù)。一旦消費(fèi)者訂閱了主題 ,輪詢就會(huì)處理所有的細(xì)節(jié),包括群組協(xié)調(diào)、分區(qū)再均衡、發(fā)送心跳和獲取數(shù)據(jù), 開發(fā)者只需要使用一組簡單的 API 來處理從分區(qū)返回的數(shù)據(jù)。消費(fèi)者代碼的主要部分如下所示 :

輪詢不只是獲取數(shù)據(jù)那么簡單。在第一次調(diào)用新消費(fèi)者的 poll() 方法時(shí),它會(huì)負(fù)責(zé)查找 GroupCoordinator, 然后加入群組,接受分配的分區(qū)。 如果發(fā)生了再均衡,整個(gè)過程也是在輪詢期間進(jìn)行的。當(dāng)然 ,心跳也是從輪詢里發(fā)迭出去的。所以,我們要確保在輪詢期間所做的任何處理工作都應(yīng)該盡快完成。

線程安全

在同一個(gè)群組中,我們無法讓一個(gè)線程運(yùn)行多個(gè)消費(fèi)者,也無法讓多個(gè)線程安全地共享一個(gè)消費(fèi)者。按照規(guī)則,一個(gè)消費(fèi)者使用一個(gè)線程。如果要在同一個(gè)消費(fèi)者群組里運(yùn)行多個(gè)消費(fèi)者,需要讓每個(gè)消費(fèi)者運(yùn)行在自己的線程里。最好是把消費(fèi)者的邏輯封裝在自己的對象里,然后使用Java的ExecutorService啟動(dòng)多個(gè)線程,使每個(gè)消費(fèi)者運(yùn)行在自己的線程上。Confluent的博客(https://www.confluent.io/blog/)上有一個(gè)教程介紹如何處理這種情況。


消費(fèi)者的配置

到目前為止,我們學(xué)習(xí)了如何使用消費(fèi)者 API,不過只介紹了幾個(gè)配置屬’性一一如bootstrap.servers、 key.deserializer、 value.deserializer、group.id。 Kafka的文檔列出了所有與消費(fèi)者相關(guān)的配置說明。大部分參數(shù)都有合理的默認(rèn)值,一般不需要修改它們,不過有一些參數(shù)與消費(fèi) 者的性能和可用性有很大關(guān)系。接下來介紹這些重要的屬性。

1. fetch.min.bytes

該屬性指定了消費(fèi)者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。 broker 在收到消費(fèi)者的數(shù)據(jù)請求時(shí), 如果可用的數(shù)據(jù)量小于 fetch.min.bytes指定的大小,那么它會(huì)等到有足夠的可用數(shù)據(jù)時(shí)才把它返回給消費(fèi)者。這樣可以降低消費(fèi)者和 broker 的工作負(fù)載,因?yàn)樗鼈冊谥黝}不是很活躍的時(shí)候(或者一天里的低谷時(shí)段)就不需要來來回回地處理消息。如果沒有很多可用數(shù)據(jù),但消費(fèi)者的 CPU 使用率卻很高,那么就需要把該屬性的值設(shè)得比默認(rèn)值大。如果消費(fèi)者的數(shù)量比較多,把該屬性的值設(shè)置得大一點(diǎn)可以降低 broker 的工作負(fù)載。

2. fetch.max.wait.ms

我們通過 fetch.min.bytes 告訴 Kafka,等到有足夠的數(shù)據(jù)時(shí)才把它返回給消費(fèi)者。而 fetch.max.wait.ms則用于指定 broker的等待時(shí)間,默認(rèn)是 500ms。如果沒有足夠的數(shù)據(jù)流入 Kafka,消費(fèi)者獲取最小數(shù)據(jù)量的要求就得不到滿足,最終導(dǎo)致500ms的延遲。 如果要降低潛在的延遲(為了滿足 SLA),可以把該參數(shù)值設(shè)置得小一些。如果 fetch.max.wait.ms被設(shè) 為 100ms,并且 fetch.min.bytes 被設(shè)為 1MB,那么 Kafka在收到消費(fèi)者的請求后,要么返 回 1MB 數(shù)據(jù),要么在 100ms 后返回所有可用的數(shù)據(jù) , 就看哪個(gè)條件先得到滿足。

3. max.parition.fetch.bytes

該屬性指定了服務(wù)器從每個(gè)分區(qū)里返回給消費(fèi)者的最大字節(jié)數(shù)。它的默認(rèn)值是 1MB,也 就是說, KafkaConsumer.poll() 方法從每個(gè)分區(qū)里返回的記錄最多不超過 max.parition.fetch.bytes 指定的字節(jié)。如果一個(gè)主題有 20個(gè)分區(qū)和 5 個(gè)消費(fèi)者,那么每個(gè)消費(fèi)者需要至少 4MB 的可用內(nèi)存來接收記錄。在為消費(fèi)者分配內(nèi)存時(shí),可以給它們多分配一些,因 為如果群組里有消費(fèi)者發(fā)生崩潰,剩下的消費(fèi)者需要處理更多的分區(qū)。 max.parition.fetch.bytes 的值必須比 broker能夠接收的最大消息的字節(jié)數(shù)(通過 max.message.size屬 性配置 )大, 否則消費(fèi)者可能無法讀取這些消息,導(dǎo)致消費(fèi)者一直掛起重試。在設(shè)置該屬性時(shí),另一個(gè)需要考慮的因素是消費(fèi)者處理數(shù)據(jù)的時(shí)間。 消費(fèi)者需要頻繁調(diào)用 poll() 方法來避免會(huì)話過期和發(fā)生分區(qū)再均衡,如果單次調(diào)用 poll() 返回的數(shù)據(jù)太多,消費(fèi)者需要更多的時(shí)間來處理,可能無法及時(shí)進(jìn)行下一個(gè)輪詢來避免會(huì)話過期。如果出現(xiàn)這種情況, 可以把 max.parition.fetch.bytes 值改小 ,或者延長會(huì)話過期時(shí)間。

4. session.timeout.ms

該屬性指定了消費(fèi)者在被認(rèn)為死亡之前可以與服務(wù)器斷開連接的時(shí)間,默認(rèn)是 3s。如果消費(fèi)者沒有在 session.timeout.ms 指定的時(shí)間內(nèi)發(fā)送心跳給群組協(xié)調(diào)器,就被認(rèn)為已經(jīng)死亡,協(xié)調(diào)器就會(huì)觸發(fā)再均衡,把它的分區(qū)分配給群組里的其他消費(fèi)者 。該屬性與 heartbeat.interval.ms緊密相關(guān)。heartbeat.interval.ms 指定了poll()方法向協(xié)調(diào)器 發(fā)送心跳的頻 率, session.timeout.ms 則指定了消費(fèi)者可以多久不發(fā)送心跳。所以, 一般需要同時(shí)修改這兩個(gè)屬性, heartbeat.interval.ms 必須比 session.timeout.ms 小, 一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 應(yīng)該是 ls。 把 session.timeout.ms 值設(shè) 得比默認(rèn)值小,可以更快地檢測和恢 復(fù)崩潰的節(jié)點(diǎn),不過長時(shí)間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的再均衡。把該屬性的值設(shè)置得大一些,可以減少意外的再均衡 ,不過檢測節(jié)點(diǎn)崩潰需要更長的時(shí)間。

5. auto.offset.reset

該屬性指定了消費(fèi)者在讀取一個(gè)沒有偏移量的分區(qū)或者偏移量無效的情況下(因消費(fèi)者長時(shí)間失效,包含偏移量的記錄已經(jīng)過時(shí)井被刪除)該作何處理。它的默認(rèn)值是latest, 意 思是說,在偏移量無效的情況下,消費(fèi)者將從最新的記錄開始讀取數(shù)據(jù)(在消費(fèi)者 啟動(dòng)之 后生成的記錄)。另一個(gè)值是 earliest,意思是說,在偏移量無效的情況下,消費(fèi)者將從 起始位置讀取分區(qū)的記錄。

6. enable.auto.commit

我們稍后將介紹 幾種 不同的提交偏移量的方式。該屬性指定了消費(fèi)者是否自動(dòng)提交偏移量,默認(rèn)值是 true。為了盡量避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)為 false,由自己控制何時(shí)提交偏移量。如果把它設(shè)為 true,還可以通過配置 auto.commit.interval.mls 屬性來控制提交的頻率。

7. partition.assignment.strategy

我們知道,分區(qū)會(huì)被分配給群組里的消費(fèi)者。 PartitionAssignor 根據(jù)給定的消費(fèi)者和主題,決定哪些分區(qū)應(yīng)該被分配給哪個(gè)消費(fèi)者。 Kafka 有兩個(gè)默認(rèn)的分配策略 。

  • Range

該策略會(huì)把主題的若干個(gè)連續(xù)的分區(qū)分配給消費(fèi)者。假設(shè)悄費(fèi)者 C1 和消費(fèi)者 C2 同時(shí) 訂閱了主題 T1 和主題 T2,井且每個(gè)主題有 3 個(gè)分區(qū)。那么消費(fèi)者 C1 有可能分配到這 兩個(gè)主題的分區(qū) 0 和 分區(qū) 1,而消費(fèi)者 C2 分配到這兩個(gè)主題 的分區(qū) 2。因?yàn)槊總€(gè)主題 擁有奇數(shù)個(gè)分區(qū),而分配是在主題內(nèi)獨(dú)立完成的,第一個(gè)消費(fèi)者最后分配到比第二個(gè)消費(fèi)者更多的分區(qū)。只要使用了 Range策略,而且分區(qū)數(shù)量無法被消費(fèi)者數(shù)量整除,就會(huì)出現(xiàn)這種情況。

  • RoundRobin

該策略把主題的所有分區(qū)逐個(gè)分配給消費(fèi)者。如果使用 RoundRobin 策略來給消費(fèi)者 C1 和消費(fèi)者 C2分配分區(qū),那么消費(fèi)者 C1 將分到主題 T1 的分區(qū) 0和分區(qū) 2以及主題 T2 的分區(qū) 1,消費(fèi)者 C2 將分配到主題 T1 的分區(qū) l 以及主題T2 的分區(qū) 0和分區(qū) 2。一般 來說,如果所有消費(fèi)者都訂閱相同的主題(這種情況很常見), RoundRobin策略會(huì)給所 有消費(fèi)者分配相同數(shù)量 的分區(qū)(或最多就差一個(gè)分區(qū))。

可以通過設(shè)置 partition.assignment.strategy 來選擇分區(qū)策略。默認(rèn)使用的是 org. apache.kafka.clients.consumer.RangeAssignor, 這個(gè)類實(shí)現(xiàn)了 Range策略,不過也可以 把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor。我們還可以使用自定 義策略,在這種情況下 , partition.assignment.strategy 屬性的值就是自定義類的名字。

8. client.id

該屬性可以是任意字符串 , broker用它來標(biāo)識(shí)從客戶端發(fā)送過來的消息,通常被用在日志、度量指標(biāo)和配額里。

9. max.poll.records

該屬性用于控制單次調(diào)用 call() 方法能夠返回的記錄數(shù)量,可以幫你控制在輪詢里需要處理的數(shù)據(jù)量。

10. receive.buffer.bytes 和 send.buffer.bytes

socket 在讀寫數(shù)據(jù)時(shí)用到的 TCP 緩沖區(qū)也可以設(shè)置大小。如果它們被設(shè)為-1,就使用操作系統(tǒng)的默認(rèn)值。如果生產(chǎn)者或消費(fèi)者與 broker處于不同的數(shù)據(jù)中心內(nèi),可以適當(dāng)增大這些值,因?yàn)榭鐢?shù)據(jù)中心的網(wǎng)絡(luò)一般都有 比較高的延遲和比較低的帶寬 。

?

原文出處:https://yq.aliyun.com/articles/641396

總結(jié)

以上是生活随笔為你收集整理的Kafka的消费者概念的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。