日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

RabbitMQ 官方NET教程(二)【工作队列】

發布時間:2025/7/14 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ 官方NET教程(二)【工作队列】 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

這篇中我們將會創建一個工作隊列用來在工作者(consumer)間分發耗時任務。

工作隊列的主要任務是:避免立刻執行資源密集型任務和避免必須等待其完成。相反地,我們進行任務調度:我們把任務封裝為消息發送給隊列。工作進行在后臺運行并不斷的從隊列中取出任務然后執行。當你運行了多個工作進程時,任務隊列中的任務將會被這些工作進程共享執行。
這樣的概念在web應用中極其有用,當在很短的HTTP請求間需要執行復雜的任務。

準備

在本教程的前面部分,我們發送了一個包含Hello World!的消息。 現在我們將發送代替復雜任務的字符串。 我們沒有一個現實世界的任務,比如圖像被調整大小,或者是要渲染的pdf文件,所以假設我們很忙 - 通過使用Thread.sleep()函數來假冒它。 我們將把字符串中的點數作為其復雜度; 每個點都將占“work”的一秒鐘。 例如,由Hello...描述的假任務將需要三秒鐘。
我們將稍后從之前的例子中修改Send程序,以允許從命令行發送任意消息。 這個程序會將任務安排到我們的工作隊列中,所以讓我們命名為NewTask:

dotnet new console --name NewTask mv NewTask/Program.cs NewTask/NewTask.cs dotnet new console --name Worker mv Worker/Program.cs Worker/Worker.cs cd NewTask dotnet add package RabbitMQ.Client dotnet restore cd ../Worker dotnet add package RabbitMQ.Client dotnet restore var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message);var properties = channel.CreateBasicProperties(); properties.Persistent = true;channel.BasicPublish(exchange: "",routingKey: "task_queue",basicProperties: properties,body: body);

有些幫助從命令行參數獲取消息:

private static string GetMessage(string[] args) {return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }

我們的舊的Receive.cs腳本還需要一些更改:它需要為消息體中的每個點偽造一秒的工作時間。 它將處理RabbitMQ發送的消息并執行任務,因此我們將其復制到Worker項目并修改:

var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => {var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);

我們假任務到模擬執行時間:

int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000);

循環調度

使用任務隊列的優點之一是能夠輕松地并行工作。 如果我們正在建立積壓的工作,我們可以增加更多的工作者,這樣可以輕松擴展。

首先,我們同時嘗試運行兩個Worker實例。 他們都會從隊列中獲取消息,但是究竟如何? 讓我們來看看。

你需要三個控制臺打開。 兩個將運行Worker程序。 這些控制臺將是我們兩個消費者 - C1和C2。

# shell 1 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C
# shell 2 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C

在第三個我們將發布新的任務。 一旦您已經開始使用消費者,您可以發布一些消息:

# shell 3 cd NewTask dotnet run "First message." dotnet run "Second message.." dotnet run "Third message..." dotnet run "Fourth message...." dotnet run "Fifth message....."

讓我們看看送給我們workers的內容:

# shell 1 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....' # shell 2 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'

默認情況下,RabbitMQ將按順序將每條消息發送給下一個消費者。 平均每個消費者將獲得相同數量的消息。 這種分發消息的方式叫做循環(round-robin)。 與三名或更多的workers一起嘗試。

消息應答(message acknowledgments)

執行一個任務需要花費幾秒鐘。你可能會擔心當一個消費者在執行任務時發生中斷。使用我們當前的代碼,一旦RabbitMQ向客戶發送消息,它立即將其從內存中刪除。在這種情況下,如果殺死正在執行任務的某個工作者,我們會丟失它正在處理的信息。我們也會丟失已經轉發給這個工作者且它還未執行的消息。

但是我們不想失去任何任務。如果一個worker掛了,我們希望把這個任務交給另一個工作者。

為了確保消息永遠不會丟失,RabbitMQ支持消息確認。從消費者發送一個確認信息告訴RabbitMQ已經收到,處理了特定的消息,然后RabbitMQ可以自由刪除它。

如果消費者死機(其通道關閉,連接關閉或TCP連接丟失),而不發送確認信息,RabbitMQ將會明白消息未被完全處理并重新排隊。如果同時有其他消費者在線,則會迅速將其重新提供給另一個消費者。這樣就可以確保沒有消息丟失,即使工作者偶然死亡。

沒有任何消息超時; RabbitMQ將在消費者掛了時重新發送消息。如果消費者處理一個信息需要耗費特別特別長的時間是允許的。

消息確認默認情況下打開。 在前面的例子中,我們通過將noAck(“no manual acks”)參數設置為true來明確地將其關閉。 一旦完成任務,現在該刪除這個標志并發送正確的確認。

var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => {var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done");channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);

使用這個代碼,我們可以確定即使在處理消息時,使用CTRL + C殺死一個工作者,也不會丟失任何東西。工作者掛了之后不久,所有未確認的消息將被重新發送。

忘記確認
丟失BasicAck是一個常見的錯誤。 這是一個容易的錯誤,但后果是嚴重的。
當您的客戶端退出(可能看起來像隨機重新傳遞)時,消息將被重新傳遞,但是RabbitMQ將會消耗越來越多的內存,因為它將無法釋放任何未包含的消息。

為了調試這種錯誤,您可以使用rabbitmqctl打印messages_unacknowledged字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows上:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久化(Message durability)

我們已經學會了如何確保即使消費者死亡,任務也不會丟失。 但是如果RabbitMQ服務器停止,我們的任務仍然會丟失。

當RabbitMQ退出或崩潰時,它會忘記隊列和消息,除非你告訴它不要丟失。需要兩件事來確保消息不會丟失:我們需要將所有隊列和消息標記為持久化。

首先,我們需要確保RabbitMQ不會丟失我們的隊列。 為了這樣做,我們需要將其聲明為持久的:

channel.QueueDeclare(queue: "hello",durable: true,exclusive: false,autoDelete: false,arguments: null);

雖然這個命令本身是正確的,但是在我們目前的設置中是不行的。 這是因為我們已經定義了一個非持久化的名為hello的隊列。 RabbitMQ不允許您重新定義具有不同參數的現有隊列,并會向嘗試執行此操作的任何程序返回錯誤。 但是有一個快速的解決方法 - 讓我們用不同的名稱聲明一個隊列,例如task_queue:

channel.QueueDeclare(queue: "task_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);

這個queueDeclare更改需要應用于生產者和消費者代碼。

在這一點上,我們確信,即使RabbitMQ重新啟動,task_queue隊列也不會丟失。 現在我們需要將我們的消息標記為持久性 - 將IBasicProperties.SetPersistent設置為true。

var properties = channel.CreateBasicProperties(); properties.Persistent = true;

注意消息持久性

將消息標記為持久性不能完全保證消息不會丟失。 雖然它告訴RabbitMQ將消息保存到磁盤,但是當RabbitMQ接受消息并且還沒有保存時,仍然有一個很短的時間窗口。 此外,RabbitMQ不會對每個消息執行`fsync`(同步內存中所有已修改的文件數據到儲存設備) - 它可能只是保存到緩存中,而不是真正寫入磁盤。 持久性保證不強,但對我們的簡單任務隊列來說已經足夠了。 如果您需要更強大的保證,那么您可以使用[publisher confirms](https://www.rabbitmq.com/confirms.html)。

公平轉發(Fair dispatch)

或許會發現,目前的消息轉發機制(Round-robin)并非是我們想要的。例如,這樣一種情況,對于兩個消費者,有一系列的任務,奇數任務特別耗時,而偶數任務卻很輕松,這樣造成一個消費者一直繁忙,另一個消費者卻很快執行完任務后等待。
造成這樣的原因是因為RabbitMQ僅僅是當消息到達隊列進行轉發消息。并不在乎有多少任務消費者并未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。

channel.BasicQos(0, 1, false);

注意隊列大小

如果所有的工作者都處于繁忙狀態,你的隊列有可能被填充滿。你可能會觀察隊列的使用情況,然后增加工作者,或者使用別的什么策略。

完整的代碼

NewTask.cs 類:

using System; using RabbitMQ.Client; using System.Text;class NewTask {public static void Main(string[] args){var factory = new ConnectionFactory() { HostName = "localhost" };using(var connection = factory.CreateConnection())using(var channel = connection.CreateModel()){channel.QueueDeclare(queue: "task_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);var properties = channel.CreateBasicProperties();properties.Persistent = true;channel.BasicPublish(exchange: "",routingKey: "task_queue",basicProperties: properties,body: body);Console.WriteLine(" [x] Sent {0}", message);}Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}private static string GetMessage(string[] args){return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");} }

Worker.cs類:

using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; using System.Threading;class Worker {public static void Main(){var factory = new ConnectionFactory() { HostName = "localhost" };using(var connection = factory.CreateConnection())using(var channel = connection.CreateModel()){channel.QueueDeclare(queue: "task_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);Console.WriteLine(" [*] Waiting for messages.");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done");channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: "task_queue",noAck: false,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}} }

轉載于:https://www.cnblogs.com/Wulex/p/6965057.html

總結

以上是生活随笔為你收集整理的RabbitMQ 官方NET教程(二)【工作队列】的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。