日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

快手二面:引入RabbitMQ后,你如何保证全链路数据100%不丢失?

發(fā)布時間:2025/3/20 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 快手二面:引入RabbitMQ后,你如何保证全链路数据100%不丢失? 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

今日推薦

扔掉 Postman,一個工具全部搞定,真香!為啥查詢那么慢?還在直接用JWT做鑒權(quán)?JJWT真香推薦 15 款常用開發(fā)工具干掉 navicat:這款 DB 管理工具才是y(永)y(遠)d(的)s(神)

來源:blog.csdn.net/hsz2568952354/

article/details/86559470

我們都知道,消息從生產(chǎn)端到消費端消費要經(jīng)過3個步驟:

  • 生產(chǎn)端發(fā)送消息到RabbitMQ;

  • RabbitMQ發(fā)送消息到消費端;

  • 消費端消費這條消息;

  • 這3個步驟中的每一步都有可能導致消息丟失,消息丟失不可怕,可怕的是丟失了我們還不知道,所以要有一些措施來保證系統(tǒng)的可靠性。這里的可靠并不是一定就100%不丟失了,磁盤損壞,機房爆炸等等都能導致數(shù)據(jù)丟失,當然這種都是極小概率發(fā)生,能做到99.999999%消息不丟失,就是可靠的了。下面來具體分析一下問題以及解決方案。

    生產(chǎn)端可靠性投遞

    生產(chǎn)端可靠性投遞,即生產(chǎn)端要確保將消息正確投遞到RabbitMQ中。生產(chǎn)端投遞的消息丟失的原因有很多,比如消息在網(wǎng)絡傳輸?shù)倪^程中發(fā)生網(wǎng)絡故障消息丟失,或者消息投遞到RabbitMQ時RabbitMQ掛了,那消息也可能丟失,而我們根本不知道發(fā)生了什么。針對以上情況,RabbitMQ本身提供了一些機制。

    事務消息機制

    事務消息機制由于會嚴重降低性能,所以一般不采用這種方法,我就不介紹了,而采用另一種輕量級的解決方案——confirm消息確認機制。

    confirm消息確認機制

    什么是confirm消息確認機制?顧名思義,就是生產(chǎn)端投遞的消息一旦投遞到RabbitMQ后,RabbitMQ就會發(fā)送一個確認消息給生產(chǎn)端,讓生產(chǎn)端知道我已經(jīng)收到消息了,否則這條消息就可能已經(jīng)丟失了,需要生產(chǎn)端重新發(fā)送消息了。

    通過下面這句代碼來開啟確認模式:

    channel.confirmSelect();//?開啟發(fā)送方確認模式

    然后異步監(jiān)聽確認和未確認的消息:

    channel.addConfirmListener(new?ConfirmListener()?{//消息正確到達broker@Overridepublic?void?handleAck(long?deliveryTag,?boolean?multiple)?throws?IOException?{System.out.println("已收到消息");//做一些其他處理}//RabbitMQ因為自身內(nèi)部錯誤導致消息丟失,就會發(fā)送一條nack消息@Overridepublic?void?handleNack(long?deliveryTag,?boolean?multiple)?throws?IOException?{System.out.println("未確認消息,標識:"?+?deliveryTag);//做一些其他處理,比如消息重發(fā)等} });

    這樣就可以讓生產(chǎn)端感知到消息是否投遞到RabbitMQ中了,當然這樣還不夠,稍后我會說一下極端情況。

    消息持久化

    那消息持久化呢?我們知道,RabbitMQ收到消息后將這個消息暫時存在了內(nèi)存中,那這就會有個問題,如果RabbitMQ掛了,那重啟后數(shù)據(jù)就丟失了,所以相關的數(shù)據(jù)應該持久化到硬盤中,這樣就算RabbitMQ重啟后也可以到硬盤中取數(shù)據(jù)恢復。那如何持久化呢?

    message消息到達RabbitMQ后先是到exchange交換機中,然后路由給queue隊列,最后發(fā)送給消費端。

    所有需要給exchange、queue和message都進行持久化:

    exchange持久化:

    //第三個參數(shù)true表示這個exchange持久化 channel.exchangeDeclare(EXCHANGE_NAME,?"direct",?true);

    queue持久化:

    //第二個參數(shù)true表示這個queue持久化 channel.queueDeclare(QUEUE_NAME,?true,?false,?false,?null);

    message持久化:

    //第三個參數(shù)MessageProperties.PERSISTENT_TEXT_PLAIN表示這條消息持久化 channel.basicPublish(EXCHANGE_NAME,?ROUTING_KEY,?MessageProperties.PERSISTENT_TEXT_PLAIN,?message.getBytes(StandardCharsets.UTF_8));

    這樣,如果RabbitMQ收到消息后掛了,重啟后會自行恢復消息。

    到此,RabbitMQ提供的幾種機制都介紹完了,但這樣還不足以保證消息可靠性投遞RabbitMQ中,上面我也提到了會有極端情況,比如RabbitMQ收到消息還沒來得及將消息持久化到硬盤時,RabbitMQ掛了,這樣消息還是丟失了,或者RabbitMQ在發(fā)送確認消息給生產(chǎn)端的過程中,由于網(wǎng)絡故障而導致生產(chǎn)端沒有收到確認消息,這樣生產(chǎn)端就不知道RabbitMQ到底有沒有收到消息,就不好做接下來的處理。

    所以除了RabbitMQ提供的一些機制外,我們自己也要做一些消息補償機制,以應對一些極端情況。接下來我就介紹其中的一種解決方案——消息入庫。

    消息入庫

    消息入庫,顧名思義就是將要發(fā)送的消息保存到數(shù)據(jù)庫中。

    首先發(fā)送消息前先將消息保存到數(shù)據(jù)庫中,有一個狀態(tài)字段status=0,表示生產(chǎn)端將消息發(fā)送給了RabbitMQ但還沒收到確認;在生產(chǎn)端收到確認后將status設為1,表示RabbitMQ已收到消息。這里有可能會出現(xiàn)上面說的兩種情況,所以生產(chǎn)端這邊開一個定時器,定時檢索消息表,將status=0并且超過固定時間后(可能消息剛發(fā)出去還沒來得及確認這邊定時器剛好檢索到這條status=0的消息,所以給個時間)還沒收到確認的消息取出重發(fā)(第二種情況下這里會造成消息重復,消費者端要做冪等性),可能重發(fā)還會失敗,所以可以做一個最大重發(fā)次數(shù),超過就做另外的處理。

    這樣消息就可以可靠性投遞到RabbitMQ中了,而生產(chǎn)端也可以感知到了。

    消費端消息不丟失

    既然已經(jīng)可以讓生產(chǎn)端100%可靠性投遞到RabbitMQ了,那接下來就改看看消費端的了,如何讓消費端不丟失消息。

    默認情況下,以下3種情況會導致消息丟失:

    • 在RabbitMQ將消息發(fā)出后,消費端還沒接收到消息之前,發(fā)生網(wǎng)絡故障,消費端與RabbitMQ斷開連接,此時消息會丟失;

    • 在RabbitMQ將消息發(fā)出后,消費端還沒接收到消息之前,消費端掛了,此時消息會丟失;

    • 消費端正確接收到消息,但在處理消息的過程中發(fā)生異常或宕機了,消息也會丟失。

    其實,上述3中情況導致消息丟失歸根結(jié)底是因為RabbitMQ的自動ack機制,即默認RabbitMQ在消息發(fā)出后就立即將這條消息刪除,而不管消費端是否接收到,是否處理完,導致消費端消息丟失時RabbitMQ自己又沒有這條消息了。

    所以就需要將自動ack機制改為手動ack機制。

    消費端手動確認消息:

    DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{try?{//接收到消息,做處理//手動確認channel.basicAck(delivery.getEnvelope().getDeliveryTag(),?false);}?catch?(Exception?e)?{//出錯處理,這里可以讓消息重回隊列重新發(fā)送或直接丟棄消息} }; //第二個參數(shù)autoAck設為false表示關閉自動確認機制,需手動確認 channel.basicConsume(QUEUE_NAME,?false,?deliverCallback,?consumerTag?->?{});

    這樣,當autoAck參數(shù)置為false,對于RabbitMQ服務端而言,隊列中的消息分成了兩個部分:一部分是等待投遞給消費端的消息;一部分是已經(jīng)投遞給消費端,但是還沒有收到消費端確認信號的消息。如果RabbitMQ一直沒有收到消費端的確認信號,并且消費此消息的消費端已經(jīng)斷開連接或宕機(RabbitMQ會自己感知到),則RabbitMQ會安排該消息重新進入隊列(放在隊列頭部),等待投遞給下一個消費者,當然也有能還是原來的那個消費端,當然消費端也需要確保冪等性。

    好了,到此從生產(chǎn)端到RabbitMQ再到消費端的全鏈路,就可以保證數(shù)據(jù)的不丟失。

    由于個人水平有限,有些地方可能理解錯了或理解不到位的,請大家多多指出!Thanks

    推薦文章1、一款高顏值的 SpringBoot+JPA 博客項目2、超優(yōu) Vue+Element+Spring 中后端解決方案3、推薦幾個支付項目!4、推薦一個 Java 企業(yè)信息化系統(tǒng)5、一款基于 Spring Boot 的現(xiàn)代化社區(qū)(論壇/問答/社交網(wǎng)絡/博客)

    總結(jié)

    以上是生活随笔為你收集整理的快手二面:引入RabbitMQ后,你如何保证全链路数据100%不丢失?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。