日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

说说 RabbiMQ 的应答模式

發布時間:2023/12/4 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 说说 RabbiMQ 的应答模式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RabbiMQ 我們都很熟悉了,是很常用的一個開源消息隊列。搞懂 RabbiMQ 的應答模式對我們排查錯誤很有幫助,也能避免一些坑。本文說說 RabbiMQ 的應答模式。

生產者發出一條消息給 RabbiMQ ,RabbiMQ 將消息推送給消費者,消費者處理完消息后告訴 RabbiMQ,我已經接收到消息并處理了,RabbiMQ 收到通知后會將消息從隊列中刪除。消費者通知 MQ 的這個過程就是消息的應答。在 RabbiMQ 中有兩種應答模式:自動應答和手動應答。

版本

  • dotNET Core :3.1

  • RabbitMQ:3.8.2

  • RabbitMQ.Client:6.2.1

自動應答

當 RabbiMQ 開啟了消息的自動應答,一旦 RabbiMQ 將消息分發給了消費者,就會將消息從內存中刪除。這種情況下,如果正在執行的消費者掛掉,就會丟失正在處理的消息。

生產者代碼

static?void?Main(string[]?args) {ConnectionFactory?factory?=?new?ConnectionFactory{UserName?=?"oec2003",Password?=?"000000",HostName?=?"10.211.55.6"};using?(var?connection?=?factory.CreateConnection())using?(var?channel?=?connection.CreateModel()){Console.WriteLine("RabbitMQ連接成功,請輸入消息,輸入exit退出");channel.QueueDeclare("oec2003",?false,?false,?false,?null);string?input;do{input?=?Console.ReadLine();var?body?=?Encoding.UTF8.GetBytes(input);channel.BasicPublish("",?"oec2003",?null,?body);}while?(input.Trim().ToLower()?!=?"exit");} }

消費者代碼

static?void?Main(string[]?args) {ConnectionFactory?factory?=?new?ConnectionFactory{UserName?=?"oec2003",Password?=?"000000",HostName?=?"10.211.55.6"};using?(var?connection?=?factory.CreateConnection())using?(var?channel?=?connection.CreateModel()){Console.WriteLine("消費者開始監聽......");channel.QueueDeclare("oec2003",?false,?false,?false,?null);EventingBasicConsumer?consumer?=?new?EventingBasicConsumer(channel);consumer.Received?+=?(ch,?ea)?=>{string?message?=?Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd?HH:mm:ss")}:收到消息:?{message}");System.Threading.Thread.Sleep(10000);};channel.BasicConsume("oec2003",?true,?consumer);Console.ReadKey();} }
  • channel.BasicConsume 方法的第二個參數設置為 true 表示自動應答;

  • 開啟自動應答后,消息是生產者發布后,當有消費者連接上后,所有的消息都會被自動確認,并且從內存中刪除,這時如果消費者進程掛掉,沒有處理的消息會丟失,正在處理中的消息也不會被重新投遞;

  • 自動應答的好處是消息隊列不會處于堵塞狀態,但代價有點大,生產環境中還是不建議使用。

手動應答

手動應答,當消費者接收到消息處理完后,需要發送一個回執,告訴 RabbiMQ 服務端,這時 RabbiMQ 才會將該消息刪除。

生產者的代碼和上面的一樣,消費者代碼需要做相關調整,如下:

static?void?Main(string[]?args) {ConnectionFactory?factory?=?new?ConnectionFactory{UserName?=?"oec2003",Password?=?"000000",HostName?=?"10.211.55.6"};using?(var?connection?=?factory.CreateConnection())using?(var?channel?=?connection.CreateModel()){Console.WriteLine("消費者開始監聽......");EventingBasicConsumer?consumer?=?new?EventingBasicConsumer(channel);consumer.Received?+=?(ch,?ea)?=>{string?message?=?Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd?HH:mm:ss")}:收到消息:?{message}");channel.BasicAck(ea.DeliveryTag,?false);};channel.BasicConsume("oec2003",?false,?consumer);Console.ReadKey();} }
  • channel.BasicConsume 方法的第二個參數設置為 false ,表示手動應答模式;

  • 在處理完消息后調用 channel.BasicAck(ea.DeliveryTag, false); 來進行應答,告訴 RabbiMQ 消息已經收到,RabbiMQ 收到這個回執后,才會刪除消息。

可能遇到的問題

流量控制問題

在手動模式下,生產者發送消息后消息會從 Ready 進入到 Unacked 中,當消費者進行應答之后消息從 Unacked 中刪除。

如果消息的產生速度遠遠大于消費者的處理速度,這時消息就會都在消費者處進行積壓了。我們會看到 Unacked 中的數量會越來越大,這樣消費者的壓力就會越來越大,這時就需要使用 Qos 來進行限流。

Qos

在消費者中使用 channel.BasicQos(0, 2, false); 來進行 Qos 的設置,如下圖:

BasicQos 方法有三個參數:

  • prefetchSize:批量獲取消息的總大小,0為不限制;

  • prefetchCount:每次處理消息的個數,比如 prefetchCount 設置為 2 ,那么處于 Unacked 狀態的消息最多就 2 條,當其中一條進行了得到了應答后,才會從 Ready 中轉入一條到 Unacked

  • global:設置為 true 表示對 channel 進行控制,否則對每個消費者進行限制,一個 channel 可以有多個消費者

為什么使用 Qos :

  • 提高服務穩定性,因為有 prefetchCount 參數的控制,不會有海量的數據涌進來導致消費者服務掛掉;

  • 提高吞吐量,當隊列有多個消費者時,每個消費者的能力不一樣,我們可以通過 prefetchCount 參數來合理安排每個消費者的處理能力,不會出現有的空閑,有的積壓。

prefetchCount 是一個非常關鍵的參數,當消費者處理消息時,出現一些異常情況,導致無法進行 Ack 應答,沒有應答的數量大于等于 prefetchCount 時,隊列就會發生堵塞。所以我們一定要確保消息的處理能夠被異常捕獲,并在 finally 中進行 Ack 應答,代碼如下:

try {string?message?=?Encoding.Default.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd?HH:mm:ss")}:收到消息:?{message}");if?(message?==?"error"){throw?new?Exception("mq?error");}else?if?(message?==?"sleep"){System.Threading.Thread.Sleep(60000);} } catch?(Exception) {//處理異常 } finally {channel.BasicAck(ea.DeliveryTag,?false); }

一旦隊列堵塞了,一種處理方式就是斷掉客戶端,這樣,處在 Unacked 中的消息會重新回到 Ready 中,會重新進行投遞進行消費。

總結

1、自動應答模式需要慎用,特別是生產環境;

2、不開啟 Qos ,消費者可能會面臨很大壓力,但消息不會堵塞(測試過 500 個未進行 Ack 沒有造成堵塞),現在不確定在沒有 Qos 的情況下,有沒有默認的最大 prefetchCount ;

3、開啟 Qos ,prefetchCount 的值很關鍵,并且需要做好異常處理,防止堵塞。

希望本文對您有所幫助!

總結

以上是生活随笔為你收集整理的说说 RabbiMQ 的应答模式的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。