日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka 延时消息处理

發布時間:2024/3/13 编程问答 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka 延时消息处理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

? ? 你一定遇到過這種情況,接收到消息時并不符合馬上處理的條件(例如頻率限制),但是又不能丟掉,于是先存起來,過一陣子再來處理。系統應該怎么設計呢?可能你會想到數據庫,用一個字段來標記執行的狀態,或者設置一個等待的時間戳,不管是哪種都需要反復地從數據庫存取,還要考慮出異常情況狀態的維護。

? ? 作為一款優秀的消息處理服務,kafka 具有完善的事務管理,狀態管理和災難恢復功能。只要我們稍加變通一下,kafka 也能作為延遲消息處理的解決方案,而且實現上比用數據庫簡單得多。

? ? 以下代碼均在 spring-boot 2.0.5 和 spring-kafka 2.1.10 中測試通過。建議事先閱讀文檔?https://docs.spring.io/spring-kafka/docs/2.5.4.RELEASE/reference/html/#receiving-messages?以便能很好地理解以下內容。

設計思路

設計 2 個隊列(topic),一個收到消息馬上執行,另一個用來接收需延遲處理的消息。話句話說,接收延遲消息的隊列直到消息可執行之前一直在 block 狀態,所以有局限性,定時不能非常精確,并且任務執行次序與加進來的次序是一致的。

spring-boot 的配置

application.yml ————————————————————spring:## kafkakafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: myGroupauto-offset-reset: earliestenable-auto-commit: falseproperties:max:poll:interval:# 設置時間必須比延遲處理的時間大,不然會報錯ms: 1200000listener:# 把提交模式改為手動ack-mode: MANUAL kafka 默認的消費模式是自動提交,意思是,當?MessageListener 收到消息,執行處理方法后自動提交已完成狀態,該消息就從隊列里移除了。配置 ack-mode: MANUAL 改為手動提交后,我們就可以根據需要保留數據在消息隊列,以便以后再處理。 max.poll.interval.ms 設小了可能會收到下面的錯誤: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 請務必設置一個比等待執行時間更長的時間。

發送消息

@Autowired private KafkaTemplate kafkaTemplate;public void myAction(){// 定義 data// 任務推送到 KafkakafkaTemplate.send(“myJob", data.toString()); }

該部分沒有特別的地方,跟普通的消息消息發送一樣。

接收消息

定義兩個 topic:myJob 和 myJob-delay @SpringBootApplication @ServletComponentScan public class Application {@KafkaListener(topics = “myJob”)@SendTo(“myJob-delay")public String onMessage(ConsumerRecord<?, ?> cr, Acknowledgment ack) {String json = (String) cr.value();JSONObject data = JSON.parseObject(json);if (/* 需要延遲處理 */){// 提交ack.acknowledge();// 發送到 @SendTodata.put("until", System.currentTimeMillis() + msToDelay);return data.toString();}// 正常處理// do real work// 提交ack.acknowledge();return null;}@KafkaListener(topics = “myJob-delay")@SendTo(“myJob")public String delayMessage(ConsumerRecord<?, ?> cr, Acknowledgment ack){String json = (String) cr.value();JSONObject data = JSON.parseObject(json);Long until = data.getLong("until");// 阻塞直到 untilwhile (System.currentTimeMillis() < until){Thread.sleep( Math.max(0, until - System.currentTimeMillis()) );}// 提交ack.acknowledge();// 轉移到 @SendToreturn json;} }

代碼很簡單,不用解釋也能看明白。稍微提一下幾個重要的地方。

@KafkaListener 的方法參數里有?Acknowledgment ack,這是AckMode.MANUAL 模式下必須要添加的參數。

ack.acknowledge() 用來標記一條消息已經消費完成,即將從消息隊列里移除。執行之前消息會一直保留在隊列中,即時宕機重啟后也能恢復。

@SendTo 用來在隊列(topic)間轉移消息,只要 return 非 null 的數據。以上代碼中,當需要延遲處理時,消息從 myJob 轉移到 myJob-delay;而當條件滿足時,消息又從 myJob-delay 轉移到了 myJob。

自從 spring-kafka 2.2.4 版本之后,可以在方法上定義?max.poll.interval.ms ,更加靈活了。例如

@KafkaListener(topics = "myTopic", groupId = "group", properties = { "max.poll.interval.ms:60000”, ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100”} )

? ? 以上是延遲消息處理的簡單實現,適合延時要求不那么高的場合。朋友們想一下,假如延時比較復雜,執行的次序也不一定跟消息到達的次序一致,系統又該怎樣設計呢?

假如這篇文章對你有所幫助, 請關注我公眾號, 發現更多有用的文章

?

總結

以上是生活随笔為你收集整理的kafka 延时消息处理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。