Consumer设计-high/low Level Consumer
1 Producer和Consumer的數(shù)據(jù)推送拉取方式
??Producer Producer通過主動(dòng)Push的方式將消息發(fā)布到Broker n Consumer Consumer通過Pull從Broker消費(fèi)數(shù)據(jù)
? Push? 優(yōu)勢(shì):延時(shí)低
? ? ? ? ? 劣勢(shì):可能造成Consumer來不及處理消息;網(wǎng)絡(luò)擁塞?
? Pull? ?優(yōu)勢(shì):Consumer按實(shí)際處理能力獲取相應(yīng)量的數(shù)據(jù);Broker實(shí)現(xiàn)簡單
? ? ? ? ? 劣勢(shì):如果處理不好,實(shí)時(shí)性相對(duì)不足(例如需要大量不斷請(qǐng)求浪費(fèi)資源,Kafka使用long polling,一次請(qǐng)求無果等待一段時(shí)間從而減少請(qǐng)求次數(shù))。
2? High Level Consumer
? ?場景:客戶程序只是希望從Kafka順序讀取并處理數(shù)據(jù),而不太關(guān)心具體的offset。?
? ? ? ? ? ? 也希望提供一些語義,例如同一條消息只被某一個(gè)Consumer消費(fèi)(單播)或被所有Consumer消費(fèi)(廣播)。
? ? ? Kafka High Level API提供了一個(gè)從Kafka消費(fèi)數(shù)據(jù)的高層抽象,從而屏蔽掉其中的細(xì)節(jié),并提供豐富的語義。
?(1)Consumer Group? ?理解consumer group記住下面這三個(gè)特性就好了:consumer group下可以有一個(gè)或多個(gè)consumer instance,consumer instance可以是一個(gè)進(jìn)程,也可以是一個(gè)線程;group.id是一個(gè)字符串,唯一標(biāo)識(shí)一個(gè)consumer group;consumer group下訂閱的topic下的每個(gè)分區(qū)只能分配給某個(gè)group下的一個(gè)consumer(當(dāng)然該分區(qū)還可以被分配給其他group);
? ? ? ? High Level Consumer將從某個(gè)Partition讀取的最后一條 消息的offset存于Zookeeper中。
? ? ? ? 這個(gè)offset基于客戶程序提供給Kafka的名字來保存,這個(gè) 名字被稱為Consumer Group。
? ? ? ? Consumer Group是整個(gè)Kafka集群全局唯一的,而非針對(duì)某個(gè)Topic。
? ? ? ? 每個(gè)High Level Consumer實(shí)例都屬于一個(gè)Consumer Group,若不指定則屬于默認(rèn)的Group。
? ? ? ? ? ? ? ? ??
??
? ? ? ? ? ??
? ? ? ? ? 很多傳統(tǒng)的Message Queue都會(huì)在消息被消費(fèi)完后將消息刪除,一方面避免重復(fù)消費(fèi),另一方面可以保證Queue的長度比較短,提高效率。kafka會(huì)采用兩種,
刪除(過期或過大)和壓縮,壓縮如下。
?
*消息被消費(fèi)后,并不會(huì)被刪除,只是相應(yīng)的offset加一。
? ? ? ? ? ? *對(duì)于每條消息,在同一個(gè)Consumer Group里只會(huì)被一個(gè)Consumer消費(fèi)
? ? ? ? ? ? *不同Consumer Group可消費(fèi)同一條消息 。
? ? (2)High Level Consumer Rebalance?
? ? ? ?Kafka保證同一Consumer Group中只有一個(gè)Consumer會(huì)消費(fèi)某條消息,實(shí)際上,Kafka保證的是穩(wěn)定狀態(tài)下每一個(gè)Consumer實(shí)例只會(huì)消費(fèi)某一個(gè)或多個(gè)特定Partition的數(shù)據(jù),而某個(gè)Partition的數(shù)據(jù)只會(huì)被某一個(gè)特定的Consumer實(shí)例所消費(fèi)。也就是說Kafka對(duì)消息的分配是以Partition為單位分配的,而非以每一條消息作為分配單元。這樣設(shè)計(jì)的劣勢(shì)是無法保證同一個(gè)Consumer Group里的Consumer均勻消費(fèi)數(shù)據(jù),優(yōu)勢(shì)是每個(gè)Consumer不用都跟大量的Broker通信,減少通信開銷,同時(shí)也降低了分配難度,實(shí)現(xiàn)也更簡單。另外,因?yàn)橥粋€(gè)Partition里的數(shù)據(jù)是有序的,這種設(shè)計(jì)可以保證每個(gè)Partition里的數(shù)據(jù)可以被有序消費(fèi)。
如果某Consumer Group中Consumer(每個(gè)Consumer只創(chuàng)建1個(gè)MessageStream)數(shù)量少于Partition數(shù)量,則至少有一個(gè)Consumer會(huì)消費(fèi)多個(gè)Partition的數(shù)據(jù),如果Consumer的數(shù)量與Partition數(shù)量相同,則正好一個(gè)Consumer消費(fèi)一個(gè)Partition的數(shù)據(jù)。而如果Consumer的數(shù)量多于Partition的數(shù)量時(shí),會(huì)有部分Consumer無法消費(fèi)該Topic下任何一條消息。
Consumer Rebalance算法
將目標(biāo)Topic下的所有Partirtion排序,存于PT
對(duì)某Consumer Group下所有Consumer排序,存于CG,
第i個(gè)Consumer記為Ci
? ? ? N=size(PT)/size(CG) ,向上取整
解除Ci對(duì)原來分配的Partition的消費(fèi)權(quán)(i從0開始)
? ?將第 i?N 到(i+1)?N?1個(gè)Partition分配給Ci
? ? ? 在這種策略下,每一個(gè)Consumer或者Broker的增加或者減少都會(huì)觸發(fā)Consumer Rebalance。因?yàn)槊總€(gè)Consumer只負(fù)責(zé)調(diào)整自己所消費(fèi)的Partition,為了保證整個(gè)Consumer Group的一致性,當(dāng)一個(gè)Consumer觸發(fā)了Rebalance時(shí),該Consumer Group內(nèi)的其它所有其它Consumer也應(yīng)該同時(shí)觸發(fā)Rebalance。因此有以下缺點(diǎn)
Herd effect:任何Broker或者Consumer的增減都會(huì)觸發(fā)所有的Consumer的Rebalance
Split Brain:每個(gè)Consumer分別單獨(dú)通過Zookeeper判斷哪些Broker和Consumer 宕機(jī)了,那么不同Consumer在同一時(shí)刻從Zookeeper“看”到的View就可能不一樣,這是由Zookeeper的特性決定的,這就會(huì)造成不正確的Reblance嘗試。
調(diào)整結(jié)果不可控:所有的Consumer都并不知道其它Consumer的Rebalance是否成功,這可能會(huì)導(dǎo)致Kafka工作在一個(gè)不正確的狀態(tài)。
? ? ? 0.9以后的版本,提供了coordinator來解決上述缺點(diǎn)。
3? ?coordinator? ? 和Rebalance
? ??新consumer加入組、已有consumer主動(dòng)離開組或已有consumer崩潰的時(shí)候,會(huì)觸發(fā)rebalance。每個(gè)consumer group都會(huì)被分配一個(gè)這樣的coordinator用于組管理和位移管理。這個(gè)group coordinator比原來承擔(dān)了更多的責(zé)任,比如組成員管理、位移提交保護(hù)機(jī)制等。當(dāng)新版本consumer group的第一個(gè)consumer啟動(dòng)的時(shí)候,它會(huì)去和kafka server確定誰是它們組的coordinator。之后該group內(nèi)的所有成員都會(huì)和該coordinator進(jìn)行協(xié)調(diào)通信。這種coordinator設(shè)計(jì)不再需要zookeeper了,性能上可以得到很大的提升。
? ? * generation:它表示了rebalance之后的一代成員,主要是用于保護(hù)consumer group,隔離無效offset提交的。比如上一代的consumer成員是無法提交位移到新一屆的consumer group中。有時(shí)候報(bào)ILLEGAL_GENERATION的錯(cuò)誤就是代錯(cuò)誤。每次group進(jìn)行rebalance之后,generation號(hào)都會(huì)加1,表示group進(jìn)入到了一個(gè)新的版本,如下圖所示: Generation 1時(shí)group有3個(gè)成員,隨后成員2退出組,coordinator觸發(fā)rebalance,consumer group進(jìn)入Generation 2,之后成員4加入,再次觸發(fā)rebalance,group進(jìn)入Generation 3。
? ??
? ? ?* 協(xié)議 :rebalance本質(zhì)上是一組協(xié)議。group與coordinator共同使用它來完成group的rebalance。目前kafka提供了5個(gè)協(xié)議來處理與consumer group coordination相關(guān)的問題。? ?
Heartbeat請(qǐng)求:consumer需要定期給coordinator發(fā)送心跳來表明自己還活著
LeaveGroup請(qǐng)求:主動(dòng)告訴coordinator我要離開consumer group
SyncGroup請(qǐng)求:group leader把分配方案告訴組內(nèi)所有成員
JoinGroup請(qǐng)求:成員請(qǐng)求加入組
DescribeGroup請(qǐng)求:顯示組的所有信息,包括成員信息,協(xié)議名稱,分配方案,訂閱信息等。通常該請(qǐng)求是給管理員使用。
? ?* 狀態(tài):和很多kafka組件一樣,group也做了個(gè)狀態(tài)機(jī)來表明組狀態(tài)的流轉(zhuǎn)。coordinator根據(jù)這個(gè)狀態(tài)機(jī)會(huì)對(duì)consumer group做不同的處理,如下
? ? ?
Dead:組內(nèi)已經(jīng)沒有任何成員的最終狀態(tài),組的元數(shù)據(jù)也已經(jīng)被coordinator移除了。這種狀態(tài)響應(yīng)各種請(qǐng)求都是一個(gè)response: UNKNOWN_MEMBER_ID
Empty:組內(nèi)無成員,但是位移信息還沒有過期。這種狀態(tài)只能響應(yīng)JoinGroup請(qǐng)求
PreparingRebalance:組準(zhǔn)備開啟新的rebalance,等待成員加入
AwaitingSync:正在等待leader consumer將分配方案傳給各個(gè)成員
Stable:rebalance完成可以開始消費(fèi)
? ?* 過程 :加入,移除,崩潰幾種圖如下
1 Join, 顧名思義就是加入組。這一步中,所有成員都向coordinator發(fā)送JoinGroup請(qǐng)求,請(qǐng)求入組。一旦所有成員都發(fā)送了JoinGroup請(qǐng)求,coordinator會(huì)從中選擇一個(gè)consumer擔(dān)任leader的角色,并把組成員信息以及訂閱信息發(fā)給leader——注意leader和coordinator不是一個(gè)概念。leader負(fù)責(zé)消費(fèi)分配方案的制定。
2 Sync,這一步leader開始分配消費(fèi)方案,即哪個(gè)consumer負(fù)責(zé)消費(fèi)哪些topic的哪些partition。一旦完成分配,leader會(huì)將這個(gè)方案封裝進(jìn)SyncGroup請(qǐng)求中發(fā)給coordinator,非leader也會(huì)發(fā)SyncGroup請(qǐng)求,只是內(nèi)容為空。coordinator接收到分配方案之后會(huì)把方案塞進(jìn)SyncGroup的response中發(fā)給各個(gè)consumer。這樣組內(nèi)的所有成員就都知道自己應(yīng)該消費(fèi)哪些分區(qū)了。
? ? ? 新增consumer:
? ? ? ?
? ? 移除consumer:
? ?
? ?consumer崩掉:
? ? ?
?
? ? 參考:https://www.cnblogs.com/byrhuangqiang/p/6384986.html,https://www.cnblogs.com/huxi2b/p/6223228.html,http://www.jasongj.com/2015/08/09/KafkaColumn4/
4 low consumer
使用Low Level Consumer (Simple Consumer)的主要原因是,用戶希望比Consumer Group更好的控制數(shù)據(jù)的消費(fèi), 如
? ? ?*同一條消息讀多次,方便Replay
? ? ?*只消費(fèi)某個(gè)Topic的部分Partition
? ? ?*管理事務(wù),從而確保每條消息被處理一次(Exactly once)
? ? ?*與High Level Consumer相對(duì),Low Level Consumer要求用戶做大量的額外工作
? ? ?*在應(yīng)用程序中跟蹤處理offset,并決定下一條消費(fèi)哪條消息
? ? ?*獲知每個(gè)Partition的Leader
? ? ?*處理Leader的變化
? ? ?*處理多Consumer的協(xié)作
?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/lkdirk/p/8645755.html
總結(jié)
以上是生活随笔為你收集整理的Consumer设计-high/low Level Consumer的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【页面传值6种方式】- 【JSP 页面传
- 下一篇: webservice入门程序学习中经验总