rabbitmq如何保证消息不被重复消费_RabbitMQ保证消息可靠投递与消费的正确使用姿势...
前言
MQ 是什么?MQ 我們可以理解為消息隊列。
隊列是什么?隊列我們可以理解為管道。
即以管道的方式做消息傳遞。
場景展示:
1.我們在雙11的凌晨大量秒殺和搶購商品,然后去結算的時候,發現界面會通過圖片或文字的方式提醒我們,讓我們稍等。而不是動不動就頁面卡死、報錯。
在這個業務場景中,我們就可以采用隊列的機制來處理,因為同時結算就只能達到這么多。
2.平時的超市中購物也是一樣,當我們在結算的時候,并不會一窩蜂一樣涌入收銀臺,而是排隊結算。這也是隊列機制。
綜上所述:就是排隊。一個接著一個的處理,不能插隊。
優秀隊列管理專家:RabbitMQ
RabbitMQ的是由Erlang語言編寫的實現了高級消息隊列協議(AMQP)的開源消息代理軟件(也可稱為:面向消息的中間件)。
支持Windows、Linux/Unix、MAC OS X操作系統和包括JAVA在內的多種編程語言。同時支持大量協議,如AMQP、XMPP、SMTP、STOMP。并且支持消息的持久化,負載均衡和集群。另外支持消息確認機制,如事務、異步確認等。本文重點闡述RabbitMQ的消息確認機制。
留住每一條消息
因為服務器的異常崩潰導致的消息丟失?
不存在的。
RabbitMQ的消息持久化可以讓我們不丟失任何消息。
萬一消息在到達broker之前已經丟失怎么辦?
我們還有辦法!
如果不進行特殊配置的話,默認情況下發布操作是不會返回任何信息給生產者的,也就是默認情況下我們的生產者并不知道消息有沒有正確到達broker。這個問題該怎么解決?
RabbitMQ為我們提供了兩種方式,第一,通過AMQP事務機制實現,這也是AMQP協議層面提供的解決方案;第二,通過將channel設置成confirm模式來實現。
事務機制:
該模式與數據庫的事務非常相似。RabbitMQ中與事務機制有關的方法有txSelect(), txCommit()以及txRollback()。txSelect用于將當前channel設置成transaction模式,txCommit用于提交事務,txRollback用于回滾事務。在通過txSelect開啟事務之后,我們便可以發布消息給broker了,如果txCommit提交成功了,則消息一定到達了broker了,如果在txCommit執行之前broker異常崩潰或者由于其他原因拋出異常,這個時候我們便可以捕獲異常通過txRollback回滾事務了。示例代碼如下:
該模式用法簡單,但是有個致命的缺點,那就是事務提交非常慢,會嚴重降低系統吞吐量,所以一般不推薦使用該模式,而改用confirm模式。
Confirm模式
Productor Confirm模式:
生產者將channel設置成confirm模式,一旦channel進入confirm模式,所有在該channel上面發布的消息都會被指派一個唯一的ID(Correlation Id從1開始)。一旦消息被投遞到所匹配的隊列之后,broker就會發送一個確認給生產者(包含消息的唯一ID)。這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那么確認消息會將消息寫入磁盤之后發出。broker回傳給生產者的確認消息中deliver-tag域包含了確認消息的序列號,此外broker也可以設置basic.ack的multiple域,表示到這個序列號之前的所有消息都已經得到了處理。
confirm模式最大的好處在于他是異步的,一旦發布一條消息,生產者應用程序就可以在等channel返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者應用便可以通過回調方法來處理該確認消息,如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息,生產者應用程序同樣可以在回調方法中處理該nack消息。
在channel 被設置成 confirm 模式之后,所有被 publish 的后續消息都將被 confirm(即 ack) 或者被nack一次。但是沒有對消息被 confirm 的快慢做任何保證,并且同一條消息不會既被 confirm又被nack (注:已經在transaction事務模式的channel是不能再設置成confirm模式的,即這兩種模式是不能共存的)。示例代碼如下:
Consumer Confirm模式:
如果處理一條消息需要幾秒鐘的時間,你可能會想,如果在處理消息的過程中,消費者服務器、網絡、網卡出現故障掛了,那可能這條正在處理的消息或者任務就沒有完成,就會失去這個消息和任務。為了確保消息或者任務不會丟失,RabbitMQ支持消息確認–ACK。ACK機制是消費端從RabbitMQ收到消息并處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋后才將此消息從隊列中刪除。如果一個消費者在處理消息時掛掉(網絡不穩定、服務器異常、網站故障等原因導致頻道、連接關閉或者TCP連接丟失等),那么他就不會有ACK反饋,RabbitMQ會認為這個消息沒有正常消費,會將此消息重新放入隊列中。如果有其他消費者同時在線,RabbitMQ會立即將這個消息推送給這個在線的消費者。這種機制保證了在消費者服務器故障的時候,能不丟失任何消息和任務。
如果RabbitMQ向消費者發送消息時,消費者服務器掛了,消息也不會有超時;即使一個消息需要非常長的時間處理,也不會導致消息超時。這樣消息永遠不會從RabbitMQ服務器中刪除。只有當消費者正確的發送ACK確認反饋,RabbitMQ確認收到后,消息才會從RabbitMQ服務器的數據中刪除。
消息的ACK確認機制默認是開啟的。ACK分為自動(auto)和手動(manul)2種模式:
自動確認模式,消息被認為是在發送后立即成功發送的,這種模式可以實現更高的吞吐量,但卻是不安全的。如果在成功發送之前,消費者的TCP連接或通道關閉,服務器發送的消息將丟失。在使用自動確認模式時,需要考慮的另一件事是消費者過載。示例代碼如下:
手動確認模式可以使用 prefetch,限制通道上未完成的(“正在進行中的”)發送的數量。然而,在自動確認的情況下,沒有這樣的限制。示例代碼如下:
忘記確認
忘記通過basicAck返回確認信息是常見的錯誤。這個錯誤非常嚴重,將導致消費者客戶端退出或者關閉后,消息會被退回RabbitMQ服務器,這會使RabbitMQ服務器內存爆滿,而且RabbitMQ也不會主動刪除這些被退回的消息。如果要監控這種錯誤,可以使用rabbitmqctl messages_unacknowledged命令打印出出相關的信息。
當autoAck參數置為false,對于RabbitMQ服務端而言,隊列中的消息分成了兩個部分:一部分是等待投遞給消費者的消息;一部分是已經投遞給消費者,但是還沒有收到消費者ack的消息。如果RabbitMQ一直沒有收到消費者的確認信號,并且消費此消息的消費者已經斷開連接,則RabbitMQ會安排該消息重新進入隊列,等待投遞給下一個消費者,當然也有可能還是原來的那個消費者。
RabbitMQ不會為未確認的消息設置過期時間,它判斷此消息是否需要重新投遞給消費者的唯一依據是消費該消息的消費者連接是否已經斷開,這么設計的原因是RabbitMQ允許消費者消費一條消息的時間可以很久很久。
如果消息消費失敗,也可以調用Basic.Reject或者Basic.Nack來拒絕當前消息而不是確認,如果只是簡單的拒絕那么消息會丟失,需要將相應的requeue參數設置為true,那么RabbitMQ會重新將這條消息存入隊列,以便可以發送給下一個訂閱的消費者。如果requeue參數設置為false的話,RabbitMQ立即會把消息從隊列中移除,而不會把它發送給新的消費者。示例代碼如下:
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的rabbitmq如何保证消息不被重复消费_RabbitMQ保证消息可靠投递与消费的正确使用姿势...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 读书郎平板电脑实用吗
- 下一篇: socket 收不到netty客户端消息