kafka协调者
2019獨角獸企業重金招聘Python工程師標準>>>
我們先假設初始時世界是混沌的還沒有盤古的開天辟地,協調者也是一片荒蕪人煙之地,沒有保存任何狀態,因為消費組的初始狀態是Stable,在第一次的Rebalance時,正常的還沒有向消費組注冊過的消費者會執行狀態為Stable而且memberId=UNKNOWN_MEMBER_ID條件分支。在第一次Rebalance之后,每個消費者都分配到了一個成員編號,系統又會進入Stable穩定狀態(Stable穩定狀態包括兩種:一種是沒有任何消費者的穩定狀態,一種是有消費者的穩定狀態)。因為所有消費者在執行一次JoinGroup后并不是說系統就一直保持這種不變的狀態,有可能因為這樣或那樣的事件導致消費者要重新進行JoinGroup,這個時候因為之前JoinGroup過了每個消費者都是有成員編號的,處理方式肯定是不一樣的。
所以定義一種事件驅動的狀態機就很有必要了,這世界看起來是雜亂無章的,不過只要遵循著狀態機的規則(萬物生長的理論),任何事件都是有跡可循有路可走有條不紊地進行著。
?
private def doJoinGroup(group: GroupMetadata,memberId: String,clientId: String,clientHost: String,sessionTimeoutMs: Int,protocolType: String,protocols: List[(String, Array[Byte])],responseCallback: JoinCallback) {if (group.protocolType!=protocolType||!group.supportsProtocols(protocols.map(_._1).toSet)) {//protocolType對于消費者是consumer,注意這里的協議類型和PartitionAssignor協議不同哦//協議類型目前總共就兩種消費者和Worker,而協議是PartitionAssignor分配算法responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))} else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {//如果當前組沒有記錄該消費者,而該消費者卻被分配了成員編號,則重置為未知成員,并讓消費者重試responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))} else { group.currentState match {case Dead =>responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))case PreparingRebalance =>if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //2.第二個消費者在這里了!addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)} else {val member = group.get(memberId)updateMemberAndRebalance(group, member, protocols, responseCallback)}case Stable =>if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //1.初始時第一個消費者在這里!//如果消費者成員編號是未知的,則向GroupMetadata注冊并被記錄下來addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)} else { //3.第二次Rebalance時第一個消費者在這里,此時要分Leader還是普通的消費者了val member = group.get(memberId)if (memberId == group.leaderId || !member.matches(protocols)) {updateMemberAndRebalance(group, member, protocols, responseCallback)} else {responseCallback(JoinGroupResult(members = Map.empty,memberId = memberId,generationId = group.generationId,subProtocol = group.protocol,leaderId = group.leaderId,errorCode = Errors.NONE.code))}}}if (group.is(PreparingRebalance))joinPurgatory.checkAndComplete(GroupKey(group.groupId))} }addMemberAndRebalance和updateMemberAndRebalance會創建或更新MemberMetadata,并且會嘗試調用prepareRebalance,消費組中只有一個消費者有機會調用prepareRebalance,并且一旦調用該方法,會將消費組狀態更改為PreparingRebalance,就會使得下一個消費者只能從case PreparingRebalance入口進去了,假設第一個消費者是從Stable進入的,它更改了狀態為PreparingRebalance,下一個消費者就不會從Stable進來的。不過進入Stable狀態還要判斷消費者是不是已經有了成員編號,通常是之前已經發生了Rebalance,這種影響也是比較巨大的,每個消費者走的路徑跟第一次的Rebalance是完全不同的迷宮地圖了。
1)第一次Rebalance如圖6-18的上半部分:
?
圖6-18 第一次和第二次Rebalance
2)第二次Rebalance,對于之前加入過的消費者都要成員編號如圖6-18的下半部分:
3)不過如果有消費者在Leader之前發送又有點不一樣了如圖6-19:
?
圖6-19 Leader非第一個發送JoinGroup請求
4)如果第一個消費者不是Leader,也沒有編號,說明這是一個新增的消費者,流程又不同了如圖6-20:
?
圖6-20 新增消費組第一個發送JoinGroup請求
根據上面的幾種場景總結下來狀態機的規則和一些結論如下:
轉載于:https://my.oschina.net/u/2371517/blog/1142949
總結
- 上一篇: D1net阅闻:IBM宣布推出全新存储技
- 下一篇: 能源结构进入变革时代 光伏业趋于壮大转型