日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ简介和六种工作模式详解

發布時間:2023/12/4 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ简介和六种工作模式详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、RabbitMQ簡介

是一個開源的消息代理和隊列服務器,用來通過普通協議在完全不同的應用之間共享數據,RabbitMQ是使用Erlang(高并發語言)語言來編寫的,并且RabbitMQ是基于AMQP協議的。

1.1 AMQP協議

Advanced Message Queuing Protocol(高級消息隊列協議)

1.2 AMQP專業術語:(多路復用->在同一個線程中開啟多個通道進行操作)

  • Server:又稱broker,接受客戶端的鏈接,實現AMQP實體服務

  • Connection:連接,應用程序與broker的網絡連接

  • Channel:網絡信道,幾乎所有的操作都在channel中進行,Channel是進行消息讀寫的通道??蛻舳丝梢越⒍鄠€channel,每個channel代表一個會話任務。

  • Message:消息,服務器與應用程序之間傳送的數據,由Properties和Body組成.Properties可以對消息進行修飾,必須消息的優先級、延遲等高級特性;Body則是消息體內容。

  • virtualhost: 虛擬地址,用于進行邏輯隔離,最上層的消息路由。一個virtual host里面可以有若干個Exchange和Queue,同一個Virtual Host 里面不能有相同名稱的Exchange 或 Queue。

  • Exchange:交換機,接收消息,根據路由鍵轉單消息到綁定隊列

  • Binding: Exchange和Queue之間的虛擬鏈接,binding中可以包換routing key

  • Routing key: 一個路由規則,虛擬機可用它來確定如何路由一個特定消息。(如負載均衡)

1.3 RabbitMQ整體架構

?

?

ClientA(生產者)發送消息到Exchange1(交換機),同時帶上RouteKey(路由Key),Exchange1找到綁定交換機為它和綁定傳入的RouteKey的隊列,把消息轉發到對應的隊列,消費者Client1,Client2,Client3只需要指定對應的隊列名即可以消費隊列數據。

交換機和隊列多對多關系,實際開發中一般是一個交換機對多個隊列,防止設計復雜化。

?

二、安裝RabbitMQ

安裝方式不影響下面的使用,這里用Docker安裝

#15672端口為web管理端的端口,5672為RabbitMQ服務的端口 docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3-management

輸入:ip:5672訪問驗證。

建一個名為develop的Virtual host(虛擬主機)使用,項目中一般是一個項目建一個Virtual host用,能夠隔離隊列。

切換Virtual host

三、RabbitMQ六種隊列模式在.NetCore中使用

(1)簡單隊列

最簡單的工作隊列,其中一個消息生產者,一個消息消費者,一個隊列。也稱為點對點模式

?

?

?

描述:一個生產者 P 發送消息到隊列 Q,一個消費者 C 接收

建一個RabbitMQHelper.cs類

/// <summary>/// RabbitMQ幫助類/// </summary>public class RabbitMQHelper{private static ConnectionFactory factory;private static object lockObj = new object();/// <summary>/// 獲取單個RabbitMQ連接/// </summary>/// <returns></returns>public static IConnection GetConnection(){if (factory == null){lock (lockObj){if (factory == null){factory = new ConnectionFactory{HostName = "172.16.2.84",//ipPort = 5672,//端口UserName = "admin",//賬號Password = "123456",//密碼VirtualHost = "develop" //虛擬主機};}}}return factory.CreateConnection();}}

生產者代碼:

新建發送類Send.cs

public static void SimpleSendMsg(){string queueName = "simple_order";//隊列名//創建連接using (var connection = RabbitMQHelper.GetConnection()){//創建信道using (var channel = connection.CreateModel()){//創建隊列channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);for (var i = 0; i < 10; i++){string message = $"Hello RabbitMQ MessageHello,{i + 1}";var body = Encoding.UTF8.GetBytes(message);//發送消息channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: null, body);Console.WriteLine($"發送消息到隊列:{queueName},內容:{message}");}}}}

創建隊列參數解析:

durable:是否持久化。

exclusive:排他隊列,只有創建它的連接(connection)能連,創建它的連接關閉,會自動刪除隊列。

autoDelete:被消費后,消費者數量都斷開時自動刪除隊列。

arguments:創建隊列的參數。

發送消息參數解析:

exchange:交換機,為什么能傳空呢,因為RabbitMQ內置有一個默認的交換機,如果傳空時,就會用默認交換機。

routingKey:路由名稱,這里用隊列名稱做路由key。

mandatory:true告訴服務器至少將消息route到一個隊列種,否則就將消息return給發送者;false:沒有找到路由則消息丟棄。

執行效果:

?

?

隊列產生10條消息。

?

?消費者代碼:

新建Recevie.cs類

public static void SimpleConsumer(){string queueName = "simple_order";var connection = RabbitMQHelper.GetConnection();{//創建信道var channel = connection.CreateModel();{//創建隊列channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);var consumer = new EventingBasicConsumer(channel);int i = 0;consumer.Received += (model, ea) =>{//消費者業務處理var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"{i},隊列{queueName}消費消息長度:{message.Length}");i++;};channel.BasicConsume(queueName, true, consumer);}}}

消費者只需要知道隊列名就可以消費了,不需要Exchange和routingKey。

注:消費者這里有一個創建隊列,它本身不需要,是預防消費端程序先執行,沒有隊列會報錯。

執行效果:

?

?

?

?

?

消息已經被消費完。

(2)工作隊列模式

一個消息生產者,一個交換器,一個消息隊列,多個消費者。同樣也稱為點對點模式

?

生產者P發送消息到隊列,多個消費者C消費隊列的數據。

工作隊列也稱為公平性隊列模式,循環分發,RabbitMQ?將按順序將每條消息發送給下一個消費者,每個消費者將獲得相同數量的消息。

生產者:

Send.cs代碼:

    /// <summary>/// 工作隊列模式/// </summary>public static void WorkerSendMsg(){string queueName = "worker_order";//隊列名//創建連接using (var connection = RabbitMQHelper.GetConnection()){//創建信道using (var channel = connection.CreateModel()){//創建隊列channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);var properties = channel.CreateBasicProperties();properties.Persistent = true; //消息持久化for ( var i=0;i<10;i++){string message = $"Hello RabbitMQ MessageHello,{i+1}";var body = Encoding.UTF8.GetBytes(message);//發送消息到rabbitmqchannel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: properties, body);Console.WriteLine($"發送消息到隊列:{queueName},內容:{message}");}}}}

參數durable:true,需要持久化,實際項目中肯定需要持久化的,不然重啟RabbitMQ數據就會丟失了。

執行效果:

?

?

?

?寫入10條數據,有持久化標識D。

?

?

?消費端:

Recevie代碼:

public static void WorkerConsumer(){string queueName = "worker_order";var connection = RabbitMQHelper.GetConnection();{//創建信道var channel = connection.CreateModel();{//創建隊列channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);var consumer = new EventingBasicConsumer(channel);//prefetchCount:1來告知RabbitMQ,不要同時給一個消費者推送多于 N 個消息,也確保了消費速度和性能channel.BasicQos(prefetchSize: 0, prefetchCount:1, global: false);int i = 1;int index = new Random().Next(10);consumer.Received += (model, ea) =>{//處理業務var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"{i},消費者:{index},隊列{queueName}消費消息長度:{message.Length}");channel.BasicAck(ea.DeliveryTag, false); //消息ack確認,告訴mq這條隊列處理完,可以從mq刪除了Thread.Sleep(1000);i++;};channel.BasicConsume(queueName,autoAck:false, consumer);}}}

BasicQos參數解析:

prefetchSize:每條消息大小,一般設為0,表示不限制。

prefetchCount:1,作用限流,告訴RabbitMQ不要同時給一個消費者推送多于N個消息,消費者會把N條消息緩存到本地一條條消費,如果不設,RabbitMQ會進可能快的把消息推到客戶端,導致客戶端內存升高。設置合理可以不用頻繁從RabbitMQ 獲取能提升消費速度和性能,設的太多的話則會增大本地內存,需要根據機器性能合理設置,官方建議設為30。

global:是否為全局設置。

這些限流設置針對消費者autoAck:false時才有效,如果是自動Ack的,限流不生效。

?

執行兩個消費者,效果:

?

可以看到消費者號的標識,8,2,8,2是平均的,一個消費者5個,RabbitMQ上也能看到有2個消費者,Unacked數是2,因為每個客戶端的限流數是1。

?

工作隊列模式也是很常用的隊列模式。

(3)發布訂閱模式?

Pulish/Subscribe,無選擇接收消息,一個消息生產者,一個交換機(交換機類型為fanout),多個消息隊列,多個消費者。稱為發布/訂閱模式

在應用中,只需要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。

?

?

?

生產者P只需把消息發送到交換機X,綁定這個交換機的隊列都會獲得一份一樣的數據。

?

應用場景:適合于用同一份數據源做不同的業務。

生產者代碼:

     /// <summary>/// 發布訂閱, 扇形隊列/// </summary>public static void SendMessageFanout(){//創建連接using (var connection = RabbitMQHelper.GetConnection()){//創建信道using (var channel = connection.CreateModel()){string exchangeName = "fanout_exchange";//創建交換機,fanout類型channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);string queueName1 = "fanout_queue1";string queueName2 = "fanout_queue2";string queueName3 = "fanout_queue3";//創建隊列channel.QueueDeclare(queueName1, false, false, false);channel.QueueDeclare(queueName2, false, false, false);channel.QueueDeclare(queueName3, false, false, false);//把創建的隊列綁定交換機,routingKey不用給值,給了也沒意義的channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "");channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "");channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "");var properties = channel.CreateBasicProperties();properties.Persistent = true; //消息持久化//向交換機寫10條消息for (int i = 0; i < 10; i++){string message = $"RabbitMQ Fanout {i + 1} Message";var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchangeName, routingKey: "", null, body);Console.WriteLine($"發送Fanout消息:{message}");}}}}

?

執行代碼:

?

?

?

?向交換機發送10條消息,則綁定這個交換機的3個隊列都會有10條消息。

消費端的代碼和工作隊列的一樣,只需知道隊列名即可消費,聲明時要和生產者的聲明一樣。

(4)路由模式(推薦使用)

在發布/訂閱模式的基礎上,有選擇的接收消息,也就是通過 routing 路由進行匹配條件是否滿足接收消息。

?

?

?

?上圖是一個結合日志消費級別的配圖,在路由模式它會把消息路由到那些 binding key 與 routing key 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的direct模式。

?生產者P發送數據是要指定交換機(X)和routing發送消息 ,指定的routingKey=error,則隊列Q1和隊列Q2都會有一份數據,如果指定routingKey=into,或=warning,交換機(X)只會把消息發到Q2隊列。

?生產者代碼:

 /// <summary>/// 路由模式,點到點直連隊列/// </summary>public static void SendMessageDirect(){//創建連接using (var connection = RabbitMQHelper.GetConnection()){//創建信道using (var channel = connection.CreateModel()){//聲明交換機對象,fanout類型string exchangeName = "direct_exchange";channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);//創建隊列string queueName1 = "direct_errorlog";string queueName2 = "direct_alllog";channel.QueueDeclare(queueName1, true, false, false);channel.QueueDeclare(queueName2, true, false, false);//把創建的隊列綁定交換機,direct_errorlog隊列只綁定routingKey:errorchannel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "error");//direct_alllog隊列綁定routingKey:error,infochannel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "info");channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "error");var properties = channel.CreateBasicProperties();properties.Persistent = true; //消息持久化//向交換機寫10條錯誤日志和10條Info日志for (int i = 0; i < 10; i++){string message = $"RabbitMQ Direct {i + 1} error Message";var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchangeName, routingKey: "error", properties, body);Console.WriteLine($"發送Direct消息error:{message}");string message2 = $"RabbitMQ Direct {i + 1} info Message";var body2 = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchangeName, routingKey: "info", properties, body2);Console.WriteLine($"info:{message2}");}}}}

這里創建一個direct類型的交換機,兩個路由key,一個error,一個info,兩個隊列,一個隊列只綁定error,一個隊列綁定error和info,向error和info各發10條消息。

執行代碼:

?

?

?查看RabbitMQ管理界面,direct_errorlog隊列10條,而direct_alllog有20條,因為direct_alllog隊列兩個routingKey的消息都進去了。

?

?

?

?

?

?點進去看下兩個隊列綁定的交換機和routingKey

?

?

?

?

?

?

?

?

?消費者代碼:

消費者和工作隊列一樣,只需根據隊列名消費即可,這里只消費direct_errorlog隊列作示例

public static void DirectConsumer(){string queueName = "direct_errorlog";var connection = RabbitMQHelper.GetConnection();{//創建信道var channel = connection.CreateModel();{//創建隊列channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);var consumer = new EventingBasicConsumer(channel);///prefetchCount:1來告知RabbitMQ,不要同時給一個消費者推送多于 N 個消息,也確保了消費速度和性能///global:是否設為全局的///prefetchSize:單條消息大小,通常設0,表示不做限制//是autoAck=false才會有效channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);int i = 1;consumer.Received += (model, ea) =>{//處理業務var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"{i},隊列{queueName}消費消息長度:{message.Length}");channel.BasicAck(ea.DeliveryTag, false); //消息ack確認,告訴mq這條隊列處理完,可以從mq刪除了i++;};channel.BasicConsume(queueName, autoAck: false, consumer);}}}

普通場景中推薦使用路由模式,因為路由模式有交換機,有路由key,能夠更好的拓展各種應用場景。

(5)主題模式

topics(主題)模式跟routing路由模式類似,只不過路由模式是指定固定的路由鍵 routingKey,而主題模式是可以模糊匹配路由鍵 routingKey,類似于SQL中 = 和 like 的關系。

?

P 表示為生產者、 X 表示交換機、C1C2 表示為消費者,紅色表示隊列。

?

?topics 模式與 routing 模式比較相近,topics 模式不能具有任意的 routingKey,必須由一個英文句點號“.”分隔的字符串(我們將被句點號“.”分隔開的每一段獨立的字符串稱為一個單詞),比如 "lazy.orange.a"。topics routingKey 中可以存在兩種特殊字符"*"與“#”,用于做模糊匹配,其中“*”用于匹配一個單詞,“#”用于匹配多個單詞(可以是零個)。

以上圖為例:

如果發送消息的routingKey設置為:

aaa.orange.rabbit,那么消息會路由到Q1與Q2,

routingKey=aaa.orange.bb的消息會路由到Q1,

routingKey=lazy.aa.bb.cc的消息會路由到Q2;

routingKey=lazy.aa.rabbit的消息會路由到 Q2(只會投遞給Q2一次,雖然這個routingKey 與 Q2 的兩個 bindingKey 都匹配);

沒匹配routingKey的消息將會被丟棄。

生產者代碼:

public static void SendMessageTopic(){//創建連接using (var connection = RabbitMQHelper.GetConnection()){//創建信道using (var channel = connection.CreateModel()){//聲明交換機對象,fanout類型string exchangeName = "topic_exchange";channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);//隊列名string queueName1 = "topic_queue1";string queueName2 = "topic_queue2";//路由名string routingKey1 = "*.orange.*";string routingKey2 = "*.*.rabbit";string routingKey3 = "lazy.#";channel.QueueDeclare(queueName1, true, false, false);channel.QueueDeclare(queueName2, true, false, false);//把創建的隊列綁定交換機,routingKey指定routingKeychannel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: routingKey1);channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey2);channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey3);//向交換機寫10條消息for (int i = 0; i < 10; i++){string message = $"RabbitMQ Direct {i + 1} Message";var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchangeName, routingKey: "aaa.orange.rabbit", null, body);channel.BasicPublish(exchangeName, routingKey: "lazy.aa.rabbit", null, body);Console.WriteLine($"發送Topic消息:{message}");}}}}

這里演示了 routingKey為aaa.orange.rabbit,和lazy.aa.rabbit的情況,第一個匹配到Q1和Q2,第二個匹配到Q2,所以應該Q1是10條,Q2有20條,

執行后看rabbitMQ界面:

?

(6)RPC模式

與上面其他5種所不同之處,該模式是擁有請求/回復的。也就是有響應的,上面5種都沒有。

RPC是指遠程過程調用,也就是說兩臺服務器A,B,一個應用部署在A服務器上,想要調用B服務器上應用提供的處理業務,處理完后然后在A服務器繼續執行下去,把異步的消息以同步的方式執行。

?

?客戶端(C)聲明一個排他隊列自己訂閱,然后發送消息到RPC隊列同時也把這個排他隊列名也在消息里傳進去,服務端監聽RPC隊列,處理完業務后把處理結果發送到這個排他隊列,然后客戶端收到結果,繼續處理自己的邏輯。

RPC的處理流程:

  • 當客戶端啟動時,創建一個匿名的回調隊列。

  • 客戶端為RPC請求設置2個屬性:replyTo:設置回調隊列名字;correlationId:標記request。

  • 請求被發送到rpc_queue隊列中。

  • RPC服務器端監聽rpc_queue隊列中的請求,當請求到來時,服務器端會處理并且把帶有結果的消息發送給客戶端。接收的隊列就是replyTo設定的回調隊列。

  • 客戶端監聽回調隊列,當有消息時,檢查correlationId屬性,如果與request中匹配,那就是結果了。

服務端代碼:

public class RPCServer{public static void RpcHandle(){var connection = RabbitMQHelper.GetConnection();{var channel = connection.CreateModel();{string queueName = "rpc_queue";channel.QueueDeclare(queue: queueName, durable: false,exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 1, false);var consumer = new EventingBasicConsumer(channel);channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);Console.WriteLine("【服務端】等待RPC請求...");consumer.Received += (model, ea) =>{string response = null;var body = ea.Body.ToArray();var props = ea.BasicProperties;var replyProps = channel.CreateBasicProperties();replyProps.CorrelationId = props.CorrelationId;try{var message = Encoding.UTF8.GetString(body);Console.WriteLine($"【服務端】接收到數據:{ message},開始處理");response = $"消息:{message},處理完成";}catch (Exception e){Console.WriteLine("錯誤:" + e.Message);response = "";}finally{var responseBytes = Encoding.UTF8.GetBytes(response);channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,basicProperties: replyProps, body: responseBytes);channel.BasicAck(deliveryTag: ea.DeliveryTag,multiple: false);}};}}}}

客戶端:

public class RPCClient{private readonly IConnection connection;private readonly IModel channel;private readonly string replyQueueName;private readonly EventingBasicConsumer consumer;private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();private readonly IBasicProperties props;public RPCClient(){connection = RabbitMQHelper.GetConnection();channel = connection.CreateModel();replyQueueName = channel.QueueDeclare().QueueName;consumer = new EventingBasicConsumer(channel);props = channel.CreateBasicProperties();var correlationId = Guid.NewGuid().ToString();props.CorrelationId = correlationId; //給消息idprops.ReplyTo = replyQueueName;//回調的隊列名,Client關閉后會自動刪除consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var response = Encoding.UTF8.GetString(body);//監聽的消息Id和定義的消息Id相同代表這條消息服務端處理完成if (ea.BasicProperties.CorrelationId == correlationId){respQueue.Add(response);}};channel.BasicConsume(consumer: consumer,queue: replyQueueName,autoAck: true);}public string Call(string message){var messageBytes = Encoding.UTF8.GetBytes(message);//發送消息channel.BasicPublish(exchange: "",routingKey: "rpc_queue",basicProperties: props,body: messageBytes);//等待回復return respQueue.Take();}public void Close(){connection.Close();}}

執行代碼:

static void Main(string[] args){Console.WriteLine("Hello World!");//啟動服務端,正常邏輯是在另一個程序RPCServer.RpcHandle();//實例化客戶端var rpcClient = new RPCClient();string message = $"消息id:{new Random().Next(1, 1000)}";Console.WriteLine($"【客服端】RPC請求中,{message}");//向服務端發送消息,等待回復var response = rpcClient.Call(message);Console.WriteLine("【客服端】收到回復響應:{0}", response);rpcClient.Close();Console.ReadKey();}

測試效果:

?

?z執行完,客服端close后,可以接著自己的下一步業務處理。

?

?

總結:

以上便是RabbitMQ的6中模式在.net core中實際使用,其中(1)簡單隊列,(2)工作隊列,(4)路由模式,(6)RPC模式的交換機類型都是direct,(3)發布訂閱的交換機是fanout,(5)topics的交換機是topic。正常場景用的是direct,默認交換機也是direct類型的,推薦用(4)路由模式,因為指定交換機名比起默認的交換機會容易擴展場景,其他的交換機看業務場景所需使用。

下面位置可以看到交換機類型,amq.開頭那幾個是內置的,避免交換機過多可以直接使用。

?

總結

以上是生活随笔為你收集整理的RabbitMQ简介和六种工作模式详解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。