日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka丢数据问题方案(转载+整理+汇总)

發布時間:2023/12/31 编程问答 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka丢数据问题方案(转载+整理+汇总) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

丟數據可能來自:

息生產者(Producer)、消息消費者(Consumer)和服務載體(在Kafka中用Broker指代)[6]

broker可以理解為類似于隊列api一樣的東西

[8]提供了一個腦圖

?

----------------------------------------------------------------------------------生產者--------------------------------------------------------------------------------------------------------------------------------------------------

參數設置推薦設置
request.required.acks-1
producer.send(msg,callback)

優先使用producer.send(msg,callback) 帶回調方法

而不是producer.send(msg) 不帶回調方法

retriesMAX
enable.idempotence

true

producer.type采用默認的sync值[5]

?

?

-------------------------------------------------------------------------------服務端---------------------------------------------------------------------------------------------------------------------------------------------------------------------------

變量說明推薦設置
unclean.leader.election.enable這個參數是控制leader replica出問題了以后follower replica競選leader replica資格的,我們把設置為false,意思就是如果follower replica如果落后leader replica太多就不能參與競選。false
replication.factor這個參數設置的是partition副本的個數,如果我們要想保證數據不丟,這個副本數需要設置成大于1。>= 3
min.insync.replicas

這個參數要跟生產者里的acks參數配合使用,當生產者acks=-1時,服務端的ISR列表里的所有副本都寫入成功,才會給生產者返回成功的響應。

?

而min.insync.replicas這個參數就是控制ISR列表的,假設min.insync.replicas=1,這就意味著ISR列表里可以只有一個副本,這個副本就是leader replica,這個時候即使acks設置的是-1,但其實消息只發送到leader replica,以后就返回成功的響應了。

?

因為ISR只有一個副本,我們知道這種情況是有可能會丟數據的,所以min.insync.replicas這個值需要大于1的(如果ISR列表里面副本的個數小于min.insync.replicas,生產者發送消息是失敗的),并且是min.insync.replicas <= replication.factor

1<min.insync.replicas

<=replication.factor

interval.messages=10000

當達到下面的時間(ms)時,執行一次強制的flush操作。interval.ms和interval.messages無論哪個達到,都會flush。默認3000ms
log.flush.interval.ms=1000

檢查是否需要將日志flush的時間間隔
log.flush.scheduler.interval.ms = 3000

?

設置好消息重試機制

?

下面根據[11][12]

函數作用對應的設置[12]
cleanupExpiredSegments負責清理過期的日志數據

log.retention.hours

log.retention.minutes

log.retention.ms

cleanupSegmentsToMaintainSize負責清理超過指定容量的日志數據log.retention.bytes

?

---------------------------------------------------------------------------------------------------------------------消費者------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

變量推薦設置
enable.auto.commitfalse

?

?

?

Reference:

[1]kafka中的ISR、AR又代表什么?ISR伸縮又是什么?

[2]Kafka零數據丟失的配置方案

[3]Kafka丟失數據問題優化總結(需要后面再看下)

[4]一步步解決spring-kafka消息丟失(提了下spring版本和kafka驅動版本的匹配log.retention.bytes性)

[5]kafka消息會不會丟失?為什么?看了這個你就清楚了

[6]Kafka源碼分析及圖解原理之Broker端

[7]kafka消息丟失情況與解決方案

[8]面試官問:Kafka 會不會丟消息?怎么處理的?

[9]Kafka 消息丟失與消費精確一次性

[10]kafka如何防止數據丟失(從生產者/服務器/消費者角度)

[11]Kafka 異常丟失大量數據的解決方案

[12]Kafka清除數據日志詳解

總結

以上是生活随笔為你收集整理的kafka丢数据问题方案(转载+整理+汇总)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。