日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka协调者

發布時間:2025/4/5 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka协调者 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

我們先假設初始時世界是混沌的還沒有盤古的開天辟地,協調者也是一片荒蕪人煙之地,沒有保存任何狀態,因為消費組的初始狀態是Stable,在第一次的Rebalance時,正常的還沒有向消費組注冊過的消費者會執行狀態為Stable而且memberId=UNKNOWN_MEMBER_ID條件分支。在第一次Rebalance之后,每個消費者都分配到了一個成員編號,系統又會進入Stable穩定狀態(Stable穩定狀態包括兩種:一種是沒有任何消費者的穩定狀態,一種是有消費者的穩定狀態)。因為所有消費者在執行一次JoinGroup后并不是說系統就一直保持這種不變的狀態,有可能因為這樣或那樣的事件導致消費者要重新進行JoinGroup,這個時候因為之前JoinGroup過了每個消費者都是有成員編號的,處理方式肯定是不一樣的。

所以定義一種事件驅動的狀態機就很有必要了,這世界看起來是雜亂無章的,不過只要遵循著狀態機的規則(萬物生長的理論),任何事件都是有跡可循有路可走有條不紊地進行著。

?

private def doJoinGroup(group: GroupMetadata,memberId: String,clientId: String,clientHost: String,sessionTimeoutMs: Int,protocolType: String,protocols: List[(String, Array[Byte])],responseCallback: JoinCallback) {if (group.protocolType!=protocolType||!group.supportsProtocols(protocols.map(_._1).toSet)) {//protocolType對于消費者是consumer,注意這里的協議類型和PartitionAssignor協議不同哦//協議類型目前總共就兩種消費者和Worker,而協議是PartitionAssignor分配算法responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))} else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {//如果當前組沒有記錄該消費者,而該消費者卻被分配了成員編號,則重置為未知成員,并讓消費者重試responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))} else { group.currentState match {case Dead =>responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))case PreparingRebalance =>if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //2.第二個消費者在這里了!addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)} else {val member = group.get(memberId)updateMemberAndRebalance(group, member, protocols, responseCallback)}case Stable =>if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //1.初始時第一個消費者在這里!//如果消費者成員編號是未知的,則向GroupMetadata注冊并被記錄下來addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)} else { //3.第二次Rebalance時第一個消費者在這里,此時要分Leader還是普通的消費者了val member = group.get(memberId)if (memberId == group.leaderId || !member.matches(protocols)) {updateMemberAndRebalance(group, member, protocols, responseCallback)} else {responseCallback(JoinGroupResult(members = Map.empty,memberId = memberId,generationId = group.generationId,subProtocol = group.protocol,leaderId = group.leaderId,errorCode = Errors.NONE.code))}}}if (group.is(PreparingRebalance))joinPurgatory.checkAndComplete(GroupKey(group.groupId))} }

addMemberAndRebalance和updateMemberAndRebalance會創建或更新MemberMetadata,并且會嘗試調用prepareRebalance,消費組中只有一個消費者有機會調用prepareRebalance,并且一旦調用該方法,會將消費組狀態更改為PreparingRebalance,就會使得下一個消費者只能從case PreparingRebalance入口進去了,假設第一個消費者是從Stable進入的,它更改了狀態為PreparingRebalance,下一個消費者就不會從Stable進來的。不過進入Stable狀態還要判斷消費者是不是已經有了成員編號,通常是之前已經發生了Rebalance,這種影響也是比較巨大的,每個消費者走的路徑跟第一次的Rebalance是完全不同的迷宮地圖了。

1)第一次Rebalance如圖6-18的上半部分:

  • 第一個消費者,狀態為Stable,沒有編號,addMemberAndRebalance,成為Leader,執行prepareRebalance,更改狀態為PreparingRebalance,創建DelayedJoin
  • 第二個消費者,狀態為PreparingRebalance,沒有編號,addMemberAndRebalance(不執行prepareRebalance,因為在狀態改變成PreparingRebalance后就不會被執行了);后面的消費者同第二個
  • 所有消費者都要等協調者收集完所有成員編號在DelayedJoin完成時才會收到JoinGroup響應
  • ?

    圖6-18 第一次和第二次Rebalance

    2)第二次Rebalance,對于之前加入過的消費者都要成員編號如圖6-18的下半部分:

  • 第一個消費者是Leader,狀態為Stable,有編號,updateMemberAndRebalance,更改狀態為PreparingRebalance,創建DelayedJoin
  • 第二個消費者,狀態為PreparingRebalance,有編號,updateMemberAndRebalance;后面的消費者同第二個
  • 所有消費者也要等待,因為其他消費者發送Join請求在Leader消費者之后。
  • 3)不過如果有消費者在Leader之前發送又有點不一樣了如圖6-19:

  • 第一個消費者不是Leader,狀態為Stable,有編號,responseCallback,立即收到JoinGroup響應,好幸運啊!
  • 第二個消費者如果也不是Leader,恭喜你,協調者也放過他,直接返回JoinGroup響應
  • 第三個消費者是Leader(領導來了),狀態為Stable(什么,你們之前的消費者竟然都沒更新狀態!,因為他們都沒有add或update),有編號,updateMemberAndRebalance(還是我第一個調用add或update,看來還是只能我來更新狀態),更改狀態為PreparingRebalance,創建DelayedJoin
  • 第四個消費者不是Leader,狀態為PreparingRebalance,有編號,updateMemberAndRebalance(前面有領導,不好意思了,不能立即返回JoinGroup給你了,你們這些剩下的消費者都只能和領導一起返回了,算你們倒霉)
  • ?

    圖6-19 Leader非第一個發送JoinGroup請求

    4)如果第一個消費者不是Leader,也沒有編號,說明這是一個新增的消費者,流程又不同了如圖6-20:

  • 第一個消費者不是Leader,狀態為Stable,沒有編號,addMemberAndRebalance,執行prepareRebalance(我是第一個調用add或update的哦,你們都別想跟我搶這個頭彩了),更改狀態為PreparingRebalance(我不是Leader但我驕傲啊),創建DelayedJoin(我搶到頭彩,當然創建DelayedJoin的工作只能由我來完成了)
  • 第二個消費者也不是Leader,恭喜你,協調者也放過他,直接返回JoinGroup響應
  • 第三個消費者是Leader(領導來了),狀態為PreparingRebalance(有個新來的不懂規矩,他已經把狀態改了),有編號,updateMemberAndRebalance(有人已經改了,你老就不用費心思了),凡是沒有立即返回響應的,都需要等待,領導也不例外
  • 第四個消費者不是Leader(廢話,只有一個領導,而且領導已經在前面了),不會立即返回響應(你看領導都排隊呢)
  • 雖然DelayedJoin是由沒有編號的消費者創建,不過由于DelayedJoin是以消費組為級別的,所以不用擔心,上一次選舉出來的領導還是領導,協調者最終還是會把members交給領導,不會是給那個沒有編號的消費者的,雖然說在他注冊的時候已經有編號了,但是大家不認啊。不過領導其實不在意是誰開始觸發prepareRebalance的,那個人要負責生成DelayedJoin,而不管是領導自己還是其他人一旦更改狀態為PreparingRebalance,后面的消費者都要等待DelayedJoin完成了,而領導者總是要等待的,所以他當然無所謂了,因為他知道最后協調者總是會把members交給他的。
  • ?

    圖6-20 新增消費組第一個發送JoinGroup請求

    根據上面的幾種場景總結下來狀態機的規則和一些結論如下:

  • 第一個調用addMemberAndRebalance或者updateMemberAndRebalance的會將狀態改為PreparingRebalance,并且負責生成DelayedJoin
  • 一旦狀態進入PreparingRebalance,其他消費者就只能從PreparingRebalance狀態入口進入,這里只有兩種選擇addMemberAndRebalance或者updateMemberAndRebalance,不過他們不會更改狀態,也不會生成DelayedJoin
  • 發生DelayedJoin之后,其他消費者的JoinGroup響應都會被延遲,因為如規則2中,他們只能調用add或update,無法立即調用responseCallback,所以就要和DelayedJoin的那個消費者一起等待
  • 正常流程時,發生responseCallback的是存在成員編號的消費者在Leader之前發送了JoinGroup,或者新增加的消費者發送了JoinGroup請求之前
  • 第一次Rebalance時,第一個消費者會創建DelayedJoin,之后的Rebalance,只有新增的消費者才有機會創建(如果他在Leader之前發送的話,如果在Leader之后就沒有機會了),而普通消費者總是沒有機會創建DelayedJoin的,因為狀態為Stable時,他會直接開溜,有人(Leader或者新增加的消費者)創建了DelayedJoin之后,他又在那邊怨天尤人只能等待
  • 轉載于:https://my.oschina.net/u/2371517/blog/1142949

    總結

    以上是生活随笔為你收集整理的kafka协调者的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 免费国产黄| 中文不卡av| 丁香六月av | 青青草草视频 | 国产鲁鲁| 国产一二三在线观看 | 天干夜夜爽爽日日日日 | 久久成人一区 | 日本五十路在线 | 国产精品福利片 | 日韩不卡免费视频 | 成人精品视频一区二区 | av网站免费播放 | 窝窝午夜影院 | 黄色a级片视频 | 久久久综合网 | 丰满少妇乱子伦精品看片 | 国产特黄级aaaaa片免 | 影音先锋在线看 | 性一交一乱一精一晶 | 91网站在线播放 | 97成人资源 | 欧美激情午夜 | 一级免费看 | 国产精品成人久久电影 | 色综合中文综合网 | 少妇2做爰bd在线意大利堕落 | 女人又爽又黄免费女仆 | 久久亚洲日本 | 亚洲韩国精品 | 色丁香在线 | 中文字幕激情视频 | 国产对白刺激视频 | 亚洲国产婷婷香蕉久久久久久99 | 欧美日本韩国在线 | 禁断介护av一区二区 | 欧美性受黑人性爽 | 成人午夜免费视频 | 日本性猛交 | 日韩欧美中出 | 久久久久久久久久久91 | 国精品无码一区二区三区 | 一区二区三区四区国产 | www国产| 又粗又猛又爽又黄的视频 | 不卡视频国产 | 亚洲一区中文字幕在线 | 日本特级黄色大片 | 午夜视频黄色 | 久久久91精品国产一区二区三区 | 国产成人精品一区二区三区福利 | 国产福利视频在线观看 | 在线观看中文字幕第一页 | 激情av一区| 国产吞精囗交免费视频网站 | 日本高清久久 | 99热r | av鲁丝一区鲁丝二区鲁丝三区 | 深夜福利国产精品 | 海量av| 成人免费毛片果冻 | 超碰人人91 | 中文字幕一区二区三区人妻四季 | 日本女人一区二区三区 | 夜夜操夜夜 | 善良的女邻居在线观看 | 天天天操| 国内老熟妇对白hdxxxx | 久久毛片网 | 免费的三级网站 | 欧美日韩激情网 | 三级欧美日韩 | 欧美在线播放视频 | 欧美色狠| 高清国产在线 | 日韩经典一区 | 日本黄页网站免费大全 | 看全黄大色黄大片 | 毛片88 | 色先锋av| 国产精品久久国产精麻豆96堂 | 激情福利社 | 麻豆国产精品视频 | 91麻豆精品国产91久久久更新时间 | 少妇高潮久久久久久潘金莲 | 欧美一级特黄视频 | 久久天天躁狠狠躁夜夜躁 | 综合激情网 | 亚洲高清视频网站 | 欧美成人精品在线视频 | 狼色网 | 国产精品久久久国产盗摄 | 国产一在线 | 大尺度av | 深爱激情av | 久久久网站| 毛片天天看 | 国语对白做受按摩的注意事项 | 天天做天天爽 |