kafka消费者组概念
https://blog.csdn.net/cgs666/article/details/85257819
應(yīng)用程序使用 KafkaConsumer向 Kafka 訂閱主題,并從訂閱的主題上接收消息 。 從 Kafka 讀取數(shù)據(jù)不同于從其他悄息系統(tǒng)讀取數(shù)據(jù),它涉及一些獨(dú)特的概念和想法。如果不先理解 這些概念,就難以理解如何使用消費(fèi)者 API。所以我們接下來先解釋這些重要的概念,然 后再舉幾個例子,橫示如何使用消費(fèi)者 API 實現(xiàn)不同的應(yīng)用程序。
消費(fèi)者和消費(fèi)者群組
假設(shè)我們有一個應(yīng)用程序需要從-個 Kafka主題讀取消息井驗證這些消息,然后再把它們 保存起來。應(yīng)用程序需要創(chuàng)建一個消費(fèi)者對象,訂閱主題并開始接收消息,然后驗證消息 井保存結(jié)果。過了 一陣子,生產(chǎn)者往主題寫入消息的速度超過了應(yīng)用程序驗證數(shù)據(jù)的速 度,這個時候該怎么辦?如果只使用單個消費(fèi)者處理消息,應(yīng)用程序會遠(yuǎn)跟不上消息生成 的速度。顯然,此時很有必要對消費(fèi)者進(jìn)行橫向伸縮。就像多個生產(chǎn)者可以向相同的 主題 寫入消息一樣,我們也可以使用多個消費(fèi)者從同一個主題讀取消息,對消息進(jìn)行分流。
Kafka 消費(fèi)者從屬于消費(fèi)者群組。一個群組里的消費(fèi)者訂閱的是同一個主題,每個消費(fèi)者 接收主題一部分分區(qū)的消息。
假設(shè)主題 T1 有 4 個分區(qū),我們創(chuàng)建了消費(fèi)者 C1 ,它是群組 G1 里唯 一 的消費(fèi)者,我們用 它訂閱主題 T1。消費(fèi)者 Cl1將收到主題 T1全部 4個分區(qū)的消息,如圖 4-1 所示。
如果在群組 G1 里新增一個消費(fèi)者 C2,那么每個消費(fèi)者將分別從兩個分區(qū)接收消息。我 假設(shè)消費(fèi)者 C1接收分區(qū) 0 和分區(qū) 2 的消息,消費(fèi)者 C2 接收分區(qū) 1 和分區(qū) 3 的消息,如圖 4-2 所示。
如果群組 G1 有 4 個消費(fèi)者,那么每個消費(fèi)者可以分配到 一個分區(qū),如圖 4-3 所示。
如果我們往群組里添加更多的消費(fèi)者,超過主題的分區(qū)數(shù)量,那么多出的消費(fèi)者就會被閑置,不會接收到任何消息。
往群組里增加消費(fèi)者是橫向伸縮消費(fèi)能力的主要方式。 Kafka 消費(fèi)者經(jīng)常會做一些高延遲的操作,比如把數(shù)據(jù)寫到數(shù)據(jù)庫或 HDFS,或者使用數(shù)據(jù)進(jìn)行比較耗時的計算。在這些情況下,單個消費(fèi)者無法跟上數(shù)據(jù)生成的速度,所以可以增加更多的消費(fèi)者,讓它們分擔(dān)負(fù)載,每個消費(fèi)者只處理部分分區(qū)的消息,這就是橫向伸縮的主要手段。我們有必要為主題創(chuàng)建大量的分區(qū),在負(fù)載增長時可以加入更多的消費(fèi)者。不過要性意,不要讓消費(fèi)者的數(shù)量超過主題分區(qū)的數(shù)量,多余的消費(fèi)者只會被閑置。
除了通過增加消費(fèi)者來橫向伸縮單個應(yīng)用程序外,還經(jīng)常出現(xiàn)多個應(yīng)用程序從同一個主題讀取數(shù)據(jù)的情況。實際上, Kafka 設(shè)計的主要目標(biāo)之一 ,就是要讓 Kafka 主題里的數(shù)據(jù)能夠滿足企業(yè)各種應(yīng)用場景的需求。在這些場景里,每個應(yīng)用程序可以獲取到所有的消息, 而不只是其中的 一部分。只要保證每個應(yīng)用程序有自己的消費(fèi)者群組,就可以讓它們獲取到主題所有的消息。不同于傳統(tǒng)的消息系統(tǒng),橫向伸縮 Kafka消費(fèi)者和消費(fèi)者群組并不會對性能造成負(fù)面影響。
在上面的例子里,如果新增一個只包含一個消費(fèi)者的群組 G2,那么這個消費(fèi)者將從主題 T1 上接收所有的消息,與群組 G1 之間互不影響。群組 G2 可以增加更多的消費(fèi)者,每個消費(fèi)者可以消費(fèi)若干個分區(qū),就像群組 G1 那樣,如圖 4-5 所示。總的來說,群組 G2 還是會接收到所有消息,不管有沒有其他群組存在。
簡而言之,為每一個需要獲取一個或多個主題全部消息的應(yīng)用程序創(chuàng)建一個消費(fèi)者群組, 然后往群組里添加消費(fèi)者來伸縮讀取能力和處理能力,群組里的每個消費(fèi)者只處理一部分消息。
消費(fèi)者群組和分區(qū)再均衡
我們已經(jīng)從上一個小節(jié)了解到,群組里的消費(fèi)者共同讀取主題的分區(qū)。一個新的消費(fèi)者加 入群組時,它讀取的是原本由其他消費(fèi)者讀取的消息。當(dāng)一個消費(fèi)者被關(guān)閉或發(fā)生崩潰時,它就離開群組,原本由它讀取的分區(qū)將由群組里的其他消費(fèi)者來讀取。在主題發(fā)生變化時 , 比如管理員添加了新的分區(qū),會發(fā)生分區(qū)重分配。
分區(qū)的所有權(quán)從一個消費(fèi)者轉(zhuǎn)移到另一個消費(fèi)者,這樣的行為被稱為再均衡。再均衡非常重要, 它為消費(fèi)者群組帶來了高可用性和伸縮性(我們可以放心地添加或移除消費(fèi)者), 不過在正常情況下,我們并不希望發(fā)生這樣的行為。在再均衡期間,消費(fèi)者無法讀取消息,造成整個群組一小段時間的不可用。另外,當(dāng)分區(qū)被重新分配給另 一個消費(fèi)者時,消費(fèi)者當(dāng)前的讀取狀態(tài)會丟失,它有可能還需要去刷新緩存 ,在它重新恢復(fù)狀態(tài)之前會拖慢應(yīng)用程序。我們將在本章討論如何進(jìn)行安全的再均衡,以及如何避免不必要的再均衡。
消費(fèi)者通過向被指派為 群組協(xié)調(diào)器的 broker (不同的群組可以有不同的協(xié)調(diào)器)發(fā)送 心跳 來維持它們和群組的從屬關(guān)系以及它們對分區(qū)的所有權(quán)關(guān)系。只要消費(fèi)者以正常的時間間隔發(fā)送心跳,就被認(rèn)為是活躍的,說明它還在讀取分區(qū)里的消息。消費(fèi)者會在輪詢消息 (為了獲取消息)或提交偏移量時發(fā)送心跳。如果消費(fèi)者停止發(fā)送心跳的時間足夠長,會話就會過期,群組協(xié)調(diào)器認(rèn)為它已經(jīng)死亡,就會觸發(fā)一次再均衡。
如果一個消費(fèi)者發(fā)生崩潰,井停止讀取消息,群組協(xié)調(diào)器(broker)會等待幾秒鐘,確認(rèn)它死亡了才會觸發(fā)再均衡。在這幾秒鐘時間里,死掉的消費(fèi)者不會讀取分區(qū)里的消息。在清理消費(fèi)者時,消費(fèi)者會通知協(xié)調(diào)器它將要離開群組,協(xié)調(diào)器會立即觸發(fā)一次再均衡,盡量降低處理停頓。在本章的后續(xù)部分,我們將討論一些用于控制發(fā)送心跳頻率和會話過期時間的配置參數(shù),以及如何根據(jù)實際需要來配置這些參數(shù) 。
分配分區(qū)是怎樣的一個過程
當(dāng)消費(fèi)者要加入群組時,它會向群組協(xié)調(diào)器發(fā)送 一 個 JoinGroup 請求。第 一 個加入群組的消費(fèi)者將成為“群主”。群主從協(xié)調(diào)器那里獲得群組的成員列 表(列表中包含了所有最近發(fā)送過心跳的消費(fèi)者,它們被認(rèn)為是活躍的), 并負(fù)責(zé)給每一個消費(fèi)者分配分區(qū)。它使用 一個實現(xiàn)了 PartitionAssignor接口的類來決定哪些分 區(qū)應(yīng)該被分配給哪個消費(fèi)者 。
Kafka 內(nèi)置了兩種分配策略,在后面的配置參數(shù)小節(jié)我們將深入討論。分配完畢之后,群主把分配情況列表發(fā)送給群組協(xié)調(diào)器,協(xié)調(diào)器再把這些信息發(fā)送給所有消費(fèi)者。每個消費(fèi)者只能看到自己的分配信息,只有群 主知道群組 里所有消費(fèi)者的分配信息。這個過程會在每次再均衡時重復(fù)發(fā)生。
https://blog.csdn.net/wobuaizhi/article/details/80950387
kafka分區(qū)(partition)和和分組(group)
2018年07月07日 12:37:37 現(xiàn)役碼農(nóng)一個 閱讀數(shù) 6140
版權(quán)聲明:本文為博主原創(chuàng)文章,未經(jīng)博主允許不得轉(zhuǎn)載。 https://blog.csdn.net/wobuaizhi/article/details/80950387
下面是自己在使用過程中的總結(jié)。歡迎拍磚
每個consumer只能消費(fèi)指定的幾個分區(qū)。那么消息如果沒有發(fā)到監(jiān)聽的分區(qū),那么那個消費(fèi)者就不能獲取到這次發(fā)送的消息。
下面的例子一定要注意對分區(qū)和分組的理解,不然會不知道為什么會得出那樣的結(jié)論
消費(fèi)組中的消費(fèi)者會怎么取kafka的數(shù)據(jù),看下方的介紹----來自網(wǎng)絡(luò)
1.其中 broker有兩個,也就是服務(wù)器有兩臺。
2.partition有6個,按照哈希取模的算法分配。
3.消費(fèi)者有8個,他們屬于同一個消費(fèi)組。
那么這一個消費(fèi)組中的消費(fèi)者會怎么取kafka的數(shù)據(jù)呢?
其實kafka的消費(fèi)端有一個均衡算法,算法如下:
1.A=(partition數(shù)量/同分組消費(fèi)者總個數(shù))
2.M=對上面所得到的A值小數(shù)點(diǎn)第一位向上取整
3.計算出該消費(fèi)者拉取數(shù)據(jù)的patition合集:Ci = [P(M*i ),P((i + 1) * M -1)]
A=6/8=0.75
M=1
C0=[P(1*0),P((0+1)1-1)]=[P0,P0]
同理:
C1=[P(11),P((1+1)1-1)]=[P1,P1]
C2=[P(12),P((2+1)1-1)]=[P2,P2]
C3=[P(13),P((3+1)1-1)]=[P3,P3]
C4=[P(14),P((4+1)1-1)]=[P4,P4]
C5=[P(15),P((5+1)1-1)]=[P5,P5]
C6=[P(16),P((6+1)1-1)]=[P6,P6]
C7=[P(17),P((7+1)*1-1)]=[P7,P7]
那么按照上面的算法:
C0消費(fèi)者消費(fèi)P0的數(shù)據(jù)
C1消費(fèi)者消費(fèi)P1的數(shù)據(jù)
C2消費(fèi)者消費(fèi)P2的數(shù)據(jù)
C3消費(fèi)者消費(fèi)P3的數(shù)據(jù)
C4消費(fèi)者消費(fèi)P4的數(shù)據(jù)
C5消費(fèi)者消費(fèi)P5的數(shù)據(jù)
C6消費(fèi)者消費(fèi)P6的數(shù)據(jù)
C7消費(fèi)者消費(fèi)P7的數(shù)據(jù)
但是partition只有P0-P5根本就沒有P6和P7,所以這兩個消費(fèi)者相當(dāng)于是會被閑置的,就相當(dāng)于占用資源,卻沒什么用,所以在這里真正起到作用的就是C0-C5。
如果這個消費(fèi)組里面的消費(fèi)者少于partition數(shù)量呢(比如5個)?
那么還是依葫蘆畫瓢,根據(jù)上面的算法:
A=6/5=1.2
M=2
C0=[P(2*0),P((0+1)2-1)]=[P0,P1]
C1=[P(21),P((1+1)2-1)]=[P2,P3]
C2=[P(22),P((2+1)*2-1)]=[P4,P5]
C3=[P(2*3),P((3+1)2-1)]=[P6,P7]
C4=[P(24),P((4+1)*2-1)]=[P8,P9]
同上面一樣C3和C4沒有起到任何作用。
總結(jié):
1.按照如上的算法,所以如果kafka的消費(fèi)組需要增加組員,最多增加到和partition數(shù)量一致,超過的組員只會占用資源,而不起作用;
2.kafka的partition的個數(shù)一定要大于消費(fèi)組組員的個數(shù),并且partition的個數(shù)對于消費(fèi)組組員取模一定要為0,不然有些消費(fèi)者會占用資源卻不起作用;
3.如果需要增加消費(fèi)組的組員個數(shù),那么也需要根據(jù)上面的算法,調(diào)整partition的個數(shù)
通過上面的介紹可以知道。針對一個分組中分區(qū)和消費(fèi)者實際是有對應(yīng)關(guān)系的。不是說增加了分區(qū),就一定可以增加并發(fā)。具體情況需要多多分析。
注意:消費(fèi)組就是group.id不同。kafka中,同一個topic下,消息會給下面每一個group發(fā)送消息(如果有十個,那個這十個group都會接受到這個消息)。但是分區(qū)每個消息只有一個分區(qū)獲取。
總結(jié)
以上是生活随笔為你收集整理的kafka消费者组概念的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C++下ctrl+z退出cin输入循环
- 下一篇: PBG部门培训