日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rabbitmq 持久化_RabbitMQ原理与相关操作(三)消息持久化

發布時間:2024/7/23 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq 持久化_RabbitMQ原理与相关操作(三)消息持久化 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

現在聊一下RabbitMQ消息持久化:

問題及方案描述

1.當有多個消費者同時收取消息,且每個消費者在接收消息的同時,還要處理其它的事情,且會消耗很長的時間。在此過程中可能會出現一些意外,比如消息接收到一半的時候,一個消費者死掉了。

這種情況要使用消息接收確認機制,可以執行上次宕機的消費者沒有完成的事情。

2.在默認情況下,我們程序創建的消息隊列以及存放在隊列里面的消息,都是非持久化的。當RabbitMQ死掉了或者重啟了,上次創建的隊列、消息都不會保存。

這種情況可以使用RabbitMQ提供的消息隊列的持久化機制。

相關理論描述

RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,為了數據安全考慮,我個人覺得大多數開發人員都會選擇持久化。

隊列和交換機有一個創建時候指定的標志durabledurable的唯一含義就是具有這個標志的隊列和交換機會在重啟之后重新建立,它不表示說在隊列當中的消息會在重啟后恢復。

消息隊列持久化包括3個部分:

1、exchange持久化,在聲明時指定durable => true2、queue持久化,在聲明時指定durable => true3、消息持久化,在投遞時指定delivery_mode=> 2(1是非持久化)

如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。

注意:一旦創建了隊列和交換機,就不能修改其標志了。例如,如果創建了一個non-durable的隊列,然后想把它改變成durable的,唯一的辦法就是刪除這個隊列然后重現創建

程序示例

生產者

class Producter { const string ExchangeName = "eric.exchange"; const string QueueName = "eric.queue"; static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的 channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的 channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName); string message = "Eric is very handsome"; var body = Encoding.UTF8.GetBytes(message); //將隊列設置為持久化之后,還需要將消息也設為可持久化的 var props = channel.CreateBasicProperties(); props.SetPersistent(true); channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: body); Console.WriteLine("Producter Sent: {0}", message); Console.ReadKey(); } } }

注:ack是 acknowledgments 的縮寫,noAck 是("no manual acks")

因為我前段時間換了筆記本,所以用戶的“eric”的操作出踩了個坑,下面進行介紹下:

如果調試運行時報錯:None of the specified endpoints were reachable

innerException是:

{"The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text="Unexpected Exception", classId=0, methodId=0, cause=System.IO.IOException: 無法從傳輸連接中讀取數據: 遠程主機強迫關閉了一個現有的連接。。 ---> System.Net.Sockets.SocketException: 遠程主機強迫關閉了一個現有的連接。 在 System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags) 在 System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size) --- 內部異常堆棧跟蹤的結尾 --- 在 RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader) 在 RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame() 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration() 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop()"}

這說明我們使用的用戶 不是 系統默認的 guest 而是我們自己創建的用戶,但是沒有足夠的權限進行操作。

解決辦法:

rabbitmqctl set_user_tags username administratorrabbitmqctl set_permissions -p / username ".*" ".*" ".*"

執行結果:

相關其他操作見:windows下 安裝 rabbitMQ 及操作常用命令

程序運行結果:

消費者

class Recevice { const string ExchangeName = "eric.exchange"; const string QueueName = "eric.queue"; public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost = "/" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的 channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的 channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName); BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true); //NoAck:true 告訴RabbitMQ立即從隊列中刪除消息,另一個非常受歡迎的方式是從隊列中刪除已經確認接收的消息,可以通過單獨調用BasicAck 進行確認: //BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false); var msgContent = Encoding.UTF8.GetString(msgResponse.Body); Console.WriteLine("The received content:"+msgContent); channel.BasicAck(msgResponse.DeliveryTag, multiple: false); //使用BasicAck方式來告之是否從隊列中移除該條消息 //需要額外注意,比如從隊列中獲取消息并用它來操作數據庫或日志文件時,如果出現操作失敗時,則該條消息應該保留在隊列中,只到操作成功時才從隊列中移除。 Console.ReadKey(); } } }

接受消息還有一種方法,就是通過基于推送的事件訂閱。可以使用內置的 QueueingBasicConsumer 提供簡化的編程模型,允許在共享隊列上阻塞,直到收到一條消息。

var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(QueueName, noAck: true, consumer: consumer); var msgResponse = consumer.Queue.Dequeue(); var msgContent = Encoding.UTF8.GetString(msgResponse.Body);

程序運行結果:

原文鏈接:https://www.cnblogs.com/ericli-ericli/p/5938106.html

總結

以上是生活随笔為你收集整理的rabbitmq 持久化_RabbitMQ原理与相关操作(三)消息持久化的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。