Kafka学习之路 (三)Kafka的高可用
一、高可用的由來
1.1 為何需要Replication
在Kafka在0.8以前的版本中,是沒有Replication的,一旦某一個(gè)Broker宕機(jī),則其上所有的Partition數(shù)據(jù)都不可被消費(fèi),這與Kafka數(shù)據(jù)持久性及Delivery Guarantee的設(shè)計(jì)目標(biāo)相悖。同時(shí)Producer都不能再將數(shù)據(jù)存于這些Partition中。
如果Producer使用同步模式則Producer會(huì)在嘗試重新發(fā)送message.send.max.retries(默認(rèn)值為3)次后拋出Exception,用戶可以選擇停止發(fā)送后續(xù)數(shù)據(jù)也可選擇繼續(xù)選擇發(fā)送。而前者會(huì)造成數(shù)據(jù)的阻塞,后者會(huì)造成本應(yīng)發(fā)往該Broker的數(shù)據(jù)的丟失。
如果Producer使用異步模式,則Producer會(huì)嘗試重新發(fā)送message.send.max.retries(默認(rèn)值為3)次后記錄該異常并繼續(xù)發(fā)送后續(xù)數(shù)據(jù),這會(huì)造成數(shù)據(jù)丟失并且用戶只能通過日志發(fā)現(xiàn)該問題。同時(shí),Kafka的Producer并未對(duì)異步模式提供callback接口。
由此可見,在沒有Replication的情況下,一旦某機(jī)器宕機(jī)或者某個(gè)Broker停止工作則會(huì)造成整個(gè)系統(tǒng)的可用性降低。隨著集群規(guī)模的增加,整個(gè)集群中出現(xiàn)該類異常的幾率大大增加,因此對(duì)于生產(chǎn)系統(tǒng)而言Replication機(jī)制的引入非常重要。
1.2 Leader Election
引入Replication之后,同一個(gè)Partition可能會(huì)有多個(gè)Replica,而這時(shí)需要在這些Replication之間選出一個(gè)Leader,Producer和Consumer只與這個(gè)Leader交互,其它Replica作為Follower從Leader中復(fù)制數(shù)據(jù)。
因?yàn)樾枰WC同一個(gè)Partition的多個(gè)Replica之間的數(shù)據(jù)一致性(其中一個(gè)宕機(jī)后其它Replica必須要能繼續(xù)服務(wù)并且即不能造成數(shù)據(jù)重復(fù)也不能造成數(shù)據(jù)丟失)。如果沒有一個(gè)Leader,所有Replica都可同時(shí)讀/寫數(shù)據(jù),那就需要保證多個(gè)Replica之間互相(N×N條通路)同步數(shù)據(jù),數(shù)據(jù)的一致性和有序性非常難保證,大大增加了Replication實(shí)現(xiàn)的復(fù)雜性,同時(shí)也增加了出現(xiàn)異常的幾率。而引入Leader后,只有Leader負(fù)責(zé)數(shù)據(jù)讀寫,Follower只向Leader順序Fetch數(shù)據(jù)(N條通路),系統(tǒng)更加簡(jiǎn)單且高效。
二、Kafka HA設(shè)計(jì)解析
2.1 如何將所有Replica均勻分布到整個(gè)集群
為了更好的做負(fù)載均衡,Kafka盡量將所有的Partition均勻分配到整個(gè)集群上。一個(gè)典型的部署方式是一個(gè)Topic的Partition數(shù)量大于Broker的數(shù)量。同時(shí)為了提高Kafka的容錯(cuò)能力,也需要將同一個(gè)Partition的Replica盡量分散到不同的機(jī)器。實(shí)際上,如果所有的Replica都在同一個(gè)Broker上,那一旦該Broker宕機(jī),該P(yáng)artition的所有Replica都無法工作,也就達(dá)不到HA的效果。同時(shí),如果某個(gè)Broker宕機(jī)了,需要保證它上面的負(fù)載可以被均勻的分配到其它幸存的所有Broker上。
Kafka分配Replica的算法如下:
1.將所有Broker(假設(shè)共n個(gè)Broker)和待分配的Partition排序
2.將第i個(gè)Partition分配到第(i mod n)個(gè)Broker上
3.將第i個(gè)Partition的第j個(gè)Replica分配到第((i + j) mode n)個(gè)Broker上
2.2 Data Replication(副本策略)
Kafka的高可靠性的保障來源于其健壯的副本(replication)策略。
2.2.1 消息傳遞同步策略
Producer在發(fā)布消息到某個(gè)Partition時(shí),先通過ZooKeeper找到該P(yáng)artition的Leader,然后無論該Topic的Replication Factor為多少,Producer只將該消息發(fā)送到該P(yáng)artition的Leader。Leader會(huì)將該消息寫入其本地Log。每個(gè)Follower都從Leader pull數(shù)據(jù)。這種方式上,Follower存儲(chǔ)的數(shù)據(jù)順序與Leader保持一致。Follower在收到該消息并寫入其Log后,向Leader發(fā)送ACK。一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認(rèn)為已經(jīng)commit了,Leader將增加HW并且向Producer發(fā)送ACK。
為了提高性能,每個(gè)Follower在接收到數(shù)據(jù)后就立馬向Leader發(fā)送ACK,而非等到數(shù)據(jù)寫入Log中。因此,對(duì)于已經(jīng)commit的消息,Kafka只能保證它被存于多個(gè)Replica的內(nèi)存中,而不能保證它們被持久化到磁盤中,也就不能完全保證異常發(fā)生后該條消息一定能被Consumer消費(fèi)。
Consumer讀消息也是從Leader讀取,只有被commit過的消息才會(huì)暴露給Consumer。
Kafka Replication的數(shù)據(jù)流如下圖所示:
2.2.2 ACK前需要保證有多少個(gè)備份
對(duì)于Kafka而言,定義一個(gè)Broker是否“活著”包含兩個(gè)條件:
- 一是它必須維護(hù)與ZooKeeper的session(這個(gè)通過ZooKeeper的Heartbeat機(jī)制來實(shí)現(xiàn))。
- 二是Follower必須能夠及時(shí)將Leader的消息復(fù)制過來,不能“落后太多”。
Leader會(huì)跟蹤與其保持同步的Replica列表,該列表稱為ISR(即in-sync Replica)。如果一個(gè)Follower宕機(jī),或者落后太多,Leader將把它從ISR中移除。這里所描述的“落后太多”指Follower復(fù)制的消息落后于Leader后的條數(shù)超過預(yù)定值(該值可在$KAFKA_HOME/config/server.properties中通過replica.lag.max.messages配置,其默認(rèn)值是4000)或者Follower超過一定時(shí)間(該值可在$KAFKA_HOME/config/server.properties中通過replica.lag.time.max.ms來配置,其默認(rèn)值是10000)未向Leader發(fā)送fetch請(qǐng)求。
Kafka的復(fù)制機(jī)制既不是完全的同步復(fù)制,也不是單純的異步復(fù)制。事實(shí)上,完全同步復(fù)制要求所有能工作的Follower都復(fù)制完,這條消息才會(huì)被認(rèn)為commit,這種復(fù)制方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個(gè)特性)。而異步復(fù)制方式下,Follower異步的從Leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被Leader寫入log就被認(rèn)為已經(jīng)commit,這種情況下如果Follower都復(fù)制完都落后于Leader,而如果Leader突然宕機(jī),則會(huì)丟失數(shù)據(jù)。而Kafka的這種使用ISR的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。Follower可以批量的從Leader復(fù)制數(shù)據(jù),這樣極大的提高復(fù)制性能(批量寫磁盤),極大減少了Follower與Leader的差距。
需要說明的是,Kafka只解決fail/recover,不處理“Byzantine”(“拜占庭”)問題。一條消息只有被ISR里的所有Follower都從Leader復(fù)制過去才會(huì)被認(rèn)為已提交。這樣就避免了部分?jǐn)?shù)據(jù)被寫進(jìn)了Leader,還沒來得及被任何Follower復(fù)制就宕機(jī)了,而造成數(shù)據(jù)丟失(Consumer無法消費(fèi)這些數(shù)據(jù))。而對(duì)于Producer而言,它可以選擇是否等待消息commit,這可以通過request.required.acks來設(shè)置。這種機(jī)制確保了只要ISR有一個(gè)或以上的Follower,一條被commit的消息就不會(huì)丟失。
2.2.3 Leader Election算法
Leader選舉本質(zhì)上是一個(gè)分布式鎖,有兩種方式實(shí)現(xiàn)基于ZooKeeper的分布式鎖:
- 節(jié)點(diǎn)名稱唯一性:多個(gè)客戶端創(chuàng)建一個(gè)節(jié)點(diǎn),只有成功創(chuàng)建節(jié)點(diǎn)的客戶端才能獲得鎖
- 臨時(shí)順序節(jié)點(diǎn):所有客戶端在某個(gè)目錄下創(chuàng)建自己的臨時(shí)順序節(jié)點(diǎn),只有序號(hào)最小的才獲得鎖
一種非常常用的選舉leader的方式是“Majority Vote”(“少數(shù)服從多數(shù)”),但Kafka并未采用這種方式。這種模式下,如果我們有2f+1個(gè)Replica(包含Leader和Follower),那在commit之前必須保證有f+1個(gè)Replica復(fù)制完消息,為了保證正確選出新的Leader,fail的Replica不能超過f個(gè)。因?yàn)樵谑O碌娜我鈌+1個(gè)Replica里,至少有一個(gè)Replica包含有最新的所有消息。這種方式有個(gè)很大的優(yōu)勢(shì),系統(tǒng)的latency只取決于最快的幾個(gè)Broker,而非最慢那個(gè)。Majority Vote也有一些劣勢(shì),為了保證Leader Election的正常進(jìn)行,它所能容忍的fail的follower個(gè)數(shù)比較少。如果要容忍1個(gè)follower掛掉,必須要有3個(gè)以上的Replica,如果要容忍2個(gè)Follower掛掉,必須要有5個(gè)以上的Replica。也就是說,在生產(chǎn)環(huán)境下為了保證較高的容錯(cuò)程度,必須要有大量的Replica,而大量的Replica又會(huì)在大數(shù)據(jù)量下導(dǎo)致性能的急劇下降。這就是這種算法更多用在ZooKeeper這種共享集群配置的系統(tǒng)中而很少在需要存儲(chǔ)大量數(shù)據(jù)的系統(tǒng)中使用的原因。例如HDFS的HA Feature是基于majority-vote-based journal,但是它的數(shù)據(jù)存儲(chǔ)并沒有使用這種方式。
Kafka在ZooKeeper中動(dòng)態(tài)維護(hù)了一個(gè)ISR(in-sync replicas),這個(gè)ISR里的所有Replica都跟上了leader,只有ISR里的成員才有被選為Leader的可能。在這種模式下,對(duì)于f+1個(gè)Replica,一個(gè)Partition能在保證不丟失已經(jīng)commit的消息的前提下容忍f個(gè)Replica的失敗。在大多數(shù)使用場(chǎng)景中,這種模式是非常有利的。事實(shí)上,為了容忍f個(gè)Replica的失敗,Majority Vote和ISR在commit前需要等待的Replica數(shù)量是一樣的,但是ISR需要的總的Replica的個(gè)數(shù)幾乎是Majority Vote的一半。
雖然Majority Vote與ISR相比有不需等待最慢的Broker這一優(yōu)勢(shì),但是Kafka作者認(rèn)為Kafka可以通過Producer選擇是否被commit阻塞來改善這一問題,并且節(jié)省下來的Replica和磁盤使得ISR模式仍然值得。
2.2.4 如何處理所有Replica都不工作
在ISR中至少有一個(gè)follower時(shí),Kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失,但如果某個(gè)Partition的所有Replica都宕機(jī)了,就無法保證數(shù)據(jù)不丟失了。這種情況下有兩種可行的方案:
1.等待ISR中的任一個(gè)Replica“活”過來,并且選它作為Leader
2.選擇第一個(gè)“活”過來的Replica(不一定是ISR中的)作為Leader
這就需要在可用性和一致性當(dāng)中作出一個(gè)簡(jiǎn)單的折衷。如果一定要等待ISR中的Replica“活”過來,那不可用的時(shí)間就可能會(huì)相對(duì)較長。而且如果ISR中的所有Replica都無法“活”過來了,或者數(shù)據(jù)都丟失了,這個(gè)Partition將永遠(yuǎn)不可用。選擇第一個(gè)“活”過來的Replica作為Leader,而這個(gè)Replica不是ISR中的Replica,那即使它并不保證已經(jīng)包含了所有已commit的消息,它也會(huì)成為Leader而作為consumer的數(shù)據(jù)源(前文有說明,所有讀寫都由Leader完成)。Kafka0.8.*使用了第二種方式。根據(jù)Kafka的文檔,在以后的版本中,Kafka支持用戶通過配置選擇這兩種方式中的一種,從而根據(jù)不同的使用場(chǎng)景選擇高可用性還是強(qiáng)一致性。
2.2.5 選舉Leader
最簡(jiǎn)單最直觀的方案是,所有Follower都在ZooKeeper上設(shè)置一個(gè)Watch,一旦Leader宕機(jī),其對(duì)應(yīng)的ephemeral znode會(huì)自動(dòng)刪除,此時(shí)所有Follower都嘗試創(chuàng)建該節(jié)點(diǎn),而創(chuàng)建成功者(ZooKeeper保證只有一個(gè)能創(chuàng)建成功)即是新的Leader,其它Replica即為Follower。
但是該方法會(huì)有3個(gè)問題:
1.split-brain 這是由ZooKeeper的特性引起的,雖然ZooKeeper能保證所有Watch按順序觸發(fā),但并不能保證同一時(shí)刻所有Replica“看”到的狀態(tài)是一樣的,這就可能造成不同Replica的響應(yīng)不一致
2.herd effect 如果宕機(jī)的那個(gè)Broker上的Partition比較多,會(huì)造成多個(gè)Watch被觸發(fā),造成集群內(nèi)大量的調(diào)整
3.ZooKeeper負(fù)載過重 每個(gè)Replica都要為此在ZooKeeper上注冊(cè)一個(gè)Watch,當(dāng)集群規(guī)模增加到幾千個(gè)Partition時(shí)ZooKeeper負(fù)載會(huì)過重。
Kafka 0.8.*的Leader Election方案解決了上述問題,它在所有broker中選出一個(gè)controller,所有Partition的Leader選舉都由controller決定。controller會(huì)將Leader的改變直接通過RPC的方式(比ZooKeeper Queue的方式更高效)通知需為為此作為響應(yīng)的Broker。同時(shí)controller也負(fù)責(zé)增刪Topic以及Replica的重新分配。
三、HA相關(guān)ZooKeeper結(jié)構(gòu)
3.1 admin
該目錄下znode只有在有相關(guān)操作時(shí)才會(huì)存在,操作結(jié)束時(shí)會(huì)將其刪除
/admin/reassign_partitions用于將一些Partition分配到不同的broker集合上。對(duì)于每個(gè)待重新分配的Partition,Kafka會(huì)在該znode上存儲(chǔ)其所有的Replica和相應(yīng)的Broker id。該znode由管理進(jìn)程創(chuàng)建并且一旦重新分配成功它將會(huì)被自動(dòng)移除。
3.2 broker
即/brokers/ids/[brokerId])存儲(chǔ)“活著”的broker信息。
topic注冊(cè)信息(/brokers/topics/[topic]),存儲(chǔ)該topic的所有partition的所有replica所在的broker id,第一個(gè)replica即為preferred replica,對(duì)一個(gè)給定的partition,它在同一個(gè)broker上最多只有一個(gè)replica,因此broker id可作為replica id。
3.3 controller
/controller -> int (broker id of the controller)存儲(chǔ)當(dāng)前controller的信息
/controller_epoch -> int (epoch)直接以整數(shù)形式存儲(chǔ)controller epoch,而非像其它znode一樣以JSON字符串形式存儲(chǔ)。
四、producer發(fā)布消息
4.1 寫入方式
producer 采用 push 模式將消息發(fā)布到 broker,每條消息都被 append 到 patition 中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存要高,保障 kafka 吞吐率)。
4.2 消息路由
producer 發(fā)送消息到 broker 時(shí),會(huì)根據(jù)分區(qū)算法選擇將其存儲(chǔ)到哪一個(gè) partition。其路由機(jī)制為:
1、 指定了 patition,則直接使用; 2、 未指定 patition 但指定 key,通過對(duì) key 的 value 進(jìn)行hash 選出一個(gè) patition 3、 patition 和 key 都未指定,使用輪詢選出一個(gè) patition。4.3 寫入流程
producer 寫入消息序列圖如下所示:
流程說明:
1、 producer 先從 zookeeper 的 "/brokers/.../state" 節(jié)點(diǎn)找到該 partition 的 leader 2、 producer 將消息發(fā)送給該 leader 3、 leader 將消息寫入本地 log 4、 followers 從 leader pull 消息,寫入本地 log 后 leader 發(fā)送 ACK 5、 leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 發(fā)送 ACK五、broker保存消息
5.1 存儲(chǔ)方式
物理上把 topic 分成一個(gè)或多個(gè) patition(對(duì)應(yīng) server.properties 中的 num.partitions=3 配置),每個(gè) patition 物理上對(duì)應(yīng)一個(gè)文件夾(該文件夾存儲(chǔ)該 patition 的所有消息和索引文件),如下:
5.2 存儲(chǔ)策略
無論消息是否被消費(fèi),kafka 都會(huì)保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):
1、 基于時(shí)間:log.retention.hours=168 2、 基于大小:log.retention.bytes=1073741824六、Topic的創(chuàng)建和刪除
6.1 創(chuàng)建topic
創(chuàng)建 topic 的序列圖如下所示:
流程說明:
1、 controller 在 ZooKeeper 的 /brokers/topics 節(jié)點(diǎn)上注冊(cè) watcher,當(dāng) topic 被創(chuàng)建,則 controller 會(huì)通過 watch 得到該 topic 的 partition/replica 分配。 2、 controller從 /brokers/ids 讀取當(dāng)前所有可用的 broker 列表,對(duì)于 set_p 中的每一個(gè) partition:2.1、 從分配給該 partition 的所有 replica(稱為AR)中任選一個(gè)可用的 broker 作為新的 leader,并將AR設(shè)置為新的 ISR 2.2、 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state 3、 controller 通過 RPC 向相關(guān)的 broker 發(fā)送 LeaderAndISRRequest。6.2 刪除topic
刪除 topic 的序列圖如下所示:
流程說明:
1、 controller 在 zooKeeper 的 /brokers/topics 節(jié)點(diǎn)上注冊(cè) watcher,當(dāng) topic 被刪除,則 controller 會(huì)通過 watch 得到該 topic 的 partition/replica 分配。 2、 若 delete.topic.enable=false,結(jié)束;否則 controller 注冊(cè)在 /admin/delete_topics 上的 watch 被 fire,controller 通過回調(diào)向?qū)?yīng)的 broker 發(fā)送 StopReplicaRequest。七、broker failover
kafka broker failover 序列圖如下所示:
流程說明:
1、 controller 在 zookeeper 的 /brokers/ids/[brokerId] 節(jié)點(diǎn)注冊(cè) Watcher,當(dāng) broker 宕機(jī)時(shí) zookeeper 會(huì) fire watch 2、 controller 從 /brokers/ids 節(jié)點(diǎn)讀取可用broker 3、 controller決定set_p,該集合包含宕機(jī) broker 上的所有 partition 4、 對(duì) set_p 中的每一個(gè) partition 4.1、 從/brokers/topics/[topic]/partitions/[partition]/state 節(jié)點(diǎn)讀取 ISR 4.2、 決定新 leader 4.3、 將新 leader、ISR、controller_epoch 和 leader_epoch 等信息寫入 state 節(jié)點(diǎn) 5、 通過 RPC 向相關(guān) broker 發(fā)送 leaderAndISRRequest 命令八、controller failover
當(dāng) controller 宕機(jī)時(shí)會(huì)觸發(fā) controller failover。每個(gè) broker 都會(huì)在 zookeeper 的 "/controller" 節(jié)點(diǎn)注冊(cè) watcher,當(dāng) controller 宕機(jī)時(shí) zookeeper 中的臨時(shí)節(jié)點(diǎn)消失,所有存活的 broker 收到 fire 的通知,每個(gè) broker 都嘗試創(chuàng)建新的 controller path,只有一個(gè)競(jìng)選成功并當(dāng)選為 controller。
當(dāng)新的 controller 當(dāng)選時(shí),會(huì)觸發(fā) KafkaController.onControllerFailover 方法,在該方法中完成如下操作:
1、 讀取并增加 Controller Epoch。 2、 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注冊(cè) watcher。 3、 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注冊(cè) watcher。 4、 通過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注冊(cè) watcher。 5、 若 delete.topic.enable=true(默認(rèn)值是 false),則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注冊(cè) watcher。 6、 通過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注冊(cè)Watch。 7、 初始化 ControllerContext 對(duì)象,設(shè)置當(dāng)前所有 topic,“活”著的 broker 列表,所有 partition 的 leader 及 ISR等。 8、 啟動(dòng) replicaStateMachine 和 partitionStateMachine。 9、 將 brokerState 狀態(tài)設(shè)置為 RunningAsController。 10、 將每個(gè) partition 的 Leadership 信息發(fā)送給所有“活”著的 broker。 11、 若 auto.leader.rebalance.enable=true(默認(rèn)值是true),則啟動(dòng) partition-rebalance 線程。 12、 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應(yīng)的Topic。?
轉(zhuǎn)載于:https://www.cnblogs.com/qingyunzong/p/9004703.html
與50位技術(shù)專家面對(duì)面20年技術(shù)見證,附贈(zèng)技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的Kafka学习之路 (三)Kafka的高可用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redis的7个应用场景
- 下一篇: 最大并发连接数和最大会话数的区别