日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka重复消费问题

發布時間:2023/12/15 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka重复消费问题 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

開篇提示:kafka重復消費的根本原因就是“數據消費了,但是offset沒更新!而我們要探究一般什么情況下會導致offset沒更新?

今天查看Elasticsearch索引的時候發現有一個索引莫名的多了20w+的數據,頓時心里一陣驚訝,然后趕緊打開訂閱服務的日志(消費者),眼前的一幕讓我驚呆了,我的消費服務的控制臺一直在不斷的刷著消費日志(剛開始我并沒有意識到這是重復消費造成的),我還傻傻的以為是因為今天有人在刷單,所以導致日志狂刷,畢竟之前也遇到過有人用自動交易軟件瘋狂刷單的,所以當時也沒在意;等過了幾分鐘,又去瞅了一眼控制臺仍然在瘋狂的刷著日志,媽呀!頓時隱隱感覺不對勁,趕緊看了一眼es索引,我滴天一下子多了幾萬的數據,突然在想是不是程序出問題了(因為頭一天晚上發了一個版本),然后就開始死盯這日志看,發現了一個奇葩的問題:tmd怎么日志打印的數據都是重復的呀!這才恍然大悟,不用想了絕逼是kakfa重復消費了,好吧!能有什么辦法了,開始瘋狂的尋找解決的辦法......

既然之前沒有問題,那就是我昨天發版所導致的,那么我昨天究竟改了什么配置呢?對照了之前的版本比較了一下,發現這個參數enable-auto-commit被改成了true,即自動提交,理論上在數據并發不大,以及數據處理不耗時的情況下設置自動提交是沒有什么問題的,但是我的情況恰恰相反,可能突然會并發很大(畢竟交易流水不好說的),所以可能在規定的時間(session.time.out默認30s)內沒有消費完,就會可能導致re-blance重平衡,導致一部分offset自動提交失敗,然后重平衡后重復消費(這種很常見);或者關閉kafka時,如果在close之前,調用consumer.unsubscribe()則可能有部分offset沒提交,下次重啟會重復消費

try {
consumer.unsubscribe();
} catch (Exception e) {
}

try {
consumer.close();
} catch (Exception e) {
}

?

所以一般情況下我們設置offset自動提交為false!

解決方法:

1.設置

spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=latest

2.就是修改offset為最新的偏移量唄!我們都知道offset是存在zookeeper中的,所以我就不贅述了!

我的解決方法:

我并沒有去修改offset偏移量,畢竟生產環境還是不直接改這個了;

我重新指定了一個消費組(group.id=order_consumer_group),然后指定auto-offset-reset=latest這樣我就只需要重啟我的服務了,而不需要動kafka和zookeeper了!

?

#consumer spring.kafka.consumer.group-id=order_consumer_group spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=latest

注:如果你想要消費者從頭開始消費某個topic的全量數據,可以重新指定一個全新的group.id=new_group,然后指定auto-offset-reset=earliest即可

?

補充:

在kafka0.9.0版本的時候,開始啟用了新的consumer config,這個新的consumer config采用bootstrap.servers替代之前版本的zookeeper.connect,主要是要漸漸弱化zk的依賴,把zk依賴隱藏到broker背后。
group coordinator
使用bootstrap.servers替代之前版本的zookeeper.connect,相關的有如下兩個改動:

1.在 Server 端增加了 GroupCoordinator 這個角色 2.將 topic 的 offset 信息由之前存儲在 zookeeper(/consumers/<group.id>/offsets/<topic>/<partitionId>,zk寫操作性能不高) 上改為存儲到一個特殊的 topic 中(__consumer_offsets)

從0.8.2版本開始Kafka開始支持將consumer的位移信息保存在Kafka內部的topic中(從0.9.0版本開始默認將offset存儲到系統topic中)
Coordinator一般指的是運行在broker上的group Coordinator,用于管理Consumer Group中各個成員,每個KafkaServer都有一個GroupCoordinator實例,管理多個消費者組,主要用于offset位移管理和Consumer Rebalance。
rebalance時機
在如下條件下,partition要在consumer中重新分配:

條件1:有新的consumer加入 條件2:舊的consumer掛了 條件3:coordinator掛了,集群選舉出新的coordinator 條件4:topic的partition新加 條件5:consumer調用unsubscrible(),取消topic的訂閱

__consumer_offsets
Consumer通過發送OffsetCommitRequest請求到指定broker(偏移量管理者)提交偏移量。這個請求中包含一系列分區以及在這些分區中的消費位置(偏移量)。偏移量管理者會追加鍵值(key-value)形式的消息到一個指定的topic(__consumer_offsets)。key是由consumerGroup-topic-partition組成的,而value是偏移量。

?

參考:https://segmentfault.com/a/1190000011441747

總結

以上是生活随笔為你收集整理的kafka重复消费问题的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。