日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ(二):mandatory标志的作用

發布時間:2025/4/5 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ(二):mandatory标志的作用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文轉自:http://m.blog.csdn.net/article/details?id=54311277

在生產者通過channel的basicPublish方法發布消息時,通常有幾個參數需要設置,為此我們有必要了解清楚這些參數代表的具體含義及其作用,查看Channel接口,會發現存在3個重載的basicPublish方法

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)throws IOException;void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)throws IOException;

他們共有的參數分別是:
??????? exchange:交換機名稱
??????? routingKey:路由鍵
??????? props:消息屬性字段,比如消息頭部信息等等
??????? body:消息主體部分
??????? 除此之外,還有mandatory和immediate這兩個參數,鑒于RabbitMQ3.0不再支持immediate標志,因此我們重點討論mandatory標志
????????mandatory的作用:

??????? 當mandatory標志位設置為true時,如果exchange根據自身類型和消息routingKey無法找到一個合適的queue存儲消息,那么broker會調用basic.return方法將消息返還給生產者;當mandatory設置為false時,出現上述情況broker會直接將消息丟棄;通俗的講,mandatory標志告訴broker代理服務器至少將消息route到一個隊列中,否則就將消息return給發送者;

?????? 下面我們通過幾個實例測試下mandatory標志的作用:
????????測試1:設置mandatory標志,且exchange未綁定隊列

public class ProducerTest {public static void main(String[] args) {String exchangeName = "confirmExchange";String queueName = "confirmQueue";String routingKey = "confirmRoutingKey";String bindingKey = "confirmBindingKey";int count = 3;ConnectionFactory factory = new ConnectionFactory();factory.setHost("172.16.151.74");factory.setUsername("test");factory.setPassword("test");factory.setPort(5672);//創建生產者Sender producer = new Sender(factory, count, exchangeName, routingKey);producer.run();} }class Sender {private ConnectionFactory factory;private int count;private String exchangeName;private String routingKey;public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {this.factory = factory;this.count = count;this.exchangeName = exchangeName;this.routingKey = routingKey;}public void run() {try {Connection connection = factory.newConnection();Channel channel = connection.createChannel();//創建exchangechannel.exchangeDeclare(exchangeName, "direct", true, false, null);//發送持久化消息for(int i = 0;i < count;i++){//第一個參數是exchangeName(默認情況下代理服務器端是存在一個""名字的exchange的,因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話我們需要將該參數設置成創建的exchange的名字),第二個參數是路由鍵channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());}} catch (Exception e) {e.printStackTrace();}} }

?????? 第45行我們將basicPublish的第三個參數mandatory設置成了true,表示開啟了mandatory標志,但我們沒有為當前exchange綁定任何隊列;

?????? 通過wireshark抓包看到下面輸出:??

?????? 可以看到最后執行了basic.return方法,將發布者發出的消息返還給了發布者,查看協議的Arguments參數部分可以看到,Reply-Text字段值為:NO_ROUTE,表示消息并沒有路由到合適的隊列中;

?????? 那么我們該怎么獲取到沒有被正確路由到合適隊列的消息呢?這時候可以通過為channel信道設置ReturnListener監聽器來實現,具體實現代碼見下:

?

public class ProducerTest {public static void main(String[] args) {String exchangeName = "confirmExchange";String queueName = "confirmQueue";String routingKey = "confirmRoutingKey";String bindingKey = "confirmBindingKey";int count = 3;ConnectionFactory factory = new ConnectionFactory();factory.setHost("172.16.151.74");factory.setUsername("test");factory.setPassword("test");factory.setPort(5672);//創建生產者Sender producer = new Sender(factory, count, exchangeName, routingKey);producer.run();} }class Sender {private ConnectionFactory factory;private int count;private String exchangeName;private String routingKey;public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {this.factory = factory;this.count = count;this.exchangeName = exchangeName;this.routingKey = routingKey;}public void run() {try {Connection connection = factory.newConnection();Channel channel = connection.createChannel();//創建exchangechannel.exchangeDeclare(exchangeName, "direct", true, false, null);//發送持久化消息for(int i = 0;i < count;i++){//第一個參數是exchangeName(默認情況下代理服務器端是存在一個""名字的exchange的,//因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話//我們需要將該參數設置成創建的exchange的名字),第二個參數是路由鍵channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());}channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)throws IOException {//此處便是執行Basic.Return之后回調的地方String message = new String(arg5);System.out.println("Basic.Return返回的結果: "+message);}});} catch (Exception e) {e.printStackTrace();}} }

?????? 在設置了ReturnListener監聽器之后,broker(代理服務器)發出basic.return方法之后,就會回調第52行的handleReturn方法,在這個方法里面我們就可以進行消息的重新發布操作啦;

?????? 測試2:設置mandatory標志,且為exchange綁定隊列(路由鍵和綁定鍵一致)

?

public class ProducerTest {public static void main(String[] args) {String exchangeName = "confirmExchange";String queueName = "confirmQueue";String routingKey = "confirmRoutingKey";String bindingKey = "confirmRoutingKey";//String bindingKey = "confirmBindingKey";int count = 3;ConnectionFactory factory = new ConnectionFactory();factory.setHost("172.16.151.74");factory.setUsername("test");factory.setPassword("test");factory.setPort(5672);//創建生產者Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);producer.run();} }class Sender {private ConnectionFactory factory;private int count;private String exchangeName;private String queueName;private String routingKey;private String bindingKey;public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {this.factory = factory;this.count = count;this.exchangeName = exchangeName;this.queueName = queueName;this.routingKey = routingKey;this.bindingKey = bindingKey;}public void run() {try {Connection connection = factory.newConnection();Channel channel = connection.createChannel();//創建exchangechannel.exchangeDeclare(exchangeName, "direct", true, false, null);//創建隊列channel.queueDeclare(queueName, true, false, false, null);//綁定exchange和queue channel.queueBind(queueName, exchangeName, bindingKey);//發送持久化消息for(int i = 0;i < count;i++){//第一個參數是exchangeName(默認情況下代理服務器端是存在一個""名字的exchange的,//因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話//我們需要將該參數設置成創建的exchange的名字),第二個參數是路由鍵channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());}channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)throws IOException {//此處便是執行Basic.Return之后回調的地方String message = new String(arg5);System.out.println("Basic.Return返回的結果: "+message);}});} catch (Exception e) {e.printStackTrace();}} }

??????? 通過抓包發現并不會有basic.return方法被調用,查看RabbitMQ管理界面發現消息已經到達了隊列;

??????? 測試3:設置mandatory標志,且exchange綁定隊列(路由鍵和綁定鍵不一致)

??????? 代碼就是把測試2中第6行注釋,第7行注釋打開,注意到此時的routingKey和bindingKey是不一致的,此時我們運行程序,同時抓包得到下面截圖:

?????? 注意一點,我們發送了三條消息,那么相應的應該執行三次basic.return,其中第一次和第二次basic.return顯示在一行上了,第三次是單獨一行,不要誤認為只執行了兩次,從協議的具體返回內容里我們同樣看到了Reply-Text字段值是NO_ROUTE,這種現象在測試1中已經見過了;

?????? 到此,我們明白了mandatory標志的作用:在消息沒有被路由到合適隊列情況下會將消息返還給消息發布者,同時我們測試了哪些情況下消息不會到達合適的隊列,測試1演示的是創建了exchange但是沒有為他綁定隊列導致的消息未到達合適隊列,測試3演示的是創建了exchange同時創建了queue,但是在將兩者綁定的時候,使用的bindingKey和消息發布者使用的rountingKey不一致導致的消息未到達合適隊列;

總結

以上是生活随笔為你收集整理的RabbitMQ(二):mandatory标志的作用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。