日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka基本知识整理

發布時間:2023/12/4 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka基本知识整理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

首先Kafka是一個分布式消息隊列中間件,Apache頂級項目,https://kafka.apache.org/? ?高性能、持久化、多副本備份、橫向擴展。

生產者Producer往隊列里發送消息,消費者Consumer從隊列里消費消息,然后進行業務邏輯。應用場景主要有:解耦、削峰(緩沖)、異步處理、排隊、分布式事務控制等等。

  • Kafka對外使用Topic(主題)的概念,生產者往Topic里寫消息,消費者從Topic中消費讀消息。

  • 為了實現水平擴展,一個Topic實際是由多個Partition(分區)組成的,遇到瓶頸時,可以通過增加Partition的數量來進行橫向擴容。

  • 單個Parition內是保證消息有序。持久化時,每收到一條消息,Kafka就是在對應的日志文件Append寫,所以性能非常高。

  • Kafka Data Flow 消息流轉圖

    上圖中,消息生產者Producers往Brokers里面的指定Topic中寫消息,消息消費者Consumers從Brokers里面消費指定Topic的消息,然后進行業務處理。

    在實際的部署架構中,Broker、Topic、Partition這些元數據保存在ZooKeeper中,Kafka的監控、消息路由(分區)由ZooKeeper控制。0.8版本的OffSet也由ZooKeeper控制。

    一、消息生產/發送過程

    Kafka創建Message、發送時要指定對應的Topic和Value(消息體),Key(分區鍵)和Partition(分區)是可選參數。?

    調用Producer的Send()方法后,消息先進行序列化(消息序列化器可自定義實現:例如:Protobuf),然后按照Topic和Partition,臨時放到內存中指定的發送隊列中。達到閾值后,然后批量發送。

    發送時,當Partition沒設置時,如果設置了Key-分區鍵(例如:單據類型),按照Key進行Hash取模,保證相同的Key發送到指定的分區Partition。如果未設置分區鍵Key,使用Round-Robin輪詢隨機選分區Partition。

    二、分區Partition的高可用和選舉機制

    分區有副本的概念,保證消息不丟失。當存在多副本的情況下,會盡量把多個副本,分配到不同的broker上。

    Kafka會為Partition選出一個Leader Broker(通過ZooKeeper),之后所有該Partition的請求,實際操作的都是Leader,然后再同步到其他的Follower。

    當一個Kafka Broker宕機后,所有Leader在該Broker上的Partition都會重新選舉,在剩余的Follower中選出一個Leader,繼續提供服務。

    正如上面所講:Kafka使用ZooKeeper在多個Broker中選出一個Controller,用于Partition分配和Leader選舉。以下是Partition的分配機制:

    • 將所有Broker(假設共n個Broker)和待分配的Partition排序

    • 將第i個Partition分配到第(i mod n)個Broker上 (這個就是leader)

    • 將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上

    Controller會在ZooKeeper的/brokers/ids節點上注冊Watch,一旦有broker宕機,它就能知道。

    當Broker宕機后,Controller就會給受到影響的Partition選出新Leader。

    Controller從ZooKeeper的/brokers/topics/[topic]/partitions/[partition]/state中,讀取對應Partition的ISR(in-sync replica已同步的副本)列表,選一個出來做Leader。

    選出Leader后,更新ZooKeeper的存儲,然后發送LeaderAndISRRequest給受影響的Broker進行通知。

    如果ISR列表是空,那么會根據配置,隨便選一個replica做Leader,或者干脆這個partition就是宕機了。

    如果ISR列表的有機器,但是也宕機了,那么還可以等ISR的機器活過來。

    多副本同步:

    服務端這邊的處理是Follower從Leader批量拉取數據來同步。但是具體的可靠性,是由生產者來決定的。

    生產者生產消息的時候,通過request.required.acks參數來設置數據的可靠性。

    ?在acks=-1的時候,如果ISR少于min.insync.replicas指定的數目,那么就會返回不可用。

    ?這里ISR列表中的機器是會變化的,根據配置replica.lag.time.max.ms,多久沒同步,就會從ISR列表中剔除。以前還有根據落后多少條消息就踢出ISR,在1.0版本后就去掉了,因為這個值很難取,在高峰的時候很容易出現節點不斷的進出ISR列表。??

    ?從ISA中選出leader后,follower會從把自己日志中上一個高水位后面的記錄去掉,然后去和leader拿新的數據。因為新的leader選出來后,follower上面的數據,可能比新leader多,所以要截取。這里高水位的意思,對于partition和leader,就是所有ISR中都有的最新一條記錄。消費者最多只能讀到高水位;

    ?從leader的角度來說高水位的更新會延遲一輪,例如寫入了一條新消息,ISR中的broker都fetch到了,但是ISR中的broker只有在下一輪的fetch中才能告訴leader。

    ?也正是由于這個高水位延遲一輪,在一些情況下,kafka會出現丟數據和主備數據不一致的情況,0.11開始,使用leader epoch來代替高水位。

    三、消息消費過程

    訂閱topic是以一個消費組來訂閱的,一個消費組里面可以有多個消費者。同一個消費組中的兩個消費者,不會同時消費一個partition。

    換句話來說,就是一個partition,只能被消費組里的一個消費者消費,但是可以同時被多個消費組消費。因此,如果消費組內的消費者如果比partition多的話,那么就會有個別消費者一直空閑。

    ?消息Offset偏移量(消息的順序號)管理

    ?一個消費組消費partition,需要保存offset記錄消費到哪,以前保存在zk中,由于zk的寫性能不好,以前的解決方法都是consumer每隔一分鐘上報一次。

    ?ZooKeeper的性能嚴重影響了消費的速度,而且很容易出現重復消費。

    ?在0.10版本后,Kafka把這個offset的保存,從ZooKeeper總剝離,保存在一個名叫__consumeroffsets topic的Topic中。

    ?消息的key由groupid、topic、partition組成,value是偏移量offset。topic配置的清理策略是compact。總是保留最新的key,其余刪掉。

    ?一般情況下,每個key的offset都是緩存在內存中,查詢的時候不用遍歷partition,如果沒有緩存,第一次就會遍歷partition建立緩存,然后查詢返回。

    ?Partitin的Rebalance

    ?生產過程中broker要分配partition,消費過程這里,也要分配partition給消費者。類似broker中選了一個controller出來,消費也要從broker中選一個coordinator,用于分配partition。

    ?coordinator的選舉過程

  • 看offset保存在那個partition

  • 該partition leader所在的broker就是被選定的coordinator

  • ?交互流程?

  • consumer啟動、或者coordinator宕機了,consumer會任意請求一個broker,發送ConsumerMetadataRequest請求,broker會按照上面說的方法,選出這個consumer對應coordinator的地址。

  • consumer 發送heartbeat請求給coordinator,返回IllegalGeneration的話,就說明consumer的信息是舊的了,需要重新加入進來,進行reblance。返回成功,那么consumer就從上次分配的partition中繼續執行。

  • ? Rebalance

  • consumer給coordinator發送JoinGroupRequest請求。

  • 這時其他consumer發heartbeat請求過來時,coordinator會告訴他們,要reblance了。

  • 其他consumer發送JoinGroupRequest請求。

  • 所有記錄在冊的consumer都發了JoinGroupRequest請求之后,coordinator就會在這里consumer中隨便選一個leader。然后回JoinGroupRespone,這會告訴consumer你是follower還是leader,對于leader,還會把follower的信息帶給它,讓它根據這些信息去分配partition

  • consumer向coordinator發送SyncGroupRequest,其中leader的SyncGroupRequest會包含分配的情況。

  • coordinator回包,把分配的情況告訴consumer,包括leader。

  • ? ?當partition或者消費者的數量發生變化時,都得進行reblance。

    ? ?列舉一下會reblance的情況:

  • 增加Partition

  • 增加消費者

  • 消費者主動關閉

  • 消費者宕機

  • coordinator宕機

  • 四、消息投遞語義

    kafka支持3種消息投遞語義,
    At most once:最多一次,消息可能會丟失,但不會重復
    At least once:最少一次,消息不會丟失,可能會重復
    Exactly once:只且一次,消息不丟失不重復,只且消費一次(0.11中實現,僅限于下游也是kafka)

    At least once:(業務中使用比較多)

    先獲取數據,再進行業務處理,業務處理成功后commit offset。

  • 生產者生產消息異常,消息是否成功寫入不確定,重做,可能寫入重復的消息

  • 消費者處理消息,業務處理成功后,更新offset失敗,消費者重啟的話,會重復消費

  • At most once:

    先獲取數據,再commit offset,最后進行業務處理。

  • 生產者生產消息異常,不管,生產下一個消息,消息就丟了

  • 消費者處理消息,先更新offset,再做業務處理,做業務處理失敗,消費者重啟,消息就丟了。

  • Exactly once:

    首先要保證消息不丟,再去保證不重復。所以盯著At least once的原因來搞。?

  • 生產者重做導致重復寫入消息----生產保證冪等性

  • 消費者重復消費---消滅重復消費,或者業務接口保證冪等性重復消費也沒問題

  • 業務處理的冪等性非常重要。Kafka控制不了,需要業務來實現。比如所判斷消息是否已經處理。

    解決重復消費有兩個方法:

  • 下游系統保證冪等性,重復消費也不會導致多條記錄。

  • 把commit offset和業務處理綁定成一個事務。

  • 生產的冪等性:

    為每個producer分配一個pid,作為該producer的唯一標識。producer會為每一個<topic,partition>維護一個單調遞增的seq。類似的,broker也會為每個<pid,topic,partition>記錄下最新的seq。當req_seq == broker_seq+1時,broker才會接受該消息。因為:

  • 消息的seq比broker的seq大超過時,說明中間有數據還沒寫入,即亂序了。

  • 消息的seq不比broker的seq小,那么說明該消息已被保存。

  • ? 事務性和原子性

    ? ?場景是這樣的:

  • 先從多個源topic中獲取數據。

  • 做業務處理,寫到下游的多個目的topic。

  • 更新多個源topic的offset。

  • ? ?其中第2、3點作為一個事務,要么全成功,要么全失敗。這里得益與offset實際上是用特殊的topic去保存,這兩點都歸一為寫多個topic的事務性處理。

    ? ?

    ? ?引入tid(transaction id),和pid不同,這個id是應用程序提供的,用于標識事務,和producer是誰并沒關系。就是任何producer都可以使用這個tid去做事務,這樣進行到一半就死掉的事務,可以由另一個producer去恢復。
    ? ?同時為了記錄事務的狀態,類似對offset的處理,引入transaction coordinator用于記錄transaction log。在集群中會有多個transaction coordinator,每個tid對應唯一一個transaction coordinator。
    ? ?注:transaction log刪除策略是compact,已完成的事務會標記成null,compact后不保留。
    ? ?啟動事務時,先標記開啟事務,寫入數據,全部成功就在transaction log中記錄為prepare commit狀態,否則寫入prepare abort的狀態。之后再去給每個相關的partition寫入一條marker(commit或者abort)消息,標記這個事務的message可以被讀取或已經廢棄。成功后? ? ?在transaction log記錄下commit/abort狀態,至此事務結束。

    ? ?整體的數據流是這樣的:

    ? ?

  • 首先使用tid請求任意一個broker(代碼中寫的是負載最小的broker),找到對應的transaction coordinator。

  • 請求transaction coordinator獲取到對應的pid,和pid對應的epoch,這個epoch用于防止僵死進程復活導致消息錯亂,當消息的epoch比當前維護的epoch小時,拒絕掉。tid和pid有一一對應的關系,這樣對于同一個tid會返回相同的pid。

  • client先請求transaction coordinator記錄<topic,partition>的事務狀態,初始狀態是BEGIN,如果是該事務中第一個到達的<topic,partition>,同時會對事務進行計時;client輸出數據到相關的partition中;client再請求transaction coordinator記錄offset的<topic,partition>事務狀態;client發送offset commit到對應offset partition。

  • client發送commit請求,transaction coordinator記錄prepare commit/abort,然后發送marker給相關的partition。全部成功后,記錄commit/abort的狀態,最后這個記錄不需要等待其他replica的ack,因為prepare不丟就能保證最終的正確性了。

  • ? ? ?這里prepare的狀態主要是用于事務恢復,例如給相關的partition發送控制消息,沒發完就宕機了,備機起來后,producer發送請求獲取pid時,會把未完成的事務接著完成。

    ? ? ?當partition中寫入commit的marker后,相關的消息就可被讀取。所以kafka事務在prepare commit到commit這個時間段內,消息是逐漸可見的,而不是同一時刻可見。

    ? ? 消息消費事務

    ? ? 消費時,partition中會存在一些消息處于未commit狀態,即業務方應該看不到的消息,需要過濾這些消息不讓業務看到,kafka選擇在消費者進程中進行過來,而不是在broker中過濾,主要考慮的還是性能。kafka高性能的一個關鍵點是zero copy,如果需要在broker中過 濾,那么勢必需要讀取消息內容到內存,就會失去zero copy的特性。


    ? 五、 Kafka文件組織

    ??kafka的數據,實際上是以文件的形式存儲在文件系統的。topic下有partition,partition下有segment,

    segment是實際的一個個文件

    ,topic和partition都是抽象概念。

    ? 在目錄/${topicName}-{$partitionid}/下,存儲著實際的log文件(即segment),還有對應的索引文件。

    ??每個segment文件大小相等,文件名以這個segment中最小的offset命名,文件擴展名是.log;segment對應的索引的文件名字一樣,擴展名是.index。有兩個index文件,一個是offset index用于按offset去查message,一個是time index用于按照時間去查,其實這里可以優化合到一起,下面只說offset index。總體的組織是這樣的:

    ? 為了減少索引文件的大小,降低空間使用,方便直接加載進內存中,這里的索引使用稀疏矩陣,不會每一個message都記錄下具體位置,而是每隔一定的字節數,再建立一條索引。 索引包含兩部分,分別是baseOffset,還有position。

    ? baseOffset:意思是這條索引對應segment文件中的第幾條message。這樣做方便使用數值壓縮算法來節省空間。例如kafka使用的是varint。

    ? position:在segment中的絕對位置。

    ? 查找offset對應的記錄時,會先用二分法,找出對應的offset在哪個segment中,然后使用索引,在定位出offset在segment中的大概位置,再遍歷查找message。

    六、Kafka常用配置項

    ? Broker配置

    ? Topic配置

    ? 參考鏈接:123archu?鏈接:https://www.jianshu.com/p/d3e963ff8b70?

    原文地址:https://www.cnblogs.com/tianqing/p/10808717.html

    .NET社區新聞,深度好文,歡迎訪問公眾號文章匯總?http://www.csharpkit.com?

    總結

    以上是生活随笔為你收集整理的Kafka基本知识整理的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。