日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMq 发布订阅 Publish/Subscribe fanout/direct

發布時間:2025/3/15 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMq 发布订阅 Publish/Subscribe fanout/direct 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

?

概述

交換機

臨時隊列

代碼


概述

在上篇中了解到rabbitmq 生產者生產消息到隊列,多個消費者可以接受。這篇文章主要記錄廣播類型為fanout。生產者不在將產生的消息發送到隊列,而是將消息發送到交換機exchange,交換機會根據不同的交換規則,將消息發送到不同的隊列。交換器必須知道她所接收的消息是什么?它應該將消息放到哪個隊列中或者還是應該丟棄?這些規則都是按照交換機的規則來確定的。

? ? ? ? ? ?

交換機

Exchange(交換機):生產者會將消息發送到交換機,然后交換機通過路由策略(規則)將消息路由到匹配的隊列中去

交換規則:

Fanout 不處理路由。需要簡單的將隊列綁定到交換機上。一個發送到該類型交換機的消息都會被廣播到與該交換機綁定的所有隊列上。(本篇文章)

direct:它會把所有發送到該交換器的消息路由到所有與該交換器綁定的隊列中。

channel.basicPublish(“direct”, “warn”, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

我們定義direct交換機,綁定路由warn 這時候發送消息只能發送的綁定的隊列中 如隊列1 隊列2 但是如果綁定路由為info 則只有隊列2可以收到。

topic:direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似。

定義

//參數1 名稱 參數2 類型 channel.exchangeDeclare("fanout", "fanout");

臨時隊列

在生產者和消費者之間創建一個新的隊列,這時候又不想使用原來的隊列,臨時隊列就是為這個場景而生的:

首先,每當我們連接到RabbitMQ,我們需要一個新的空隊列,我們可以用一個隨機名稱來創建,或者說讓服務器選擇一個隨機隊列名稱給我們,一旦我們斷開消費者,隊列應該立即被刪除。

在Java客戶端,提供queuedeclare()為我們創建一個非持久化、獨立、自動刪除的隊列名稱。

隊列綁定

BindOk com.rabbitmq.client.Channel.queueBind(String queue, String exchange, String routingKey) throws IOExceptionBind a queue to an exchange, with no extra arguments.Parameters:queue the name of the queueexchange the name of the exchangeroutingKey the routine key to use for the binding Returns:a binding-confirm method if the binding was successfully created Throws:java.io.IOException - if an error is encountered

代碼

該代碼為fanout模式

生產者

package com.ll.mq.hellomq.queue;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;/*** * @author ll 生產者**/ public class Producer {private static final String EXCHANGE_NAME = "fanoutStudy";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 分發消息for(int i = 0 ; i < 5; i++){String message = "Hello World! " + i;channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" send'" + message + "'");}channel.close();connection.close();}}

消費者1

package com.ll.mq.hellomq.fanout;import java.io.IOException;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /*** * @author ll 消費者1**/ public class ConsumerOne {private static final String EXCHANGE_NAME = "fanoutStudy";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 聲明交換機類型channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 獲取臨時隊列String queueName = channel.queueDeclare().getQueue();// 綁定channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" ConsumerOne '" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }

消費者2

package com.ll.mq.hellomq.fanout;import java.io.IOException;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope;public class ConsumerTwo {private static final String EXCHANGE_NAME = "fanoutStudy";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明交換機類型channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//獲取臨時隊列String queueName = channel.queueDeclare().getQueue();//綁定channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" ConsumerTwo '" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }

結果:

生產者:?send'Hello World! 0'
? ? ? ? ? ? ? ?send'Hello World! 1'
? ? ? ? ? ? ? ?send'Hello World! 2'
? ? ? ? ? ? ? ?send'Hello World! 3'
? ? ? ? ? ? ? ?send'Hello World! 4'

消費者1? ??ConsumerOne 'Hello World! 0'
? ? ? ? ? ? ? ?ConsumerOne 'Hello World! 1'
? ? ? ? ? ? ? ?ConsumerOne 'Hello World! 2'
? ? ? ? ? ? ? ConsumerOne 'Hello World! 3'
? ? ? ? ? ? ? ConsumerOne 'Hello World! 4'

消費者2? ?ConsumerTwo'Hello World! 0'
? ? ? ? ? ? ? ?ConsumerTwo'Hello World! 1'
? ? ? ? ? ? ? ?ConsumerTwo'Hello World! 2'
? ? ? ? ? ? ? ConsumerTwo'Hello World! 3'
? ? ? ? ? ? ? ConsumerTwo'Hello World! 4'

rabbitmq結果:

?

下一篇?https://blog.csdn.net/lilongwangyamin/article/details/105117288?rabbitmq topic

總結

以上是生活随笔為你收集整理的RabbitMq 发布订阅 Publish/Subscribe fanout/direct的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 色亭亭 | 久久女人网 | 91原创视频在线观看 | 人乳喂奶hd无中字 | 一区二区三区视频免费视 | 亚洲一区二区三区无码久久 | 日韩精品视频在线播放 | 免费黄色高清视频 | 精品少妇一区二区三区免费观看 | 天堂а√在线最新版中文在线 | 国产粉嫩在线 | 亚洲一区二区不卡在线观看 | 五月天一区二区 | 黄色工厂这里只有精品 | 中文字幕第| 99久久久无码国产精品性波多 | 羞羞答答av | 男女裸体影院高潮 | 就是色| 日日综合网 | 欧美日韩精品一区二区三区视频播放 | 日本福利视频一区 | 一二区免费视频 | 亚洲色图国产视频 | 国产精品99精品无码视亚 | 99视频在线免费观看 | 中文字幕xxxx| 日本a区 | 国产伦精品一区二区. | 波多野结衣先锋影音 | 天天看天天做 | 欧美性猛交ⅹxxx乱大交3 | 国产内射一区 | 欧美在线黄| av色综合| 日韩欧美成人一区 | 综合在线观看 | 欧美v日韩 | 海角社区id| 久久久久亚洲av无码麻豆 | 久久视频在线播放 | 国内自拍视频在线播放 | 成人理论片| 最新av在线| 欧美自拍视频 | 日韩免费观看av | 黄色大全在线观看 | 香蕉一区二区 | 国产黄色自拍视频 | 亚洲Av无码成人精品区伊人 | 国产第一区第二区 | 久久一二三区 | 无码人妻aⅴ一区二区三区69岛 | 日本伦理一区二区 | 久久99精品久久久 | 性欧美视频在线观看 | 国产av无码专区亚洲a∨毛片 | 亚洲成人基地 | 久久人人爽人人爽人人片av高清 | 国产偷人妻精品一区二区在线 | 欧美黑人性xxx猛交 少妇无套内谢久久久久 | 国产精品丝袜黑色高跟 | 大象传媒成人在线观看 | 亚洲一二三四在线观看 | 成年人黄色在线观看 | 97精品熟女少妇一区二区三区 | 韩国主播青草200vip视频 | 直接看的av | 男人的av | 特级淫片裸体免费看 | 日韩一级网站 | 91一区视频 | 国产猛男猛女超爽免费视频 | 日本99视频| 中国特级毛片 | 亚洲网站免费看 | 成人在线观看视频网站 | 在线观看av的网站 | 亚洲一区观看 | 欧州一区二区三区 | 欧美日韩成人在线 | 国产精品骚 | 99国产精品无码 | 成人精品视频一区二区 | 懂色一区二区三区免费观看 | www.青青草 | 情侣在线视频 | 91亚洲国产成人精品性色 | 日本黄色性视频 | 精品久久久久久久久久岛国gif | 邻居少妇张开腿让我爽了在线观看 | 久久国产精品区 | 亚洲精品一区二区三区新线路 | 天天摸天天碰天天爽天天弄 | 久久久久久久久久久久久女过产乱 | 亚洲精品中文无码AV在线播放 | 老女人做爰全过程免费的视频 | 双性受孕h堵精大肚生子 | 草逼免费视频 |