日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

2_RabbitMQ工作模式_Work queues_Publish/Subscribe_Routing_Topics_HeaderRpc

發布時間:2024/7/19 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2_RabbitMQ工作模式_Work queues_Publish/Subscribe_Routing_Topics_HeaderRpc 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 2_RabbitMQ工作模式
    • 1.Work queues
    • 2.Publish/Subscribe
      • 1.工作模式
      • 2.代碼
        • 1.生產者
          • 1.指定消息隊列相關消息
          • 2.建立連接&綁定隊列
          • 3.發送消息
          • 完整代碼:
        • 2.消費者
          • 1.指定消息隊列相關消息
          • 2.建立連接&綁定隊列
          • 3.實現消費方法&監聽消息
          • 完整代碼
      • 3.小結
    • 3.Routing
      • 1.工作模式
      • 2.代碼
        • 1.生產者
        • 2.消費者
      • 3.小結
    • 4.Topics
      • 1.工作模式
      • 2.代碼
        • 1.生產者
        • 2.消費者
      • 3.小結
    • 5.Header&Rpc
      • header模式
        • 1)生產者
        • 2)發送郵件消費者
      • Rpc

2_RabbitMQ工作模式

前置知識:工作模式指的是某條消息該怎么分發到隊列?分發策略

1.Work queues

work queues與入門程序相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息。

應用場景:對于 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。

測試:

1、使用入門程序,啟動多個消費者。

2、生產者發送多個消息。

結果:

1、一條消息只會被一個消費者接收;

2、rabbit采用輪詢的方式將消息是平均發送給消費者的;

3、消費者在處理完某條消息后,才會收到下一條消息。

2.Publish/Subscribe

1.工作模式

發布訂閱模式:

1、每個消費者監聽自己的隊列。

2、生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收到消息

此交換機的概念類似計算機網絡中的交換機,接受包然后分發下去

2.代碼

案例:

用戶通知,當用戶充值成功或轉賬完成系統通知用戶,通知方式有短信、郵件多種方法 。

1.生產者

聲明exchange_routing_inform交換機。

聲明兩個隊列并且綁定到此交換機,綁定時需要指定routingkey

發送消息時需要指定routingkey

1.指定消息隊列相關消息
//隊列名稱 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");
2.建立連接&綁定隊列
Connection connection = null; Channel channel = null;//建立新連接 connection = connectionFactory.newConnection(); //創建會話通道,生產者和mq服務所有通信都在channel通道中完成 channel = connection.createChannel(); //聲明隊列,如果隊列在mq 中沒有則要創建 //參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); //聲明一個交換機 //參數:String exchange, String type /*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing 工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/ channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); //進行交換機和隊列綁定 //參數:String queue, String exchange, String routingKey /*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,""); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
3.發送消息
//發送消息 //參數:String exchange, String routingKey, BasicProperties props, byte[] body /*** 參數明細:* 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")* 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱* 3、props,消息的屬性* 4、body,消息內容*/ for(int i=0;i<20;i++){//消息內容String message = "send inform message to user "+(i+1);channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());System.out.println("send to mq "+message); }
完整代碼:
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:10**/ public class Producer02_publish {//隊列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新連接connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成channel = connection.createChannel();//聲明隊列,如果隊列在mq 中沒有則要創建//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//聲明一個交換機//參數:String exchange, String type/*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing 工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//進行交換機和隊列綁定//參數:String queue, String exchange, String routingKey/*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");//發送消息//參數:String exchange, String routingKey, BasicProperties props, byte[] body/*** 參數明細:* 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")* 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱* 3、props,消息的屬性* 4、body,消息內容*/for(int i=0;i<20;i++){//消息內容String message = "send inform message to user "+(i+1);channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//關閉連接//先關閉通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}} }

2.消費者

1.指定消息隊列相關消息
//隊列名稱 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";//通過連接工廠創建新的連接和mq建立連接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mq connectionFactory.setVirtualHost("/");
2.建立連接&綁定隊列
Connection connection = null; Channel channel = null;//建立新連接 connection = connectionFactory.newConnection(); //創建會話通道,生產者和mq服務所有通信都在channel通道中完成 channel = connection.createChannel(); //聲明隊列,如果隊列在mq 中沒有則要創建 //參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); //聲明一個交換機 //參數:String exchange, String type /*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing 工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/ channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); //進行交換機和隊列綁定 //參數:String queue, String exchange, String routingKey /*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,""); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
3.實現消費方法&監聽消息
//實現消費方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當接收到消息后此方法將被調用* @param consumerTag 消費者標簽,用來標識消費者的,在監聽隊列時設置channel.basicConsume* @param envelope 信封,通過envelope* @param properties 消息屬性* @param body 消息內容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機String exchange = envelope.getExchange();//消息id,mq在channel中用來標識消息的id,可用于確認消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內容String message= new String(body,"utf-8");System.out.println("receive message:"+message);} }; //監聽隊列 //參數:String queue, boolean autoAck, Consumer callback /*** 參數明細:* 1、queue 隊列名稱* 2、autoAck 自動回復,當消費者接收到消息后要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復* 3、callback,消費方法,當消費者接收到消息要執行的方法*/ channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
完整代碼
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:22**/ public class Consumer02_subscribe_email {//隊列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) throws IOException, TimeoutException {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//聲明一個交換機//參數:String exchange, String type/*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing 工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//進行交換機和隊列綁定//參數:String queue, String exchange, String routingKey/*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");//實現消費方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當接收到消息后此方法將被調用* @param consumerTag 消費者標簽,用來標識消費者的,在監聽隊列時設置channel.basicConsume* @param envelope 信封,通過envelope* @param properties 消息屬性* @param body 消息內容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機String exchange = envelope.getExchange();//消息id,mq在channel中用來標識消息的id,可用于確認消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監聽隊列//參數:String queue, boolean autoAck, Consumer callback/*** 參數明細:* 1、queue 隊列名稱* 2、autoAck 自動回復,當消費者接收到消息后要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復* 3、callback,消費方法,當消費者接收到消息要執行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);} }

3.小結

3.Routing

1.工作模式

路由模式:

1、每個消費者監聽自己的隊列,并且設置routingkey。

2、生產者將消息發給交換機,由交換機根據routingkey來轉發消息到指定的隊列。

2.代碼

1.生產者

全部代碼:

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 19:23**/ public class Producer03_routing {//隊列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";private static final String ROUTINGKEY_EMAIL="inform_email";private static final String ROUTINGKEY_SMS="inform_sms";public static void main(String[] args) {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新連接connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成channel = connection.createChannel();//聲明隊列,如果隊列在mq 中沒有則要創建//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//聲明一個交換機//參數:String exchange, String type/*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing 工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);//進行交換機和隊列綁定//參數:String queue, String exchange, String routingKey/*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform");//發送消息//參數:String exchange, String routingKey, BasicProperties props, byte[] body/*** 參數明細:* 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")* 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱* 3、props,消息的屬性* 4、body,消息內容*//* for(int i=0;i<5;i++){//發送消息的時候指定routingKeyString message = "send email inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//發送消息的時候指定routingKeyString message = "send sms inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS,null,message.getBytes());System.out.println("send to mq "+message);}*/for(int i=0;i<5;i++){//發送消息的時候指定routingKeyString message = "send inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//關閉連接//先關閉通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}} }

2.消費者

全部代碼:

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:22**/ public class Consumer03_routing_email {//隊列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";private static final String ROUTINGKEY_EMAIL="inform_email";public static void main(String[] args) throws IOException, TimeoutException {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//聲明一個交換機//參數:String exchange, String type/*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing 工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);//進行交換機和隊列綁定//參數:String queue, String exchange, String routingKey/*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);//實現消費方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當接收到消息后此方法將被調用* @param consumerTag 消費者標簽,用來標識消費者的,在監聽隊列時設置channel.basicConsume* @param envelope 信封,通過envelope* @param properties 消息屬性* @param body 消息內容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機String exchange = envelope.getExchange();//消息id,mq在channel中用來標識消息的id,可用于確認消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監聽隊列//參數:String queue, boolean autoAck, Consumer callback/*** 參數明細:* 1、queue 隊列名稱* 2、autoAck 自動回復,當消費者接收到消息后要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復* 3、callback,消費方法,當消費者接收到消息要執行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);} }

3.小結

4.Topics

1.工作模式

2.代碼

1.生產者

聲明exchange_routing_inform交換機。

聲明兩個隊列并且綁定到此交換機,綁定時需要指定routingkey

發送消息時需要指定routingkey

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 19:23**/ public class Producer04_topics {//隊列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";private static final String ROUTINGKEY_EMAIL="inform.#.email.#";private static final String ROUTINGKEY_SMS="inform.#.sms.#";public static void main(String[] args) {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新連接connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成channel = connection.createChannel();//聲明隊列,如果隊列在mq 中沒有則要創建//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//聲明一個交換機//參數:String exchange, String type/*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing 工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);//進行交換機和隊列綁定//參數:String queue, String exchange, String routingKey/*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);//發送消息//參數:String exchange, String routingKey, BasicProperties props, byte[] body/*** 參數明細:* 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")* 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱* 3、props,消息的屬性* 4、body,消息內容*/for(int i=0;i<5;i++){//發送消息的時候指定routingKeyString message = "send email inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//發送消息的時候指定routingKeyString message = "send sms inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//發送消息的時候指定routingKeyString message = "send sms and email inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//關閉連接//先關閉通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}} }

2.消費者

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:22**/ public class Consumer04_topics_email {//隊列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";private static final String ROUTINGKEY_EMAIL="inform.#.email.#";public static void main(String[] args) throws IOException, TimeoutException {//通過連接工廠創建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//聲明一個交換機//參數:String exchange, String type/*** 參數明細:* 1、交換機的名稱* 2、交換機的類型* fanout:對應的rabbitmq的工作模式是 publish/subscribe* direct:對應的Routing 工作模式* topic:對應的Topics工作模式* headers: 對應的headers工作模式*/channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);//進行交換機和隊列綁定//參數:String queue, String exchange, String routingKey/*** 參數明細:* 1、queue 隊列名稱* 2、exchange 交換機名稱* 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);//實現消費方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當接收到消息后此方法將被調用* @param consumerTag 消費者標簽,用來標識消費者的,在監聽隊列時設置channel.basicConsume* @param envelope 信封,通過envelope* @param properties 消息屬性* @param body 消息內容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機String exchange = envelope.getExchange();//消息id,mq在channel中用來標識消息的id,可用于確認消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監聽隊列//參數:String queue, boolean autoAck, Consumer callback/*** 參數明細:* 1、queue 隊列名稱* 2、autoAck 自動回復,當消費者接收到消息后要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復* 3、callback,消費方法,當消費者接收到消息要執行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);} }

3.小結

5.Header&Rpc

header模式

header模式與routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配

隊列。

案例:

根據用戶的通知設置去通知用戶,設置接收Email的用戶只接收Email,設置接收sms的用戶只接收sms,設置兩種

通知類型都接收的則兩種通知都有效。

1)生產者

隊列與交換機綁定的代碼與之前不同,如下:

通知:

2)發送郵件消費者

Rpc

RPC即客戶端遠程調用服務端的方法 ,使用MQ可以實現RPC的異步調用,基于Direct交換機實現,流程如下:

1、客戶端即是生產者就是消費者,向RPC請求隊列發送RPC調用消息,同時監聽RPC響應隊列。

2、服務端監聽RPC請求隊列的消息,收到消息后執行服務端的方法,得到方法返回的結果

3、服務端將RPC方法 的結果發送到RPC響應隊列

4、客戶端(RPC調用方)監聽RPC響應隊列,接收到RPC調用結果。

總結

以上是生活随笔為你收集整理的2_RabbitMQ工作模式_Work queues_Publish/Subscribe_Routing_Topics_HeaderRpc的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。