Kafka无消息丢失配置
Kafka到底會不會丟數(shù)據(jù)(data loss)? 通常不會,但有些情況下的確有可能會發(fā)生。下面的參數(shù)配置及Best practice列表可以較好地保證數(shù)據(jù)的持久性(當(dāng)然是trade-off,犧牲了吞吐量)。筆者會在該列表之后對列表中的每一項進(jìn)行討論,有興趣的同學(xué)可以看下后面的分析。
給出列表之后,我們從兩個方面來探討一下數(shù)據(jù)為什么會丟失:
1. Producer端
目前比較新版本的Kafka正式替換了Scala版本的old producer,使用了由Java重寫的producer。新版本的producer采用異步發(fā)送機制。KafkaProducer.send(ProducerRecord)方法僅僅是把這條消息放入一個緩存中(即RecordAccumulator,本質(zhì)上使用了隊列來緩存記錄),同時后臺的IO線程會不斷掃描該緩存區(qū),將滿足條件的消息封裝到某個batch中然后發(fā)送出去。顯然,這個過程中就有一個數(shù)據(jù)丟失的窗口:若IO線程發(fā)送之前client端掛掉了,累積在accumulator中的數(shù)據(jù)的確有可能會丟失。
Producer的另一個問題是消息的亂序問題。假設(shè)客戶端代碼依次執(zhí)行下面的語句將兩條消息發(fā)到相同的分區(qū)
producer.send(record1); producer.send(record2);如果此時由于某些原因(比如瞬時的網(wǎng)絡(luò)抖動)導(dǎo)致record1沒有成功發(fā)送,同時Kafka又配置了重試機制和max.in.flight.requests.per.connection大于1(默認(rèn)值是5,本來就是大于1的),那么重試record1成功后,record1在分區(qū)中就在record2之后,從而造成消息的亂序。很多某些要求強順序保證的場景是不允許出現(xiàn)這種情況的。
鑒于producer的這兩個問題,我們應(yīng)該如何規(guī)避呢??對于消息丟失的問題,很容易想到的一個方案就是:既然異步發(fā)送有可能丟失數(shù)據(jù), 我改成同步發(fā)送總可以吧?比如這樣:
producer.send(record).get();這樣當(dāng)然是可以的,但是性能會很差,不建議這樣使用。因此特意總結(jié)了一份配置列表。個人認(rèn)為該配置清單應(yīng)該能夠比較好地規(guī)避producer端數(shù)據(jù)丟失情況的發(fā)生:(特此說明一下,軟件配置的很多決策都是trade-off,下面的配置也不例外:應(yīng)用了這些配置,你可能會發(fā)現(xiàn)你的producer/consumer 吞吐量會下降,這是正常的,因為你換取了更高的數(shù)據(jù)安全性)
- block.on.buffer.full = true ?盡管該參數(shù)在0.9.0.0已經(jīng)被標(biāo)記為“deprecated”,但鑒于它的含義非常直觀,所以這里還是顯式設(shè)置它為true,使得producer將一直等待緩沖區(qū)直至其變?yōu)榭捎?。否則如果producer生產(chǎn)速度過快耗盡了緩沖區(qū),producer將拋出異常
- acks=all ?很好理解,所有follower都響應(yīng)了才認(rèn)為消息提交成功,即"committed"
- retries = MAX 無限重試,直到你意識到出現(xiàn)了問題:)
- max.in.flight.requests.per.connection = 1 限制客戶端在單個連接上能夠發(fā)送的未響應(yīng)請求的個數(shù)。設(shè)置此值是1表示kafka broker在響應(yīng)請求之前client不能再向同一個broker發(fā)送請求。注意:設(shè)置此參數(shù)是為了避免消息亂序
- 使用KafkaProducer.send(record, callback)而不是send(record)方法 ? 自定義回調(diào)邏輯處理消息發(fā)送失敗
- callback邏輯中最好顯式關(guān)閉producer:close(0)?注意:設(shè)置此參數(shù)是為了避免消息亂序
- unclean.leader.election.enable=false ? 關(guān)閉unclean leader選舉,即不允許非ISR中的副本被選舉為leader,以避免數(shù)據(jù)丟失
- replication.factor >= 3 ? 這個完全是個人建議了,參考了Hadoop及業(yè)界通用的三備份原則
- min.insync.replicas > 1 消息至少要被寫入到這么多副本才算成功,也是提升數(shù)據(jù)持久性的一個參數(shù)。與acks配合使用
- 保證replication.factor >?min.insync.replicas ?如果兩者相等,當(dāng)一個副本掛掉了分區(qū)也就沒法正常工作了。通常設(shè)置replication.factor =?min.insync.replicas + 1即可
2. Consumer端
consumer端丟失消息的情形比較簡單:如果在消息處理完成前就提交了offset,那么就有可能造成數(shù)據(jù)的丟失。由于Kafka consumer默認(rèn)是自動提交位移的,所以在后臺提交位移前一定要保證消息被正常處理了,因此不建議采用很重的處理邏輯,如果處理耗時很長,則建議把邏輯放到另一個線程中去做。為了避免數(shù)據(jù)丟失,現(xiàn)給出兩點建議:
- enable.auto.commit=false ?關(guān)閉自動提交位移
- 在消息被完整處理之后再手動提交位移
轉(zhuǎn)載于:https://www.cnblogs.com/huxi2b/p/6056364.html
總結(jié)
以上是生活随笔為你收集整理的Kafka无消息丢失配置的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。