日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ 四种Exchange

發布時間:2024/9/20 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ 四种Exchange 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

AMQP協議中的核心思想就是生產者和消費者隔離,生產者從不直接將消息發送給隊列。生產者通常不知道是否一個消息會被發送到隊列中,只是將消息發送到一個交換機。先由Exchange來接收,然后Exchange按照特定的策略轉發到Queue進行存儲。同理,消費者也是如此。Exchange 就類似于一個交換機,轉發各個消息分發到相應的隊列中。

?

RabbitMQ提供了四種Exchange模式:fanout、direct、topic、header 。?header模式在實際使用中較少。

Direct Exchange?– 處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個隊列綁定到該交換機上要求路由鍵 “dog”,則只有被標記為“dog”的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog。?

?

Java代碼??
  • Channel?channel?=?connection.createChannel();??
  • channel.exchangeDeclare("exchangeName",?"direct");?//direct?fanout?topic??
  • channel.queueDeclare("queueName");??
  • channel.queueBind("queueName",?"exchangeName",?"routingKey");??
  • ??
  • byte[]?messageBodyBytes?=?"hello?world".getBytes();??
  • //需要綁定路由鍵??
  • channel.basicPublish("exchangeName",?"routingKey",?MessageProperties.PERSISTENT_TEXT_PLAIN,?messageBodyBytes);??


  • Fanout Exchange?– 不處理路由鍵。你只需要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。Fanout交換機轉發消息是最快的。?

    ?

    Java代碼??
  • Channel?channel?=?connection.createChannel();??
  • channel.exchangeDeclare("exchangeName",?"fanout");?//direct?fanout?topic??
  • channel.queueDeclare("queueName");??
  • channel.queueBind("queueName",?"exchangeName",?"routingKey");??
  • ??
  • channel.queueDeclare("queueName1");??
  • channel.queueBind("queueName1",?"exchangeName",?"routingKey1");??
  • ??
  • byte[]?messageBodyBytes?=?"hello?world".getBytes();??
  • //路由鍵需要設置為空??
  • channel.basicPublish("exchangeName",?"",?MessageProperties.PERSISTENT_TEXT_PLAIN,?messageBodyBytes);??


  • Topic Exchange?– 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。我在RedHat的朋友做了一張不錯的圖,來表明topic交換機是如何工作的:?

    ?

    Java代碼??
  • Channel?channel?=?connection.createChannel();??
  • channel.exchangeDeclare("exchangeName",?"topic");?//direct?fanout?topic??
  • channel.queueDeclare("queueName");??
  • channel.queueBind("queueName",?"exchangeName",?"routingKey.*");??
  • byte[]?messageBodyBytes?=?"hello?world".getBytes();??
  • channel.basicPublish("exchangeName",?"routingKey.one",?MessageProperties.PERSISTENT_TEXT_PLAIN,?messageBodyBytes);??
  • header exchange:

    Headers類型的exchange使用的比較少,它也是忽略routingKey的一種路由方式。是使用Headers來匹配的。Headers是一個鍵值對,可以定義成Hashtable。發送者在發送的時候定義一些鍵值對,接收者也可以再綁定時候傳入一些鍵值對,兩者匹配的話,則對應的隊列就可以收到消息。匹配有兩種方式all和any。這兩種方式是在接收端必須要用鍵值"x-mactch"來定義。all代表定義的多個鍵值對都要滿足,而any則代碼只要滿足一個就可以了。fanout,direct,topic?exchange的routingKey都需要要字符串形式的,而headers exchange則沒有這個要求,因為鍵值對的值可以是任何類型。

    1.生產者Producer.Java

    ?

    [java]?view plaincopy print?
  • package?cn.slimsmart.rabbitmq.demo.headers;??
  • ??
  • import?java.util.Date;??
  • import?java.util.Hashtable;??
  • import?java.util.Map;??
  • ??
  • import?org.springframework.amqp.core.ExchangeTypes;??
  • ??
  • import?com.rabbitmq.client.AMQP;??
  • import?com.rabbitmq.client.AMQP.BasicProperties;??
  • import?com.rabbitmq.client.AMQP.BasicProperties.Builder;??
  • import?com.rabbitmq.client.Channel;??
  • import?com.rabbitmq.client.Connection;??
  • import?com.rabbitmq.client.ConnectionFactory;??
  • ??
  • public?class?Producer?{??
  • ????private?final?static?String?EXCHANGE_NAME?=?"header-exchange";??
  • ??????
  • ????@SuppressWarnings("deprecation")??
  • ????public?static?void?main(String[]?args)?throws?Exception?{??
  • ????????//?創建連接和頻道??
  • ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
  • ????????factory.setHost("192.168.36.102");??
  • ????????//?指定用戶?密碼??
  • ????????factory.setUsername("admin");??
  • ????????factory.setPassword("admin");??
  • ????????//?指定端口??
  • ????????factory.setPort(AMQP.PROTOCOL.PORT);??
  • ????????Connection?connection?=?factory.newConnection();??
  • ????????Channel?channel?=?connection.createChannel();??
  • ??????????
  • ????????//聲明轉發器和類型headers??
  • ????????channel.exchangeDeclare(EXCHANGE_NAME,?ExchangeTypes.HEADERS,false,true,null);??
  • ????????String?message?=?new?Date().toLocaleString()?+?"?:?log?something";??
  • ??????????
  • ????????Map<String,Object>?headers?=??new?Hashtable<String,?Object>();??
  • ????????headers.put("aaa",?"01234");??
  • ????????Builder?properties?=?new?BasicProperties.Builder();??
  • ????????properties.headers(headers);??
  • ??????????
  • ????????//?指定消息發送到的轉發器,綁定鍵值對headers鍵值對??
  • ????????channel.basicPublish(EXCHANGE_NAME,?"",properties.build(),message.getBytes());??
  • ??????????
  • ????????System.out.println("Sent?message?:'"?+?message?+?"'");??
  • ????????channel.close();??
  • ????????connection.close();??
  • ????}??
  • }??
  • 2.消費者Consumer.java

    ?

    ?

    [java]?view plaincopy print?
  • package?cn.slimsmart.rabbitmq.demo.headers;??
  • ??
  • import?java.util.Hashtable;??
  • import?java.util.Map;??
  • ??
  • import?org.springframework.amqp.core.ExchangeTypes;??
  • ??
  • import?com.rabbitmq.client.AMQP;??
  • import?com.rabbitmq.client.Channel;??
  • import?com.rabbitmq.client.Connection;??
  • import?com.rabbitmq.client.ConnectionFactory;??
  • import?com.rabbitmq.client.QueueingConsumer;??
  • ??
  • public?class?Consumer?{??
  • ????private?final?static?String?EXCHANGE_NAME?=?"header-exchange";??
  • ????private?final?static?String?QUEUE_NAME?=?"header-queue";??
  • ??????
  • ????public?static?void?main(String[]?args)?throws?Exception?{??
  • ????????//?創建連接和頻道??
  • ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
  • ????????factory.setHost("192.168.36.102");??
  • ????????//?指定用戶?密碼??
  • ????????factory.setUsername("admin");??
  • ????????factory.setPassword("admin");??
  • ????????//?指定端口??
  • ????????factory.setPort(AMQP.PROTOCOL.PORT);??
  • ????????Connection?connection?=?factory.newConnection();??
  • ????????Channel?channel?=?connection.createChannel();??
  • ??????????
  • ????????//聲明轉發器和類型headers??
  • ????????channel.exchangeDeclare(EXCHANGE_NAME,?ExchangeTypes.HEADERS,false,true,null);??
  • ????????channel.queueDeclare(QUEUE_NAME,false,?false,?true,null);??
  • ??????????
  • ????????Map<String,?Object>?headers?=?new?Hashtable<String,?Object>();??
  • ????????headers.put("x-match",?"any");//all?any??
  • ????????headers.put("aaa",?"01234");??
  • ????????headers.put("bbb",?"56789");??
  • ????????//?為轉發器指定隊列,設置binding?綁定header鍵值對??
  • ????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,"",?headers);??
  • ????????QueueingConsumer?consumer?=?new?QueueingConsumer(channel);??
  • ????????//?指定接收者,第二個參數為自動應答,無需手動應答??
  • ????????channel.basicConsume(QUEUE_NAME,?true,?consumer);??
  • ????????while?(true)?{??
  • ????????????QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();??
  • ????????????String?message?=?new?String(delivery.getBody());??
  • ????????????System.out.println(message);??
  • ????????}???
  • ????}??
  • }??
  • ?

    實例代碼:http://download.csdn.net/detail/tianwei7518/8136413

    總結

    以上是生活随笔為你收集整理的RabbitMQ 四种Exchange的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。