RabbitMQ知多少
1.引言
RabbitMQ——Rabbit Message Queue的簡寫,但不能僅僅理解其為消息隊(duì)列,消息代理更合適。RabbitMQ 是一個由 Erlang 語言開發(fā)的AMQP(高級消息隊(duì)列協(xié)議)的開源實(shí)現(xiàn),其內(nèi)部結(jié)構(gòu)如下:
RabbitMQ作為一個消息代理,主要和消息打交道,負(fù)責(zé)接收并轉(zhuǎn)發(fā)消息。RabbitMQ提供了可靠的消息機(jī)制、跟蹤機(jī)制和靈活的消息路由,支持消息集群和分布式部署。適用于排隊(duì)算法、秒殺活動、消息分發(fā)、異步處理、數(shù)據(jù)同步、處理耗時任務(wù)、CQRS等應(yīng)用場景。
下面我們就來學(xué)習(xí)下RabbitMQ。
2. 環(huán)境搭建
本文主要基于Windows下使用Vs Code 基于.net core進(jìn)行demo演示。開始之前我們需要準(zhǔn)備好以下環(huán)境。
安裝Erlang運(yùn)行環(huán)境
下載安裝Erlang。安裝RabbitMQ
下載安裝Windows版本的RabbitMQ。啟動RabbitMQ Server
點(diǎn)擊Windows開始按鈕,輸入RabbitMQ找到RabbitMQ Comman Prompt,以管理員身份運(yùn)行。依次執(zhí)行以下命令啟動RabbitMQ服務(wù)
rabbitmq-service installrabbitmq-service enablerabbitmq-service start執(zhí)行rabbitmqlctl status檢查RabbitMQ狀態(tài)
安裝管理平臺插件
執(zhí)行rabbitmq-plugins enable rabbitmq_management即可成功安裝,使用默認(rèn)賬號密碼(guest/guest)登錄http://localhost:15672/即可。
3. Hello RabbitMQ
在開始之前我們先來了解下消息模型:
消費(fèi)者(consumer)訂閱某個隊(duì)列。生產(chǎn)者(producer)創(chuàng)建消息,然后發(fā)布到隊(duì)列(queue)中,隊(duì)列再將消息發(fā)送到監(jiān)聽的消費(fèi)者。
下面我們我們通過demo來了解RabbitMQ的基本用法。
3.1.消息的發(fā)送和接收
創(chuàng)建RabbitMQ文件夾,打開命令提示符,分別創(chuàng)建兩個控制臺項(xiàng)目Send、Receive。
dotnet new console --name Send //創(chuàng)建發(fā)送端控制臺應(yīng)用cd Send //進(jìn)入Send目錄
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢復(fù)包 dotnet new console --name Receive //創(chuàng)建接收端控制臺應(yīng)用
cd Receive //進(jìn)入Receive目錄
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢復(fù)包
我們先來添加消息發(fā)送端邏輯:
//Send.cs public static void Main(string[] args) { ? ?//1.1.實(shí)例化連接工廠var factory = new ConnectionFactory() { HostName = "localhost" }; ?
//2. 建立連接using (var connection = factory.CreateConnection()){ ? ? ?
//3. 創(chuàng)建信道using (var channel = connection.CreateModel()){ ? ? ? ? ?
//4. 申明隊(duì)列channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); ? ? ? ?
//5. 構(gòu)建byte消息數(shù)據(jù)包string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";var body = Encoding.UTF8.GetBytes(message); ? ? ? ?
?//6. 發(fā)送數(shù)據(jù)包channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);Console.WriteLine(" [x] Sent {0}", message);}} }
再來完善消息接收端邏輯:
//Receive.cs ?省略部分代碼public static void Main(){ ? ?//1.實(shí)例化連接工廠var factory = new ConnectionFactory() { HostName = "localhost" }; ?
?//2. 建立連接using (var connection = factory.CreateConnection()){ ? ? ?
??//3. 創(chuàng)建信道using (var channel = connection.CreateModel()){ ? ? ? ? ? ?
??//4. 申明隊(duì)列channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); ? ? ? ? ?
???//5. 構(gòu)造消費(fèi)者實(shí)例var consumer = new EventingBasicConsumer(channel); ? ? ? ? ? ?//6. 綁定消息接收后的事件委托consumer.Received += (model, ea) =>{ ? ? ? ? ? ? ? ?var message = Encoding.UTF8.GetString(ea.Body);Console.WriteLine(" [x] Received {0}", message);Thread.Sleep(6000);//模擬耗時Console.WriteLine (" [x] Done");}; ? ? ? ? ? ?
???//7. 啟動消費(fèi)者channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}} }
先運(yùn)行消息接收端,再運(yùn)行消息發(fā)送端,結(jié)果如下圖。
從上面的代碼中可以看出,發(fā)送端和消費(fèi)端的代碼前4步都是一樣的。主要的區(qū)別在于發(fā)送端調(diào)用channel.BasicPublish方法發(fā)送消息;而接收端需要實(shí)例化一個EventingBasicConsumer實(shí)例來進(jìn)行消息處理邏輯。另外一點(diǎn)需要注意的是:消息接收端和發(fā)送端的隊(duì)列名稱(queue)必須保持一致,這里指定的隊(duì)列名稱為hello。
3.2. 循環(huán)調(diào)度
使用工作隊(duì)列的好處就是它能夠并行的處理隊(duì)列。如果堆積了很多任務(wù),我們只需要添加更多的工作者(workers)就可以了。我們先啟動兩個接收端,等待消息接收,再啟動一個發(fā)送端進(jìn)行消息發(fā)送。
我們增加運(yùn)行一個消費(fèi)端后的運(yùn)行結(jié)果:
從圖中可知,我們循環(huán)發(fā)送4條信息,兩個消息接收端按順序被循環(huán)分配。
默認(rèn)情況下,RabbitMQ將按順序?qū)⒚織l消息發(fā)送給下一個消費(fèi)者。平均每個消費(fèi)者將獲得相同數(shù)量的消息。這種分發(fā)消息的方式叫做循環(huán)(round-robin)。
3.3. 消息確認(rèn)
按照我們上面的demo,一旦RabbitMQ將消息發(fā)送到消費(fèi)端,消息就會立即從內(nèi)存中移出,無論消費(fèi)端是否處理完成。在這種情況下,消息就會丟失。
為了確保一個消息永遠(yuǎn)不會丟失,RabbitMQ支持消息確認(rèn)(message acknowledgments)。當(dāng)消費(fèi)端接收消息并且處理完成后,會發(fā)送一個ack(消息確認(rèn))信號到RabbitMQ,RabbitMQ接收到這個信號后,就可以刪除掉這條已經(jīng)處理的消息任務(wù)。但如果消費(fèi)端掛掉了(比如,通道關(guān)閉、連接丟失等)沒有發(fā)送ack信號。RabbitMQ就會明白某個消息沒有正常處理,RabbitMQ將會重新將消息入隊(duì),如果有另外一個消費(fèi)端在線,就會快速的重新發(fā)送到另外一個消費(fèi)端。
RabbitMQ中沒有消息超時的概念,只有當(dāng)消費(fèi)端關(guān)閉或奔潰時,RabbitMQ才會重新分發(fā)消息。
微調(diào)下Receive中的代碼邏輯:
//5. 構(gòu)造消費(fèi)者實(shí)例var consumer = new EventingBasicConsumer(channel);?//6. 綁定消息接收后的事件委托consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body);Console.WriteLine(" [x] Received {0}", message);Thread.Sleep(6000);//模擬耗時Console.WriteLine(" [x] Done"); ? ?
? // 7. 發(fā)送消息確認(rèn)信號(手動消息確認(rèn))channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};
? //8. 啟動消費(fèi)者//autoAck:true;自動進(jìn)行消息確認(rèn),當(dāng)消費(fèi)端接收到消息后,就自動發(fā)送ack信號,不管消息是否正確處理完畢//autoAck:false;關(guān)閉自動消息確認(rèn),通過調(diào)用BasicAck方法手動進(jìn)行消息確認(rèn)channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
主要改動的是將?autoAck:true修改為autoAck:fasle,以及在消息處理完畢后手動調(diào)用BasicAck方法進(jìn)行手動消息確認(rèn)。
從圖中可知,消息發(fā)送端連續(xù)發(fā)送4條消息,其中消費(fèi)端1先被分配處理第一條消息,消費(fèi)端2被循環(huán)分配第二條消息,第三條消息由于沒有空閑消費(fèi)者仍然在隊(duì)列中。
在消費(fèi)端2未處理完第一條消息之前,手動中斷(ctrl+c)。我們可以發(fā)現(xiàn)RabbitMQ在下一次分發(fā)時,會優(yōu)先將被中斷的消息分發(fā)給消費(fèi)端1處理。
3.4. 消息持久化
消息確認(rèn)確保了即使消費(fèi)端異常,消息也不會丟失能夠被重新分發(fā)處理。但是如果RabbitMQ服務(wù)端異常,消息依然會丟失。除非我們指定durable:true,否則當(dāng)RabbitMQ退出或奔潰時,消息將依然會丟失。通過指定durable:true,并指定Persistent=true,來告知RabbitMQ將消息持久化。
//send.cs//4. 申明隊(duì)列(指定durable:true,告知rabbitmq對消息進(jìn)行持久化)channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments
//將消息標(biāo)記為持久性 - 將IBasicProperties.SetPersistent設(shè)置為truevar properties = channel.CreateBasicProperties(); properties.Persistent = true;
//5. 構(gòu)建byte消息數(shù)據(jù)包string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!"; var body = Encoding.UTF8.GetBytes(message);
//6. 發(fā)送數(shù)據(jù)包(指定basicProperties)channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);
將消息標(biāo)記為持久性不能完全保證消息不會丟失。雖然它告訴RabbitMQ將消息保存到磁盤,但是當(dāng)RabbitMQ接受消息并且還沒有保存時,仍然有一個很短的時間窗口。RabbitMQ 可能只是將消息保存到了緩存中,并沒有將其寫入到磁盤上。持久化是不能夠一定保證的,但是對于一個簡單任務(wù)隊(duì)列來說已經(jīng)足夠。如果需要確保消息隊(duì)列的持久化,可以使用publisher confirms.
3.5. 公平分發(fā)
RabbitMQ的消息分發(fā)默認(rèn)按照消費(fèi)端的數(shù)量,按順序循環(huán)分發(fā)。這樣僅是確保了消費(fèi)端被平均分發(fā)消息的數(shù)量,但卻忽略了消費(fèi)端的閑忙情況。這就可能出現(xiàn)某個消費(fèi)端一直處理耗時任務(wù)處于阻塞狀態(tài),某個消費(fèi)端一直處理一般任務(wù)處于空置狀態(tài),而只是它們分配的任務(wù)數(shù)量一樣。
但我們可以通過channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
設(shè)置prefetchCount : 1來告知RabbitMQ,在未收到消費(fèi)端的消息確認(rèn)時,不再分發(fā)消息,也就確保了當(dāng)消費(fèi)端處于忙碌狀態(tài)時,不再分配任務(wù)。
//4. 申明隊(duì)列channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//設(shè)置prefetchCount : 1來告知RabbitMQ,在未收到消費(fèi)端的消息確認(rèn)時,不再分發(fā)消息,也就確保了當(dāng)消費(fèi)端處于忙碌狀態(tài)時
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
這時你需要注意的是如果所有的消費(fèi)端都處于忙碌狀態(tài),你的隊(duì)列可能會被塞滿。你需要注意這一點(diǎn),要么添加更多的消費(fèi)端,要么采取其他策略。
4. Exchange
細(xì)心的你也許發(fā)現(xiàn)上面的demo,生產(chǎn)者和消費(fèi)者直接是通過相同隊(duì)列名稱進(jìn)行匹配銜接的。消費(fèi)者訂閱某個隊(duì)列,生產(chǎn)者創(chuàng)建消息發(fā)布到隊(duì)列中,隊(duì)列再將消息轉(zhuǎn)發(fā)到訂閱的消費(fèi)者。這樣就會有一個局限性,即消費(fèi)者一次只能發(fā)送消息到某一個隊(duì)列。
那消費(fèi)者如何才能發(fā)送消息到多個消息隊(duì)列呢?
RabbitMQ提供了Exchange,它類似于路由器的功能,它用于對消息進(jìn)行路由,將消息發(fā)送到多個隊(duì)列上。Exchange一方面從生產(chǎn)者接收消息,另一方面將消息推送到隊(duì)列。但exchange必須知道如何處理接收到的消息,是將其附加到特定隊(duì)列還是附加到多個隊(duì)列,還是直接忽略。而這些規(guī)則由exchange type定義,exchange的原理如下圖所示。
常見的exchange type 有以下幾種:
direct(明確的路由規(guī)則:消費(fèi)端綁定的隊(duì)列名稱必須和消息發(fā)布時指定的路由名稱一致)
topic (模式匹配的路由規(guī)則:支持通配符)
fanout (消息廣播,將消息分發(fā)到exchange上綁定的所有隊(duì)列上)
下面我們就來一一這介紹它們的用法。
4.1 fanout
本著先易后難的思想,我們先來了解下fanout的廣播路由機(jī)制。fanout的路由機(jī)制如下圖,即發(fā)送到 fanout 類型exchange的消息都會分發(fā)到所有綁定該exchange的隊(duì)列上去。
生產(chǎn)者示例代碼:
// 生成隨機(jī)隊(duì)列名稱var queueName = channel.QueueDeclare().QueueName;//使用fanout exchange type,指定exchange名稱channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout"); var message = "Hello Rabbit!"; var body = Encoding.UTF8.GetBytes(message);//發(fā)布到指定exchange,fanout類型無需指定routingKeychannel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: null, body: body);消費(fèi)者示例代碼:
//申明fanout類型exchangechannel.ExchangeDeclare (exchange: "fanoutEC", type: "fanout");//申明隨機(jī)隊(duì)列名稱var queuename = channel.QueueDeclare ().QueueName;//綁定隊(duì)列到指定fanout類型exchange,無需指定路由鍵channel.QueueBind (queue : queuename, exchange: "fanoutEC", routingKey: "");4.2. direct
direct相對于fanout就屬于完全匹配、單播的模式,路由機(jī)制如下圖,即隊(duì)列名稱和消息發(fā)送時指定的路由完全匹配時,消息才會發(fā)送到指定隊(duì)列上。
生產(chǎn)者示例代碼:
// 生成隨機(jī)隊(duì)列名稱var queueName = channel.QueueDeclare().QueueName;//使用direct exchange type,指定exchange名稱channel.ExchangeDeclare(exchange: "directEC", type: "direct"); var message = "Hello Rabbit!"; var body = Encoding.UTF8.GetBytes(message);
//發(fā)布到direct類型exchange,必須指定routingKeychannel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: null, body: body);
消費(fèi)者示例代碼:
//申明direct類型exchangechannel.ExchangeDeclare (exchange: "directEC", type: "direct");
//綁定隊(duì)列到direct類型exchange,需指定路由鍵routingKey
channel.QueueBind (queue : green, exchange: "directEC", routingKey: "green");
4.3. topic
topic是direct的升級版,是一種模式匹配的路由機(jī)制。它支持使用兩種通配符來進(jìn)行模式匹配:符號#和符號*。其中*匹配一個單詞,?#則表示匹配0個或多個單詞,單詞之間用.分割。如下圖所示。
生產(chǎn)者示例代碼:
// 生成隨機(jī)隊(duì)列名稱var queueName = channel.QueueDeclare().QueueName;//使用topic exchange type,指定exchange名稱channel.ExchangeDeclare(exchange: "topicEC", type: "topic"); var message = "Hello Rabbit!"; var body = Encoding.UTF8.GetBytes(message);//發(fā)布到topic類型exchange,必須指定routingKeychannel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);消費(fèi)者示例代碼:
//申明topic類型exchangechannel.ExchangeDeclare (exchange: "topicEC", type: "topic");//申明隨機(jī)隊(duì)列名稱var queuename = channel.QueueDeclare ().QueueName;//綁定隊(duì)列到topic類型exchange,需指定路由鍵routingKeychannel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");5. RPC
RPC——Remote Procedure Call,遠(yuǎn)程過程調(diào)用。
那RabbitMQ如何進(jìn)行遠(yuǎn)程調(diào)用呢?示意圖如下:
第一步,主要是進(jìn)行遠(yuǎn)程調(diào)用的客戶端需要指定接收遠(yuǎn)程回調(diào)的隊(duì)列,并申明消費(fèi)者監(jiān)聽此隊(duì)列。
第二步,遠(yuǎn)程調(diào)用的服務(wù)端除了要申明消費(fèi)端接收遠(yuǎn)程調(diào)用請求外,還要將結(jié)果發(fā)送到客戶端用來監(jiān)聽的結(jié)果的隊(duì)列中去。
遠(yuǎn)程調(diào)用客戶端:
//申明唯一guid用來標(biāo)識此次發(fā)送的遠(yuǎn)程調(diào)用請求var correlationId = Guid.NewGuid().ToString();//申明需要監(jiān)聽的回調(diào)隊(duì)列var replyQueue = channel.QueueDeclare().QueueName; var properties = channel.CreateBasicProperties();properties.ReplyTo = replyQueue;//指定回調(diào)隊(duì)列properties.CorrelationId = correlationId;//指定消息唯一標(biāo)識string number = args.Length > 0 ? args[0] : "30"; var body = Encoding.UTF8.GetBytes(number); //發(fā)布消息channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);Console.WriteLine($"[*] Request fib({number})"); //
//創(chuàng)建消費(fèi)者用于處理消息回調(diào)(遠(yuǎn)程調(diào)用返回結(jié)果)var callbackConsumer = new EventingBasicConsumer(channel);channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);callbackConsumer.Received += (model, ea) =>{ ? ?
?//僅當(dāng)消息回調(diào)的ID與發(fā)送的ID一致時,說明遠(yuǎn)程調(diào)用結(jié)果正確返回。if (ea.BasicProperties.CorrelationId == correlationId){ ? ? ? ? var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";Console.WriteLine($"[x]: {responseMsg}");}};
遠(yuǎn)程調(diào)用服務(wù)端:
//申明隊(duì)列接收遠(yuǎn)程調(diào)用請求channel.QueueDeclare(queue: "rpc_queue", durable: false, ? ?exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); Console.WriteLine("[*] Waiting for message.");//請求處理邏輯
consumer.Received += (model, ea) => {var message = Encoding.UTF8.GetString(ea.Body); ?
?int n = int.Parse(message);Console.WriteLine($"Receive request of Fib({n})"); ?
??int result = Fib(n); ? ?//從請求的參數(shù)中獲取請求的唯一標(biāo)識,在消息回傳時同樣綁定var properties = ea.BasicProperties;var replyProerties = channel.CreateBasicProperties();replyProerties.CorrelationId = properties.CorrelationId; ? ?//將遠(yuǎn)程調(diào)用結(jié)果發(fā)送到客戶端監(jiān)聽的隊(duì)列上channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo, ? ? ? ?basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString())); ? ?//手動發(fā)回消息確認(rèn)channel.BasicAck(ea.DeliveryTag, false);Console.WriteLine($"Return result: Fib({n})= {result}"); }; channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
6. 總結(jié)
基于上面的demo和對幾種不同exchange路由機(jī)制的學(xué)習(xí),我們發(fā)現(xiàn)RabbitMQ主要是涉及到以下幾個核心概念:
Publisher:生產(chǎn)者,消息的發(fā)送方。
Connection:網(wǎng)絡(luò)連接。
Channel:信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。
Exchange:交換器(路由器),負(fù)責(zé)消息的路由到相應(yīng)隊(duì)列。
Binding:隊(duì)列與交換器間的關(guān)聯(lián)綁定。消費(fèi)者將關(guān)注的隊(duì)列綁定到指定交換器上,以便Exchange能準(zhǔn)確分發(fā)消息到指定隊(duì)列。
Queue:隊(duì)列,消息的緩沖存儲區(qū)。
Virtual Host:虛擬主機(jī),虛擬主機(jī)提供資源的邏輯分組和分離。包含連接,交換,隊(duì)列,綁定,用戶權(quán)限,策略等。
Broker:消息隊(duì)列的服務(wù)器實(shí)體。
Consumer:消費(fèi)者,消息的接收方。
這次作為入門就講到這里,下次我們來講解下EventBus + RabbitMQ如何實(shí)現(xiàn)事件的分發(fā)。
相關(guān)文章:
RabbitMQ系列教程之一:我們從最簡單的事情開始!Hello World
RabbitMQ系列教程之二:工作隊(duì)列(Work Queues)
RabbitMQ系列教程之三:發(fā)布/訂閱(Publish/Subscribe)
RabbitMQ系列教程之四:路由(Routing)
如何優(yōu)雅的使用RabbitMQ
.NET 使用 RabbitMQ 圖文簡介
RabbitMQ 高可用集群搭建及電商平臺使用經(jīng)驗(yàn)總結(jié)
搭建高可用的rabbitmq集群 + Mirror Queue + 使用C#驅(qū)動連接
RabbitMQ消息隊(duì)列應(yīng)用
體驗(yàn)Rabbitmq強(qiáng)大的【優(yōu)先級隊(duì)列】之輕松面對現(xiàn)實(shí)業(yè)務(wù)場景
原文地址:http://www.cnblogs.com/sheng-jie/p/7192690.html
.NET社區(qū)新聞,深度好文,微信中搜索dotNET跨平臺或掃描二維碼關(guān)注
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ知多少的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .NET Exceptionless 日
- 下一篇: AOP in dotnet :Aspec