日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

最全Kafka 设计与原理详解【2017.9全新】

發(fā)布時間:2023/12/15 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 最全Kafka 设计与原理详解【2017.9全新】 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、Kafka簡介

1.1 背景歷史

當今社會各種應用系統(tǒng)諸如商業(yè)、社交、搜索、瀏覽等像信息工廠一樣不斷的生產(chǎn)出各種信息,在大數(shù)據(jù)時代,我們面臨如下幾個挑戰(zhàn):

  • 如何收集這些巨大的信息
  • 如何分析它
  • 如何及時做到如上兩點
  • 以上幾個挑戰(zhàn)形成了一個業(yè)務需求模型,即生產(chǎn)者生產(chǎn)(produce)各種信息,消費者消費(consume)(處理分析)這些信息,而在生產(chǎn)者與消費者之間,需要一個溝通兩者的橋梁-消息系統(tǒng)。從一個微觀層面來說,這種需求也可理解為不同的系統(tǒng)之間如何傳遞消息。

    1.2 Kafka誕生

    Kafka由?linked-in?開源?
    kafka-即是解決上述這類問題的一個框架,它實現(xiàn)了生產(chǎn)者和消費者之間的無縫連接。?
    kafka-高產(chǎn)出的分布式消息系統(tǒng)(A high-throughput distributed messaging system)

    1.3 Kafka現(xiàn)在

    Apache kafka 是一個分布式的基于push-subscribe的消息系統(tǒng),它具備快速、可擴展、可持久化的特點。它現(xiàn)在是Apache旗下的一個開源系統(tǒng),作為hadoop生態(tài)系統(tǒng)的一部分,被各種商業(yè)公司廣泛應用。它的最大的特性就是可以實時的處理大量數(shù)據(jù)以滿足各種需求場景:比如基于hadoop的批處理系統(tǒng)、低延遲的實時系統(tǒng)、storm/spark流式處理引擎。

    二、Kafka技術(shù)概覽

    2.1 Kafka的特性

    • 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒
    • 可擴展性:kafka集群支持熱擴展
    • 持久性、可靠性:消息被持久化到本地磁盤,并且支持數(shù)據(jù)備份防止數(shù)據(jù)丟失
    • 容錯性:允許集群中節(jié)點失敗(若副本數(shù)量為n,則允許n-1個節(jié)點失敗)
    • 高并發(fā):支持數(shù)千個客戶端同時讀寫

    2.2 Kafka一些重要設計思想

    下面介紹先大體介紹一下Kafka的主要設計思想,可以讓相關(guān)人員在短時間內(nèi)了解到kafka相關(guān)特性,如果想深入研究,后面會對其中每一個特性都做詳細介紹。

    • Consumergroup:各個consumer可以組成一個組,每個消息只能被組中的一個consumer消費,如果一個消息可以被多個consumer消費的話,那么這些consumer必須在不同的組。
    • 消息狀態(tài):在Kafka中,消息的狀態(tài)被保存在consumer中,broker不會關(guān)心哪個消息被消費了被誰消費了,只記錄一個offset值(指向partition中下一個要被消費的消息位置),這就意味著如果consumer處理不好的話,broker上的一個消息可能會被消費多次。
    • 消息持久化:Kafka中會把消息持久化到本地文件系統(tǒng)中,并且保持極高的效率。
    • 消息有效期:Kafka會長久保留其中的消息,以便consumer可以多次消費,當然其中很多細節(jié)是可配置的。
    • 批量發(fā)送:Kafka支持以消息集合為單位進行批量發(fā)送,以提高push效率。
    • push-and-pull?: Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管從broker pull消息,兩者對消息的生產(chǎn)和消費是異步的。
    • Kafka集群中broker之間的關(guān)系:不是主從關(guān)系,各個broker在集群中地位一樣,我們可以隨意的增加或刪除任何一個broker節(jié)點。
    • 負載均衡方面: Kafka提供了一個 metadata API來管理broker之間的負載(對Kafka0.8.x而言,對于0.7.x主要靠zookeeper來實現(xiàn)負載均衡)。
    • 同步異步:Producer采用異步push方式,極大提高Kafka系統(tǒng)的吞吐率(可以通過參數(shù)控制是采用同步還是異步方式)。
    • 分區(qū)機制partition:Kafka的broker端支持消息分區(qū),Producer可以決定把消息發(fā)到哪個分區(qū),在一個分區(qū)中消息的順序就是Producer發(fā)送消息的順序,一個主題中可以有多個分區(qū),具體分區(qū)的數(shù)量是可配置的。分區(qū)的意義很重大,后面的內(nèi)容會逐漸體現(xiàn)。
    • 離線數(shù)據(jù)裝載:Kafka由于對可拓展的數(shù)據(jù)持久化的支持,它也非常適合向Hadoop或者數(shù)據(jù)倉庫中進行數(shù)據(jù)裝載。
    • 插件支持:現(xiàn)在不少活躍的社區(qū)已經(jīng)開發(fā)出不少插件來拓展Kafka的功能,如用來配合Storm、Hadoop、flume相關(guān)的插件。

    2.3 kafka 應用場景

    • 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統(tǒng)一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
    • 消息系統(tǒng):解耦和生產(chǎn)者和消費者、緩存消息等。
    • 用戶活動跟蹤:Kafka經(jīng)常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網(wǎng)頁、搜索、點擊等活動,這些活動信息被各個服務器發(fā)布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監(jiān)控分析,或者裝載到hadoop、數(shù)據(jù)倉庫中做離線分析和挖掘。
    • 運營指標:Kafka也經(jīng)常用來記錄運營監(jiān)控數(shù)據(jù)。包括收集各種分布式應用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報警和報告。
    • 流式處理:比如spark streaming和storm
    • 事件源

    2.4 Kafka架構(gòu)組件

    Kafka中發(fā)布訂閱的對象是topic。我們可以為每類數(shù)據(jù)創(chuàng)建一個topic,把向topic發(fā)布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。Producers和consumers可以同時從多個topic讀寫數(shù)據(jù)。一個kafka集群由一個或多個broker服務器組成,它負責持久化和備份具體的kafka消息。

    • topic:消息存放的目錄即主題
    • Producer:生產(chǎn)消息到topic的一方
    • Consumer:訂閱topic消費消息的一方
    • Broker:Kafka的服務實例就是一個broker
    • ?

    ?

    2.5 Kafka Topic&Partition

    Topic & Partition & Log

    Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條消息放進哪個queue里。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。若創(chuàng)建topic1和topic2兩個topic,且分別有13個和19個分區(qū),則整個集群上會相應會生成共32個文件夾(本文所用集群共8個節(jié)點,此處topic1和topic2 replication-factor均為1),如下圖所示。

    每個日志文件都是一個log entrie序列,每個log entrie包含一個4字節(jié)整型數(shù)值(值為N+5),1個字節(jié)的"magic value",4個字節(jié)的CRC校驗碼,其后跟N個字節(jié)的消息體。每條消息都有一個當前Partition下唯一的64字節(jié)的offset,它指明了這條消息的起始位置。磁盤上存儲的消息格式如下:

    message length : 4 bytes (value: 1+4+n) "magic" value : 1 byte crc : 4 bytes payload : n bytes

    這個log entries并非由一個文件構(gòu)成,而是分成多個segment,每個segment以該segment第一條消息的offset命名并以“.kafka”為后綴。另外會有一個索引文件,它標明了每個segment下包含的log entry的offset范圍,如下圖所示。

    因為每條消息都被append到該Partition中,屬于順序?qū)懘疟P,因此效率非常高(經(jīng)驗證,順序?qū)懘疟P效率比隨機寫內(nèi)存還要高,這是Kafka高吞吐率的一個很重要的保證)。

    消息發(fā)送時都被發(fā)送到一個topic,其本質(zhì)就是一個目錄,而topic由是由一些Partition Logs(分區(qū)日志)組成,其組織結(jié)構(gòu)如下圖所示:

    我們可以看到,每個Partition中的消息都是有序的,生產(chǎn)的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offset值。?
    Kafka集群會保存所有的消息,不管消息有沒有被消費;我們可以設定消息的過期時間,只有過期的數(shù)據(jù)才會被自動清除以釋放磁盤空間。比如我們設置消息過期時間為2天,那么這2天內(nèi)的所有消息都會被保存到集群中,數(shù)據(jù)只有超過了兩天才會被清除。?
    Kafka需要維持的元數(shù)據(jù)只有一個–消費消息在Partition中的offset值,Consumer每消費一個消息,offset就會加1。其實消息的狀態(tài)完全是由Consumer控制的,Consumer可以跟蹤和重設這個offset值,這樣的話Consumer就可以讀取任意位置的消息。?
    把消息日志以Partition的形式存放有多重考慮,第一,方便在集群中擴展,每個Partition可以通過調(diào)整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數(shù)據(jù)了;第二就是可以提高并發(fā),因為可以以Partition為單位讀寫了。

    每個分區(qū)是一個有序的,不可變的記錄序列,不斷附加到一個結(jié)構(gòu)化的提交日志中。每個分區(qū)中的記錄都被分配一個順序的id號,稱為唯一標識分區(qū)中每個記錄的偏移量。

    Kafka集群保留所有已發(fā)布的記錄 - 無論它們是否已被使用 - 使用可配置的保留期限。例如,如果保留策略設置為兩天,則在發(fā)布記錄后的兩天內(nèi),該策略可用于消費,之后將其丟棄以釋放空間??ǚ蚩ǖ男阅茉跀?shù)據(jù)大小方面是有效的,因此長時間存儲數(shù)據(jù)并不成問題。

    實際上,在每個消費者基礎(chǔ)上保留的唯一元數(shù)據(jù)是消費者在日志中的偏移或位置。這個偏移量由消費者控制:通常消費者會在讀取記錄時線性地提高其偏移,但實際上,由于位置由消費者控制,它可以以任何順序消耗記錄。例如,消費者可以重置為較舊的偏移量以重新處理來自過去的數(shù)據(jù)或跳過最近的記錄,并從“現(xiàn)在”開始消費。

    這種特征的組合意味著卡夫卡消費者非常便宜 - 他們可以來回去對集群或其他消費者沒有太大的影響。例如,您可以使用我們的命令行工具來“拖尾”任何主題的內(nèi)容,而無需更改現(xiàn)有消費者所消耗的內(nèi)容。

    日志中的分區(qū)有幾個目的。首先,它們允許日志擴展到適合單個服務器的大小。每個單獨的分區(qū)必須適合托管它的服務器,但主題可能有很多分區(qū),因此它可以處理任意數(shù)量的數(shù)據(jù)。第二,它們作為并行性的單位 - 更重要的是這一點。

    三、Kafka 核心組件

    3.1 Replications、Partitions 和Leaders

    通過上面介紹的我們可以知道,kafka中的數(shù)據(jù)是持久化的并且能夠容錯的。Kafka允許用戶為每個topic設置副本數(shù)量,副本數(shù)量決定了有幾個broker來存放寫入的數(shù)據(jù)。如果你的副本數(shù)量設置為3,那么一份數(shù)據(jù)就會被存放在3臺不同的機器上,那么就允許有2個機器失敗。一般推薦副本數(shù)量至少為2,這樣就可以保證增減、重啟機器時不會影響到數(shù)據(jù)消費。如果對數(shù)據(jù)持久化有更高的要求,可以把副本數(shù)量設置為3或者更多。?
    Kafka中的topic是以partition的形式存放的,每一個topic都可以設置它的partition數(shù)量,Partition的數(shù)量決定了組成topic的log的數(shù)量。Producer在生產(chǎn)數(shù)據(jù)時,會按照一定規(guī)則(這個規(guī)則是可以自定義的)把消息發(fā)布到topic的各個partition中。上面將的副本都是以partition為單位的,不過只有一個partition的副本會被選舉成leader作為讀寫用。?
    關(guān)于如何設置partition值需要考慮的因素。一個partition只能被一個消費者消費(一個消費者可以同時消費多個partition),因此,如果設置的partition的數(shù)量小于consumer的數(shù)量,就會有消費者消費不到數(shù)據(jù)。所以,推薦partition的數(shù)量一定要大于同時運行的consumer的數(shù)量。另外一方面,建議partition的數(shù)量大于集群broker的數(shù)量,這樣leader partition就可以均勻的分布在各個broker中,最終使得集群負載均衡。在Cloudera,每個topic都有上百個partition。需要注意的是,kafka需要為每個partition分配一些內(nèi)存來緩存消息數(shù)據(jù),如果partition數(shù)量越大,就要為kafka分配更大的heap space。

    3.2 Producers

    Producers直接發(fā)送消息到broker上的leader partition,不需要經(jīng)過任何中介一系列的路由轉(zhuǎn)發(fā)。為了實現(xiàn)這個特性,kafka集群中的每個broker都可以響應producer的請求,并返回topic的一些元信息,這些元信息包括哪些機器是存活的,topic的leader partition都在哪,現(xiàn)階段哪些leader partition是可以直接被訪問的。?
    Producer客戶端自己控制著消息被推送到哪些partition。實現(xiàn)的方式可以是隨機分配、實現(xiàn)一類隨機負載均衡算法,或者指定一些分區(qū)算法。Kafka提供了接口供用戶實現(xiàn)自定義的分區(qū),用戶可以為每個消息指定一個partitionKey,通過這個key來實現(xiàn)一些hash分區(qū)算法。比如,把userid作為partitionkey的話,相同userid的消息將會被推送到同一個分區(qū)。?
    以Batch的方式推送數(shù)據(jù)可以極大的提高處理效率,kafka Producer 可以將消息在內(nèi)存中累計到一定數(shù)量后作為一個batch發(fā)送請求。Batch的數(shù)量大小可以通過Producer的參數(shù)控制,參數(shù)值可以設置為累計的消息的數(shù)量(如500條)、累計的時間間隔(如100ms)或者累計的數(shù)據(jù)大小(64KB)。通過增加batch的大小,可以減少網(wǎng)絡請求和磁盤IO的次數(shù),當然具體參數(shù)設置需要在效率和時效性方面做一個權(quán)衡。?
    Producers可以異步的并行的向kafka發(fā)送消息,但是通常producer在發(fā)送完消息之后會得到一個future響應,返回的是offset值或者發(fā)送過程中遇到的錯誤。這其中有個非常重要的參數(shù)“acks”,這個參數(shù)決定了producer要求leader partition 收到確認的副本個數(shù),如果acks設置數(shù)量為0,表示producer不會等待broker的響應,所以,producer無法知道消息是否發(fā)送成功,這樣有可能會導致數(shù)據(jù)丟失,但同時,acks值為0會得到最大的系統(tǒng)吞吐量。?
    若acks設置為1,表示producer會在leader partition收到消息時得到broker的一個確認,這樣會有更好的可靠性,因為客戶端會等待直到broker確認收到消息。若設置為-1,producer會在所有備份的partition收到消息時得到broker的確認,這個設置可以得到最高的可靠性保證。?
    Kafka 消息有一個定長的header和變長的字節(jié)數(shù)組組成。因為kafka消息支持字節(jié)數(shù)組,也就使得kafka可以支持任何用戶自定義的序列號格式或者其它已有的格式如Apache Avro、protobuf等。Kafka沒有限定單個消息的大小,但我們推薦消息大小不要超過1MB,通常一般消息大小都在1~10kB之前。

    ?

    3.3 Consumers

    Kafka提供了兩套consumer api,分為high-level api和sample-api。Sample-api 是一個底層的API,它維持了一個和單一broker的連接,并且這個API是完全無狀態(tài)的,每次請求都需要指定offset值,因此,這套API也是最靈活的。?
    在kafka中,當前讀到消息的offset值是由consumer來維護的,因此,consumer可以自己決定如何讀取kafka中的數(shù)據(jù)。比如,consumer可以通過重設offset值來重新消費已消費過的數(shù)據(jù)。不管有沒有被消費,kafka會保存數(shù)據(jù)一段時間,這個時間周期是可配置的,只有到了過期時間,kafka才會刪除這些數(shù)據(jù)。?
    High-level API封裝了對集群中一系列broker的訪問,可以透明的消費一個topic。它自己維持了已消費消息的狀態(tài),即每次消費的都是下一個消息。?
    High-level API還支持以組的形式消費topic,如果consumers有同一個組名,那么kafka就相當于一個隊列消息服務,而各個consumer均衡的消費相應partition中的數(shù)據(jù)。若consumers有不同的組名,那么此時kafka就相當與一個廣播服務,會把topic中的所有消息廣播到每個consumer。?

    Kafka的設計理念之一就是同時提供離線處理和實時處理。根據(jù)這一特性,可以使用Storm這種實時流處理系統(tǒng)對消息進行實時在線處理,同時使用Hadoop這種批處理系統(tǒng)進行離線處理,還可以同時將數(shù)據(jù)實時備份到另一個數(shù)據(jù)中心,只需要保證這三個操作所使用的Consumer屬于不同的Consumer Group即可。下圖是Kafka在Linkedin的一種簡化部署示意圖。

    ?

    一張圖來展示是如何查找Message的:

    ?

    ?

    四、Kafka核心特性

    4.1 壓縮

    我們上面已經(jīng)知道了Kafka支持以集合(batch)為單位發(fā)送消息,在此基礎(chǔ)上,Kafka還支持對消息集合進行壓縮,Producer端可以通過GZIP或Snappy格式對消息集合進行壓縮。Producer端進行壓縮之后,在Consumer端需進行解壓。壓縮的好處就是減少傳輸?shù)臄?shù)據(jù)量,減輕對網(wǎng)絡傳輸?shù)膲毫?#xff0c;在對大數(shù)據(jù)處理上,瓶頸往往體現(xiàn)在網(wǎng)絡上而不是CPU(壓縮和解壓會耗掉部分CPU資源)。?
    那么如何區(qū)分消息是壓縮的還是未壓縮的呢,Kafka在消息頭部添加了一個描述壓縮屬性字節(jié),這個字節(jié)的后兩位表示消息的壓縮采用的編碼,如果后兩位為0,則表示消息未被壓縮。

    4.2消息可靠性

    在消息系統(tǒng)中,保證消息在生產(chǎn)和消費過程中的可靠性是十分重要的,在實際消息傳遞過程中,可能會出現(xiàn)如下三中情況:

    • 一個消息發(fā)送失敗
    • 一個消息被發(fā)送多次
    • 最理想的情況:exactly-once ,一個消息發(fā)送成功且僅發(fā)送了一次

    有許多系統(tǒng)聲稱它們實現(xiàn)了exactly-once,但是它們其實忽略了生產(chǎn)者或消費者在生產(chǎn)和消費過程中有可能失敗的情況。比如雖然一個Producer成功發(fā)送一個消息,但是消息在發(fā)送途中丟失,或者成功發(fā)送到broker,也被consumer成功取走,但是這個consumer在處理取過來的消息時失敗了。?
    從Producer端看:Kafka是這么處理的,當一個消息被發(fā)送后,Producer會等待broker成功接收到消息的反饋(可通過參數(shù)控制等待時間),如果消息在途中丟失或是其中一個broker掛掉,Producer會重新發(fā)送(我們知道Kafka有備份機制,可以通過參數(shù)控制是否等待所有備份節(jié)點都收到消息)。?
    從Consumer端看:前面講到過partition,broker端記錄了partition中的一個offset值,這個值指向Consumer下一個即將消費message。當Consumer收到了消息,但卻在處理過程中掛掉,此時Consumer可以通過這個offset值重新找到上一個消息再進行處理。Consumer還有權(quán)限控制這個offset值,對持久化到broker端的消息做任意處理。

    4.3 備份機制

    備份機制是Kafka0.8版本的新特性,備份機制的出現(xiàn)大大提高了Kafka集群的可靠性、穩(wěn)定性。有了備份機制后,Kafka允許集群中的節(jié)點掛掉后而不影響整個集群工作。一個備份數(shù)量為n的集群允許n-1個節(jié)點失敗。在所有備份節(jié)點中,有一個節(jié)點作為lead節(jié)點,這個節(jié)點保存了其它備份節(jié)點列表,并維持各個備份間的狀體同步。下面這幅圖解釋了Kafka的備份機制:

    4.4 Kafka高效性相關(guān)設計

    4.4.1 消息的持久化

    Kafka高度依賴文件系統(tǒng)來存儲和緩存消息,一般的人認為磁盤是緩慢的,這導致人們對持久化結(jié)構(gòu)具有競爭性持懷疑態(tài)度。其實,磁盤遠比你想象的要快或者慢,這決定于我們?nèi)绾问褂么疟P。?
    一個和磁盤性能有關(guān)的關(guān)鍵事實是:磁盤驅(qū)動器的吞吐量跟尋到延遲是相背離的,也就是所,線性寫的速度遠遠大于隨機寫。比如:在一個6 7200rpm SATA RAID-5 的磁盤陣列上線性寫的速度大概是600M/秒,但是隨機寫的速度只有100K/秒,兩者相差將近6000倍。線性讀寫在大多數(shù)應用場景下是可以預測的,因此,操作系統(tǒng)利用read-ahead和write-behind技術(shù)來從大的數(shù)據(jù)塊中預取數(shù)據(jù),或者將多個邏輯上的寫操作組合成一個大寫物理寫操作中。更多的討論可以在ACMQueueArtical中找到,他們發(fā)現(xiàn),對磁盤的線性讀在有些情況下可以比內(nèi)存的隨機訪問要快一些。?
    為了補償這個性能上的分歧,現(xiàn)代操作系統(tǒng)都會把空閑的內(nèi)存用作磁盤緩存,盡管在內(nèi)存回收的時候會有一點性能上的代價。所有的磁盤讀寫操作會在這個統(tǒng)一的緩存上進行。?
    此外,如果我們是在JVM的基礎(chǔ)上構(gòu)建的,熟悉java內(nèi)存應用管理的人應該清楚以下兩件事情:

  • 一個對象的內(nèi)存消耗是非常高的,經(jīng)常是所存數(shù)據(jù)的兩倍或者更多。
  • 隨著堆內(nèi)數(shù)據(jù)的增多,Java的垃圾回收會變得非常昂貴。
  • 基于這些事實,利用文件系統(tǒng)并且依靠頁緩存比維護一個內(nèi)存緩存或者其他結(jié)構(gòu)要好——我們至少要使得可用的緩存加倍,通過自動訪問可用內(nèi)存,并且通過存儲更緊湊的字節(jié)結(jié)構(gòu)而不是一個對象,這將有可能再次加倍。這么做的結(jié)果就是在一臺32GB的機器上,如果不考慮GC懲罰,將最多有28-30GB的緩存。此外,這些緩存將會一直存在即使服務重啟,然而進程內(nèi)緩存需要在內(nèi)存中重構(gòu)(10GB緩存需要花費10分鐘)或者它需要一個完全冷緩存啟動(非常差的初始化性能)。它同時也簡化了代碼,因為現(xiàn)在所有的維護緩存和文件系統(tǒng)之間內(nèi)聚的邏輯都在操作系統(tǒng)內(nèi)部了,這使得這樣做比one-off in-process attempts更加高效與準確。如果你的磁盤應用更加傾向于順序讀取,那么read-ahead在每次磁盤讀取中實際上獲取到這人緩存中的有用數(shù)據(jù)。?
    以上這些建議了一個簡單的設計:不同于維護盡可能多的內(nèi)存緩存并且在需要的時候刷新到文件系統(tǒng)中,我們換一種思路。所有的數(shù)據(jù)不需要調(diào)用刷新程序,而是立刻將它寫到一個持久化的日志中。事實上,這僅僅意味著,數(shù)據(jù)將被傳輸?shù)絻?nèi)核頁緩存中并稍后被刷新。我們可以增加一個配置項以讓系統(tǒng)的用戶來控制數(shù)據(jù)在什么時候被刷新到物理硬盤上。

    4.4.2 常數(shù)時間性能保證

    消息系統(tǒng)中持久化數(shù)據(jù)結(jié)構(gòu)的設計通常是維護者一個和消費隊列有關(guān)的B樹或者其它能夠隨機存取結(jié)構(gòu)的元數(shù)據(jù)信息。B樹是一個很好的結(jié)構(gòu),可以用在事務型與非事務型的語義中。但是它需要一個很高的花費,盡管B樹的操作需要O(logN)。通常情況下,這被認為與常數(shù)時間等價,但這對磁盤操作來說是不對的。磁盤尋道一次需要10ms,并且一次只能尋一個,因此并行化是受限的。?
    直覺上來講,一個持久化的隊列可以構(gòu)建在對一個文件的讀和追加上,就像一般情況下的日志解決方案。盡管和B樹相比,這種結(jié)構(gòu)不能支持豐富的語義,但是它有一個優(yōu)點,所有的操作都是常數(shù)時間,并且讀寫之間不會相互阻塞。這種設計具有極大的性能優(yōu)勢:最終系統(tǒng)性能和數(shù)據(jù)大小完全無關(guān),服務器可以充分利用廉價的硬盤來提供高效的消息服務。?
    事實上還有一點,磁盤空間的無限增大而不影響性能這點,意味著我們可以提供一般消息系統(tǒng)無法提供的特性。比如說,消息被消費后不是立馬被刪除,我們可以將這些消息保留一段相對比較長的時間(比如一個星期)。

    4.4.3 進一步提高效率

    我們已經(jīng)為效率做了非常多的努力。但是有一種非常主要的應用場景是:處理Web活動數(shù)據(jù),它的特點是數(shù)據(jù)量非常大,每一次的網(wǎng)頁瀏覽都會產(chǎn)生大量的寫操作。更進一步,我們假設每一個被發(fā)布的消息都會被至少一個consumer消費,因此我們更要怒路讓消費變得更廉價。?
    通過上面的介紹,我們已經(jīng)解決了磁盤方面的效率問題,除此之外,在此類系統(tǒng)中還有兩類比較低效的場景:

    • 太多小的I/O操作
    • 過多的字節(jié)拷貝

    為了減少大量小I/O操作的問題,kafka的協(xié)議是圍繞消息集合構(gòu)建的。Producer一次網(wǎng)絡請求可以發(fā)送一個消息集合,而不是每一次只發(fā)一條消息。在server端是以消息塊的形式追加消息到log中的,consumer在查詢的時候也是一次查詢大量的線性數(shù)據(jù)塊。消息集合即MessageSet,實現(xiàn)本身是一個非常簡單的API,它將一個字節(jié)數(shù)組或者文件進行打包。所以對消息的處理,這里沒有分開的序列化和反序列化的上步驟,消息的字段可以按需反序列化(如果沒有需要,可以不用反序列化)。?
    另一個影響效率的問題就是字節(jié)拷貝。為了解決字節(jié)拷貝的問題,kafka設計了一種“標準字節(jié)消息”,Producer、Broker、Consumer共享這一種消息格式。Kakfa的message log在broker端就是一些目錄文件,這些日志文件都是MessageSet按照這種“標準字節(jié)消息”格式寫入到磁盤的。?
    維持這種通用的格式對這些操作的優(yōu)化尤為重要:持久化log 塊的網(wǎng)絡傳輸。流行的unix操作系統(tǒng)提供了一種非常高效的途徑來實現(xiàn)頁面緩存和socket之間的數(shù)據(jù)傳遞。在Linux操作系統(tǒng)中,這種方式被稱作:sendfile system call(Java提供了訪問這個系統(tǒng)調(diào)用的方法:FileChannel.transferTo api)。

    為了理解sendfile的影響,需要理解一般的將數(shù)據(jù)從文件傳到socket的路徑:

  • 操作系統(tǒng)將數(shù)據(jù)從磁盤讀到內(nèi)核空間的頁緩存中
  • 應用將數(shù)據(jù)從內(nèi)核空間讀到用戶空間的緩存中
  • 應用將數(shù)據(jù)寫回內(nèi)核空間的socket緩存中
  • 操作系統(tǒng)將數(shù)據(jù)從socket緩存寫到網(wǎng)卡緩存中,以便將數(shù)據(jù)經(jīng)網(wǎng)絡發(fā)出
  • 這種操作方式明顯是非常低效的,這里有四次拷貝,兩次系統(tǒng)調(diào)用。如果使用sendfile,就可以避免兩次拷貝:操作系統(tǒng)將數(shù)據(jù)直接從頁緩存發(fā)送到網(wǎng)絡上。所以在這個優(yōu)化的路徑中,只有最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的。?
    我們期望一個主題上有多個消費者是一種常見的應用場景。利用上述的zero-copy,數(shù)據(jù)只被拷貝到頁緩存一次,然后就可以在每次消費時被重得利用,而不需要將數(shù)據(jù)存在內(nèi)存中,然后在每次讀的時候拷貝到內(nèi)核空間中。這使得消息消費速度可以達到網(wǎng)絡連接的速度。這樣以來,通過頁面緩存和sendfile的結(jié)合使用,整個kafka集群幾乎都已以緩存的方式提供服務,而且即使下游的consumer很多,也不會對整個集群服務造成壓力。?
    關(guān)于sendfile和zero-copy,請參考:zero-copy

    五、Kafka集群部署

    5.1 集群部署

    為了提高性能,推薦采用專用的服務器來部署kafka集群,盡量與hadoop集群分開,因為kafka依賴磁盤讀寫和大的頁面緩存,如果和hadoop共享節(jié)點的話會影響其使用頁面緩存的性能。?
    Kafka集群的大小需要根據(jù)硬件的配置、生產(chǎn)者消費者的并發(fā)數(shù)量、數(shù)據(jù)的副本個數(shù)、數(shù)據(jù)的保存時長綜合確定。?
    磁盤的吞吐量尤為重要,因為通常kafka的瓶頸就在磁盤上。?
    Kafka依賴于zookeeper,建議采用專用服務器來部署zookeeper集群,zookeeper集群的節(jié)點采用偶數(shù)個,一般建議用3、5、7個。注意zookeeper集群越大其讀寫性能越慢,因為zookeeper需要在節(jié)點之間同步數(shù)據(jù)。一個3節(jié)點的zookeeper集群允許一個節(jié)點失敗,一個5節(jié)點集群允許2個幾點失敗。

    5.2 集群大小

    有很多因素決定著kafka集群需要具備存儲能力的大小,最準確的衡量辦法就是模擬負載來測算一下,Kafka本身也提供了負載測試的工具。?
    如果不想通過模擬實驗來評估集群大小,最好的辦法就是根據(jù)硬盤的空間需求來推算。下面我就根據(jù)網(wǎng)絡和磁盤吞吐量需求來做一下估算。?
    我們做如下假設:

    • W:每秒寫多少MB
    • R :副本數(shù)
    • C :Consumer的數(shù)量

    一般的來說,kafka集群瓶頸在于網(wǎng)絡和磁盤吞吐量,所以我們先評估一下集群的網(wǎng)絡和磁盤需求。?
    對于每條消息,每個副本都要寫一遍,所以整體寫的速度是W*R。讀數(shù)據(jù)的部分主要是集群內(nèi)部各個副本從leader同步消息讀和集群外部的consumer讀,所以集群內(nèi)部讀的速率是(R-1)*W,同時,外部consumer讀的速度是C*W,因此:

    • Write:W*R
    • Read:(R-1)*W+C*W

    需要注意的是,我們可以在讀的時候緩存部分數(shù)據(jù)來減少IO操作,如果一個集群有M MB內(nèi)存,寫的速度是W MB/sec,則允許M/(W*R) 秒的寫可以被緩存。如果集群有32GB內(nèi)存,寫的速度是50MB/s的話,則可以至少緩存10分鐘的數(shù)據(jù)。

    5.3 Kafka性能測試

    Performance testing

    5.4 Kafka在zookeeper中的數(shù)據(jù)結(jié)構(gòu)

    Kafka data structures in Zookeeper

    六、Kafka主要配置

    6.1 Broker Config

    屬性默認值描述
    broker.id?必填參數(shù),broker的唯一標識
    log.dirs/tmp/kafka-logsKafka數(shù)據(jù)存放的目錄??梢灾付ǘ鄠€目錄,中間用逗號分隔,當新partition被創(chuàng)建的時會被存放到當前存放partition最少的目錄。
    port9092BrokerServer接受客戶端連接的端口號
    zookeeper.connectnullZookeeper的連接串,格式為:hostname1:port1,hostname2:port2,hostname3:port3??梢蕴钜粋€或多個,為了提高可靠性,建議都填上。注意,此配置允許我們指定一個zookeeper路徑來存放此kafka集群的所有數(shù)據(jù),為了與其他應用集群區(qū)分開,建議在此配置中指定本集群存放目錄,格式為:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。需要注意的是,消費者的參數(shù)要和此參數(shù)一致。
    message.max.bytes1000000服務器可以接收到的最大的消息大小。注意此參數(shù)要和consumer的maximum.message.size大小一致,否則會因為生產(chǎn)者生產(chǎn)的消息太大導致消費者無法消費。
    num.io.threads8服務器用來執(zhí)行讀寫請求的IO線程數(shù),此參數(shù)的數(shù)量至少要等于服務器上磁盤的數(shù)量。
    queued.max.requests500I/O線程可以處理請求的隊列大小,若實際請求數(shù)超過此大小,網(wǎng)絡線程將停止接收新的請求。
    socket.send.buffer.bytes100 * 1024The SO_SNDBUFF buffer the server prefers for socket connections.
    socket.receive.buffer.bytes100 * 1024The SO_RCVBUFF buffer the server prefers for socket connections.
    socket.request.max.bytes100 * 1024 * 1024服務器允許請求的最大值, 用來防止內(nèi)存溢出,其值應該小于 Java heap size.
    num.partitions1默認partition數(shù)量,如果topic在創(chuàng)建時沒有指定partition數(shù)量,默認使用此值,建議改為5
    log.segment.bytes1024 * 1024 * 1024Segment文件的大小,超過此值將會自動新建一個segment,此值可以被topic級別的參數(shù)覆蓋。
    log.roll.{ms,hours}24 * 7 hours新建segment文件的時間,此值可以被topic級別的參數(shù)覆蓋。
    log.retention.{ms,minutes,hours}7 daysKafka segment log的保存周期,保存周期超過此時間日志就會被刪除。此參數(shù)可以被topic級別參數(shù)覆蓋。數(shù)據(jù)量大時,建議減小此值。
    log.retention.bytes-1每個partition的最大容量,若數(shù)據(jù)量超過此值,partition數(shù)據(jù)將會被刪除。注意這個參數(shù)控制的是每個partition而不是topic。此參數(shù)可以被log級別參數(shù)覆蓋。
    log.retention.check.interval.ms5 minutes刪除策略的檢查周期
    auto.create.topics.enabletrue自動創(chuàng)建topic參數(shù),建議此值設置為false,嚴格控制topic管理,防止生產(chǎn)者錯寫topic。
    default.replication.factor1默認副本數(shù)量,建議改為2。
    replica.lag.time.max.ms10000在此窗口時間內(nèi)沒有收到follower的fetch請求,leader會將其從ISR(in-sync replicas)中移除。
    replica.lag.max.messages4000如果replica節(jié)點落后leader節(jié)點此值大小的消息數(shù)量,leader節(jié)點就會將其從ISR中移除。
    replica.socket.timeout.ms30 * 1000replica向leader發(fā)送請求的超時時間。
    replica.socket.receive.buffer.bytes64 * 1024The socket receive buffer for network requests to the leader for replicating data.
    replica.fetch.max.bytes1024 * 1024The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.
    replica.fetch.wait.max.ms500The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader.
    num.replica.fetchers1Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.
    fetch.purgatory.purge.interval.requests1000The purge interval (in number of requests) of the fetch request purgatory.
    zookeeper.session.timeout.ms6000ZooKeeper session 超時時間。如果在此時間內(nèi)server沒有向zookeeper發(fā)送心跳,zookeeper就會認為此節(jié)點已掛掉。 此值太低導致節(jié)點容易被標記死亡;若太高,.會導致太遲發(fā)現(xiàn)節(jié)點死亡。
    zookeeper.connection.timeout.ms6000客戶端連接zookeeper的超時時間。
    zookeeper.sync.time.ms2000H ZK follower落后 ZK leader的時間。
    controlled.shutdown.enabletrue允許broker shutdown。如果啟用,broker在關(guān)閉自己之前會把它上面的所有l(wèi)eaders轉(zhuǎn)移到其它brokers上,建議啟用,增加集群穩(wěn)定性。
    auto.leader.rebalance.enabletrueIf this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the “preferred” replica for each partition if it is available.
    leader.imbalance.per.broker.percentage10The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above the configured value per broker.
    leader.imbalance.check.interval.seconds300The frequency with which to check for leader imbalance.
    offset.metadata.max.bytes4096The maximum amount of metadata to allow clients to save with their offsets.
    connections.max.idle.ms600000Idle connections timeout: the server socket processor threads close the connections that idle more than this.
    num.recovery.threads.per.data.dir1The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    unclean.leader.election.enabletrueIndicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.
    delete.topic.enablefalse啟用deletetopic參數(shù),建議設置為true。
    offsets.topic.num.partitions50The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200).
    offsets.topic.retention.minutes1440Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic.
    offsets.retention.check.interval.ms600000The frequency at which the offset manager checks for stale offsets.
    offsets.topic.replication.factor3The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when fewer brokers than the replication factor then the offsets topic will be created with fewer replicas.
    offsets.topic.segment.bytes104857600Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low in order to facilitate faster log compaction and loads.
    offsets.load.buffer.size5242880An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting corresponds to the batch size (in bytes) to use when reading from the offsets segments when loading offsets into the offset manager’s cache.
    offsets.commit.required.acks-1The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer’s acknowledgement setting. In general, the default should not be overridden.
    offsets.commit.timeout.ms5000The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout.

    6.2 Producer Config

    屬性默認值描述
    metadata.broker.list?啟動時producer查詢brokers的列表,可以是集群中所有brokers的一個子集。注意,這個參數(shù)只是用來獲取topic的元信息用,producer會從元信息中挑選合適的broker并與之建立socket連接。格式是:host1:port1,host2:port2。
    request.required.acks0參見3.2節(jié)介紹
    request.timeout.ms10000Broker等待ack的超時時間,若等待時間超過此值,會返回客戶端錯誤信息。
    producer.typesync同步異步模式。async表示異步,sync表示同步。如果設置成異步模式,可以允許生產(chǎn)者以batch的形式push數(shù)據(jù),這樣會極大的提高broker性能,推薦設置為異步。
    serializer.classkafka.serializer.DefaultEncoder序列號類,.默認序列化成 byte[] 。
    key.serializer.class?Key的序列化類,默認同上。
    partitioner.classkafka.producer.DefaultPartitionerPartition類,默認對key進行hash。
    compression.codecnone指定producer消息的壓縮格式,可選參數(shù)為: “none”, “gzip” and “snappy”。關(guān)于壓縮參見4.1節(jié)
    compressed.topicsnull啟用壓縮的topic名稱。若上面參數(shù)選擇了一個壓縮格式,那么壓縮僅對本參數(shù)指定的topic有效,若本參數(shù)為空,則對所有topic有效。
    message.send.max.retries3Producer發(fā)送失敗時重試次數(shù)。若網(wǎng)絡出現(xiàn)問題,可能會導致不斷重試。
    retry.backoff.ms100Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
    topic.metadata.refresh.interval.ms600 * 1000The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available…). It will also poll regularly (default: every 10min so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure. If you set this to zero, the metadata will get refreshed after each message sent (not recommended). Important note: the refresh happen only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed
    queue.buffering.max.ms5000啟用異步模式時,producer緩存消息的時間。比如我們設置成1000時,它會緩存1秒的數(shù)據(jù)再一次發(fā)送出去,這樣可以極大的增加broker吞吐量,但也會造成時效性的降低。
    queue.buffering.max.messages10000采用異步模式時producer buffer 隊列里最大緩存的消息數(shù)量,如果超過這個數(shù)值,producer就會阻塞或者丟掉消息。
    queue.enqueue.timeout.ms-1當達到上面參數(shù)值時producer阻塞等待的時間。如果值設置為0,buffer隊列滿時producer不會阻塞,消息直接被丟掉。若值設置為-1,producer會被阻塞,不會丟消息。
    batch.num.messages200采用異步模式時,一個batch緩存的消息數(shù)量。達到這個數(shù)量值時producer才會發(fā)送消息。
    send.buffer.bytes100 * 1024Socket write buffer size
    client.id“”The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.

    6.3 Consumer Config

    屬性默認值描述
    group.id?Consumer的組ID,相同goup.id的consumer屬于同一個組。
    zookeeper.connect?Consumer的zookeeper連接串,要和broker的配置一致。
    consumer.idnull如果不設置會自動生成。
    socket.timeout.ms30 * 1000網(wǎng)絡請求的socket超時時間。實際超時時間由max.fetch.wait + socket.timeout.ms 確定。
    socket.receive.buffer.bytes64 * 1024The socket receive buffer for network requests.
    fetch.message.max.bytes1024 * 1024查詢topic-partition時允許的最大消息大小。consumer會為每個partition緩存此大小的消息到內(nèi)存,因此,這個參數(shù)可以控制consumer的內(nèi)存使用量。這個值應該至少比server允許的最大消息大小大,以免producer發(fā)送的消息大于consumer允許的消息。
    num.consumer.fetchers1The number fetcher threads used to fetch data.
    auto.commit.enabletrue如果此值設置為true,consumer會周期性的把當前消費的offset值保存到zookeeper。當consumer失敗重啟之后將會使用此值作為新開始消費的值。
    auto.commit.interval.ms60 * 1000Consumer提交offset值到zookeeper的周期。
    queued.max.message.chunks2用來被consumer消費的message chunks 數(shù)量, 每個chunk可以緩存fetch.message.max.bytes大小的數(shù)據(jù)量。
    rebalance.max.retries4When a new consumer joins a consumer group the set of consumers attempt to “rebalance” the load to assign partitions to each consumer. If the set of consumers changes while this assignment is taking place the rebalance will fail and retry. This setting controls the maximum number of attempts before giving up.
    fetch.min.bytes1The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.
    fetch.wait.max.ms100The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch.min.bytes.
    rebalance.backoff.ms2000Backoff time between retries during rebalance.
    refresh.leader.backoff.ms200Backoff time to wait before trying to determine the leader of a partition that has just lost its leader.
    auto.offset.resetlargestWhat to do when there is no initial offset in ZooKeeper or if an offset is out of range ;smallest : automatically reset the offset to the smallest offset; largest : automatically reset the offset to the largest offset;anything else: throw exception to the consumer
    consumer.timeout.ms-1若在指定時間內(nèi)沒有消息消費,consumer將會拋出異常。
    exclude.internal.topicstrueWhether messages from internal topics (such as offsets) should be exposed to the consumer.
    zookeeper.session.timeout.ms6000ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.
    zookeeper.connection.timeout.ms6000The max time that the client waits while establishing a connection to zookeeper.
    zookeeper.sync.time.ms2000How far a ZK follower can be behind a ZK leader

    6.4 Topic 級別的配置

    topic-config

    ?

    6.5?Push vs. Pull

    作為一個消息系統(tǒng),Kafka遵循了傳統(tǒng)的方式,選擇由Producer向broker push消息并由Consumer從broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事實上,push模式和pull模式各有優(yōu)劣。

    push模式很難適應消費速率不同的消費者,因為消息發(fā)送速率是由broker決定的。push模式的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成Consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務以及網(wǎng)絡擁塞。而pull模式則可以根據(jù)Consumer的消費能力以適當?shù)乃俾氏M消息。

    對于Kafka而言,pull模式更合適。pull模式可簡化broker的設計,Consumer可自主控制消費消息的速率,同時Consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現(xiàn)不同的傳輸語義。

    6.6 Kafka delivery guarantee

    有這么幾種可能的delivery guarantee:

    • At most once 消息可能會丟,但絕不會重復傳輸
    • At least one 消息絕不會丟,但可能會重復傳輸
    • Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次,很多時候這是用戶所想要的。

      當Producer向broker發(fā)送消息時,一旦這條消息被commit,因數(shù)replication的存在,它就不會丟。但是如果Producer發(fā)送數(shù)據(jù)給broker后,遇到網(wǎng)絡問題而造成通信中斷,那Producer就無法判斷該條消息是否已經(jīng)commit。雖然Kafka無法確定網(wǎng)絡故障期間發(fā)生了什么,但是Producer可以生成一種類似于主鍵的東西,發(fā)生故障時冪等性的重試多次,這樣就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),這一Feature還并未實現(xiàn),有希望在Kafka未來的版本中實現(xiàn)。(所以目前默認情況下一條消息從Producer到broker是確保了At least once,可通過設置Producer異步發(fā)送實現(xiàn)At most once)。

      接下來討論的是消息從broker到Consumer的delivery guarantee語義。(僅針對Kafka consumer high level API)。Consumer在從broker讀取消息后,可以選擇commit,該操作會在Zookeeper中保存該Consumer在該Partition中讀取的消息的offset。該Consumer下一次再讀該Partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit之后的開始位置相同。當然可以將Consumer設置為autocommit,即Consumer一旦讀到數(shù)據(jù)立即自動commit。如果只討論這一讀取消息的過程,那Kafka是確保了Exactly once。但實際使用中應用程序并非在Consumer讀取完數(shù)據(jù)就結(jié)束了,而是要進行進一步處理,而數(shù)據(jù)處理與commit的順序在很大程度上決定了消息從broker和consumer的delivery guarantee semantic。

    • 讀完消息先commit再處理消息。這種模式下,如果Consumer在commit后還沒來得及處理消息就crash了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息,這就對應于At most once

    • 讀完消息先處理再commit。這種模式下,如果在處理完消息之后commit之前Consumer crash了,下次重新開始工作時還會處理剛剛未commit的消息,實際上該消息已經(jīng)被處理過了。這就對應于At least once。在很多使用場景下,消息都有一個主鍵,所以消息的處理往往具有冪等性,即多次處理這一條消息跟只處理一次是等效的,那就可以認為是Exactly once。(筆者認為這種說法比較牽強,畢竟它不是Kafka本身提供的機制,主鍵本身也并不能完全保證操作的冪等性。而且實際上我們說delivery guarantee 語義是討論被處理多少次,而非處理結(jié)果怎樣,因為處理方式多種多樣,我們不應該把處理過程的特性——如是否冪等性,當成Kafka本身的Feature)

    • 如果一定要做到Exactly once,就需要協(xié)調(diào)offset和實際操作的輸出。精典的做法是引入兩階段提交。如果能讓offset和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統(tǒng)可能不支持兩階段提交。比如,Consumer拿到數(shù)據(jù)后可能把數(shù)據(jù)放到HDFS,如果把最新的offset和數(shù)據(jù)本身一起寫到HDFS,那就可以保證數(shù)據(jù)的輸出和offset的更新要么都完成,要么都不完成,間接實現(xiàn)Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,無法存于HDFS,而low level API的offset是由自己去維護的,可以將之存于HDFS中)

      Kafka默認保證At least once,并且允許通過設置Producer異步提交來實現(xiàn)At most once。而Exactly once要求與外部存儲系統(tǒng)協(xié)作,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式。

    ?

    ZooKeeper與Kafka

    考慮一下有多個服務器的分布式系統(tǒng),每臺服務器都負責保存數(shù)據(jù),在數(shù)據(jù)上執(zhí)行操作。這樣的潛在例子包括分布式搜索引擎、分布式構(gòu)建系統(tǒng)或者已知的系統(tǒng)如Apache Hadoop。所有這些分布式系統(tǒng)的一個常見問題是,你如何在任一時間點確定哪些服務器活著并且在工作中。最重要的是,當面對這些分布式計算的難題,例如網(wǎng)絡失敗、帶寬限制、可變延遲連接、安全問題以及任何網(wǎng)絡環(huán)境,甚至跨多個數(shù)據(jù)中心時可能發(fā)生的錯誤時,你如何可靠地做這些事。這些正是Apache ZooKeeper所關(guān)注的問題,它是一個快速、高可用、容錯、分布式的協(xié)調(diào)服務。你可以使用ZooKeeper構(gòu)建可靠的、分布式的數(shù)據(jù)結(jié)構(gòu),用于群組成員、領(lǐng)導人選舉、協(xié)同工作流和配置服務,以及廣義的分布式數(shù)據(jù)結(jié)構(gòu)如鎖、隊列、屏障(Barrier)和鎖存器(Latch)。許多知名且成功的項目依賴于ZooKeeper,其中包括HBase、Hadoop 2.0、Solr Cloud、Neo4J、Apache Blur(Incubating)和Accumulo。

    ZooKeeper是一個分布式的、分層級的文件系統(tǒng),能促進客戶端間的松耦合,并提供最終一致的,類似于傳統(tǒng)文件系統(tǒng)中文件和目錄的Znode視圖。它提供了基本的操作,例如創(chuàng)建、刪除和檢查Znode是否存在。它提供了事件驅(qū)動模型,客戶端能觀察特定Znode的變化,例如現(xiàn)有Znode增加了一個新的子節(jié)點。ZooKeeper運行多個ZooKeeper服務器,稱為Ensemble,以獲得高可用性。每個服務器都持有分布式文件系統(tǒng)的內(nèi)存復本,為客戶端的讀取請求提供服務。

    上圖展示了典型的ZooKeeper ensemble,一臺服務器作為Leader,其它作為Follower。當Ensemble啟動時,先選出Leader,然后所有Follower復制Leader的狀態(tài)。所有寫請求都通過Leader路由,變更會廣播給所有Follower。變更廣播被稱為原子廣播。

    Kafka中ZooKeeper的用途:正如ZooKeeper用于分布式系統(tǒng)的協(xié)調(diào)和促進,Kafka使用ZooKeeper也是基于相同的原因。ZooKeeper用于管理、協(xié)調(diào)Kafka代理。每個Kafka代理都通過ZooKeeper協(xié)調(diào)其它Kafka代理。當Kafka系統(tǒng)中新增了代理或者某個代理故障失效時,ZooKeeper服務將通知生產(chǎn)者和消費者。生產(chǎn)者和消費者據(jù)此開始與其它代理協(xié)調(diào)工作。

    Kafka分布式系統(tǒng)的總體架構(gòu)

    結(jié)束語

    綜上所述,Kafka 的設計可以幫助我們解決很多架構(gòu)上的問題。但是想要用好 Kafka 的高性能、低耦合、高可靠性、數(shù)據(jù)不丟失等特性,我們需要非常了解 Kafka,以及我們自身的應用系統(tǒng)使用場景,并不是任何環(huán)境 Kafka 都是最佳選擇, 比如對功能需求更復雜,建議使用Rabbitmq等, 對性能要求高,可以使用mcq或redis list或者程序內(nèi)建的queue。

    謀膽并重

    總結(jié)

    以上是生活随笔為你收集整理的最全Kafka 设计与原理详解【2017.9全新】的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。