日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ(四)交换机exchange

發布時間:2025/3/20 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ(四)交换机exchange 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

5.1 exchanges

5.1.1 概念

RabbitMQ 消息傳遞模型的核心思想是: 生產者生產的消息從不會直接發送到隊列。實際上,通常生產者甚至都不知道這些消息傳遞傳遞到了哪些隊列中。

相反,生產者只能將消息發送到交換機(exchange),交換機工作的內容非常簡單,一方面它接收來自生產者的消息,另一方面將它們推入隊列。交換機必須確切知道如何處理收到的消息。是應該把這些消息放到特定隊列還是說把他們到許多隊列中還是說應該丟棄它們。這就的由交換機的類型來決定。

5.1.2 類型

  • 直接(direct)

  • 主題(topic)

  • 標題(headers)

  • 扇出(fanout)

5.1.3 無名exchange

在本教程的前面部分我們對 exchange 一無所知,但仍然能夠將消息發送到隊列。之前能實現的原因是因為我們使用的是默認交換,我們通過空字符串(“”)進行標識。

channel.basicPublish("", queueName, null, message.getBytes());

第一個參數是交換機的名稱。空字符串表示默認或無名稱交換機:消息能路由發送到隊列中其實是由 routingKey(bindingkey)綁定 key 指定的,如果它存在的話。

5.2 臨時隊列

之前的章節我們使用的是具有特定名稱的隊列(還記得 hello 和 ack_queue 嗎?)。隊列的名稱我們來說至關重要-我們需要指定我們的消費者去消費哪個隊列的消息。

每當我們連接到 Rabbit 時,我們都需要一個全新的空隊列,為此我們可以創建一個具有隨機名稱的隊列,或者能讓服務器為我們選擇一個隨機隊列名稱那就更好了。其次一旦我們斷開了消費者的連接,隊列將被自動刪除。

創建臨時隊列的方式如下:

String queueName = channel.queueDeclare().getQueue();

創建出來之后長成這樣:

5.3 綁定

什么是 bingding 呢,binding 其實是 exchange 和 queue 之間的橋梁,它告訴我們 exchange 和那個隊列進行了綁定關系。比如說下面這張圖告訴我們的就是 X 與 Q1 和 Q2 進行了綁定

5.4 Fanout

5.4.1 介紹

Fanout 這種類型非常簡單。正如從名稱中猜到的那樣,它是將接收到的所有消息廣播到它知道的所有隊列中。系統中默認有些 exchange 類型。

5.4.2 實戰

??

ReceiveLogs01 將接收到的消息打印在控制臺

public class ReceiveLogs01 {private static final String EXCHANGE_NAME = "logs"; ?public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/*** 生成一個臨時的隊列 隊列的名稱是隨機的* 當消費者斷開和該隊列的連接時 隊列自動刪除*/String queueName = channel.queueDeclare().getQueue();//把該臨時隊列綁定我們的 exchange 其中 routingKey(也稱之為 binding key)為空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息打印在屏幕.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("控制臺打印接收到的消息" + message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});} ? }

ReceiveLogs02 將接收到的消息存儲在磁盤

public class ReceiveLogs02 {private static final String EXCHANGE_NAME = "logs"; ?public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/*** 生成一個臨時的隊列 隊列的名稱是隨機的* 當消費者斷開和該隊列的連接時 隊列自動刪除*/String queueName = channel.queueDeclare().getQueue();//把該臨時隊列綁定我們的 exchange 其中 routingkey(也稱之為 binding key)為空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息寫到文件.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);File file = new File("D:\\workspace\\study_log\\rabbitmq_info.txt");FileUtils.writeStringToFile(file, message, "UTF-8", true);System.out.println("數據寫入文件成功");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); ?} }

EmitLog 發送消息給兩個消費者接收

public class EmitLog {private static final String EXCHANGE_NAME = "logs"; ?public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {/*** 聲明一個 exchange* 1.exchange 的名稱* 2.exchange 的類型*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout");Scanner sc = new Scanner(System.in);System.out.println("請輸入信息");while (sc.hasNext()) {String message = sc.nextLine();channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));System.out.println("生產者發出消息" + message);}}} }

啟動生產者和消費者,發送幾條消息,可以發現ReceiveLogs01和ReceiveLogs02都接收到了該消息。

5.5 Direct

5.5.1 介紹

上一節中的我們的日志系統將所有消息廣播給所有消費者,對此我們想做一些改變,例如我們希望將日志消息寫入磁盤的程序僅接收嚴重錯誤(errros),而不存儲哪些警告(warning)或信息(info)日志 消息避免浪費磁盤空間。Fanout 這種交換類型并不能給我們帶來很大的靈活性-它只能進行無意識的廣播,在這里我們將使用 direct 這種類型來進行替換,這種類型的工作方式是,消息只去到它綁定的 routingKey 隊列中去。

在上面這張圖中,我們可以看到 X 綁定了兩個隊列,綁定類型是 direct。隊列 Q1 綁定鍵為 orange, 隊列 Q2 綁定鍵有兩個:一個綁定鍵為 black,另一個綁定鍵為 green。

在這種綁定情況下,生產者發布消息到 exchange 上,綁定鍵為 orange 的消息會被發布到隊列 Q1。綁定鍵為 blackgreen 和的消息會被發布到隊列 Q2,其他消息類型的消息將被丟棄。

5.5.2 多重綁定

?當然如果 exchange 的綁定類型是 direct,但是它綁定的多個隊列的 key 如果都相同,在這種情 況下雖然綁定類型是 direct 但是它表現的就和 fanout 有點類似了,就跟廣播差不多,如上圖所示。

5.5.3 實戰

接收error類

public class ReceiveLogsDirect01 {private static final String EXCHANGE_NAME = "direct_logs"; ?public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "disk";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" 接收綁定鍵 :" + delivery.getEnvelope().getRoutingKey() + ", 消息:" + message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});} }

接收warn、info類

public class ReceiveLogsDirect02 {private static final String EXCHANGE_NAME = "direct_logs"; ?public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "console";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" 接收綁定鍵 :" + delivery.getEnvelope().getRoutingKey() + ", 消息:" + message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});} ? }

消息發送類

public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs"; ?public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//創建多個 bindingKeyMap<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("info", "普通 info 信息");bindingKeyMap.put("warning", "警告 warning 信息");bindingKeyMap.put("error", "錯誤 error 信息");//debug 沒有消費這接收這個消息 所有就丟失了bindingKeyMap.put("debug", "調試 debug 信息");for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME, bindingKey, null,message.getBytes(StandardCharsets.UTF_8));System.out.println("生產者發出消息:" + message);}}} ? }

5.6 Topics

5.6.1 之前類型的問題

在上一個小節中,我們改進了日志記錄系統。我們沒有使用只能進行隨意廣播的 fanout 交換機,而是使用了 direct 交換機,從而有能實現有選擇性地接收日志。

盡管使用 direct 交換機改進了我們的系統,但是它仍然存在局限性-比方說我們想接收的日志類型有 info.base 和 info.advantage,某個隊列只想 info.base 的消息,那這個時候 direct 就辦不到了。這個時候 就只能使用 topic 類型。

5.6.2 Topic的要求

發送到類型是 topic 交換機的消息的 routing_key 不能隨意寫,必須滿足一定的要求,它必須是一個單詞列表,以點號分隔開。這些單詞可以是任意單詞,比如說:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".這種類型的。當然這個單詞列表最多不能超過 255 個字節。

在這個規則列表中,其中有兩個替換符是大家需要注意的:

  • *(星號)可以代替一個單詞

  • #(井號)可以替代零個或多個單詞

5.6. 3 匹配案例

上圖是一個隊列綁定關系圖,我們來看看他們之間數據接收情況是怎么樣的

routing_key消費情況
quick.orange.rabbit被隊列 Q1Q2 接收到
azy.orange.elephant被隊列 Q1Q2 接收到
quick.orange.fox被隊列 Q1 接收到
lazy.brown.fox被隊列 Q2 接收到
lazy.pink.rabbit雖然滿足兩個綁定但只被隊列 Q2 接收一次
quick.brown.fox不匹配任何綁定不會被任何隊列接收到會被丟棄
quick.orange.male.rabbit是四個單詞不匹配任何綁定會被丟棄
lazy.orange.male.rabbit是四個單詞但匹配 Q2

當隊列綁定關系是下列這種情況時需要引起注意:

  • 當一個隊列綁定鍵是#,那么這個隊列將接收所有數據,就有點像 fanout 了

  • 如果隊列綁定鍵當中沒有#和*出現,那么該隊列綁定類型就是 direct 了

總結

以上是生活随笔為你收集整理的RabbitMQ(四)交换机exchange的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。