学成在线--13.RabbitMQ工作模式
文章目錄
- 一.Work queues
- 二.Publish/subscribe
- 1.工作模式
- 2.代碼
- 1)生產(chǎn)者
- 2)消費(fèi)者
- 3.測(cè)試
- 4.思考
- 三.Routing
- 1.工作模式
- 2.代碼
- 1)生產(chǎn)者
- 2)消費(fèi)者
- 3.測(cè)試
- 4.思考
- 四.Topics
- 1.工作模式
- 2.代碼
- 1)生產(chǎn)者
- 2)消費(fèi)者
- 3.測(cè)試
- 4.思考
- 五.Header模式
- 1.生產(chǎn)者
- 2.消費(fèi)者
- 六.RPC
RabbitMQ有以下幾種工作模式 :
1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics
5、Header
6、RPC
一.Work queues
work queues與入門(mén)程序HelloWord相比,多了一個(gè)消費(fèi)端,兩個(gè)消費(fèi)端共同消費(fèi)同一個(gè)隊(duì)列中的消息。
應(yīng)用場(chǎng)景:對(duì)于任務(wù)過(guò)重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。
測(cè)試:
1、使用入門(mén)程序HelloWord,啟動(dòng)多個(gè)消費(fèi)者。
2、生產(chǎn)者發(fā)送多個(gè)消息。
結(jié)果:
1、一條消息只會(huì)被一個(gè)消費(fèi)者接收;
2、rabbit采用輪詢的方式將消息是平均發(fā)送給消費(fèi)者的;
3、消費(fèi)者在處理完某條消息后,才會(huì)收到下一條消息。
二.Publish/subscribe
1.工作模式
發(fā)布訂閱模式:
1、每個(gè)消費(fèi)者監(jiān)聽(tīng)自己的隊(duì)列。
2、生產(chǎn)者將消息發(fā)給broker,由交換機(jī)將消息轉(zhuǎn)發(fā)到綁定此交換機(jī)的每個(gè)隊(duì)列,每個(gè)綁定交換機(jī)的隊(duì)列都將接收
到消息
2.代碼
案例:
當(dāng)用戶充值成功或轉(zhuǎn)賬完成系統(tǒng)通知用戶,通知方式有短信、郵件多種方法 。
1)生產(chǎn)者
聲明Exchange_fanout_inform交換機(jī)。
聲明兩個(gè)隊(duì)列并且綁定到此交換機(jī),綁定時(shí)不需要指定routingkey
發(fā)送消息時(shí)不需要指定routingkey
2)消費(fèi)者
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer02_subscribe_email {//隊(duì)列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) throws IOException, TimeoutException {//通過(guò)連接工廠創(chuàng)建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 參數(shù)明細(xì)* 1、queue 隊(duì)列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//聲明一個(gè)交換機(jī)//參數(shù):String exchange, String type/*** 參數(shù)明細(xì):* 1、交換機(jī)的名稱* 2、交換機(jī)的類(lèi)型* fanout:對(duì)應(yīng)的rabbitmq的工作模式是 publish/subscribe* direct:對(duì)應(yīng)的Routing 工作模式* topic:對(duì)應(yīng)的Topics工作模式* headers: 對(duì)應(yīng)的headers工作模式*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//進(jìn)行交換機(jī)和隊(duì)列綁定//參數(shù):String queue, String exchange, String routingKey/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱* 2、exchange 交換機(jī)名稱* 3、routingKey 路由key,作用是交換機(jī)根據(jù)路由key的值將消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,在發(fā)布訂閱模式中調(diào)協(xié)為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");//實(shí)現(xiàn)消費(fèi)方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當(dāng)接收到消息后此方法將被調(diào)用* @param consumerTag 消費(fèi)者標(biāo)簽,用來(lái)標(biāo)識(shí)消費(fèi)者的,在監(jiān)聽(tīng)隊(duì)列時(shí)設(shè)置channel.basicConsume* @param envelope 信封,通過(guò)envelope* @param properties 消息屬性* @param body 消息內(nèi)容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機(jī)String exchange = envelope.getExchange();//消息id,mq在channel中用來(lái)標(biāo)識(shí)消息的id,可用于確認(rèn)消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內(nèi)容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監(jiān)聽(tīng)隊(duì)列//參數(shù):String queue, boolean autoAck, Consumer callback/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱* 2、autoAck 自動(dòng)回復(fù),當(dāng)消費(fèi)者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為tru表示會(huì)自動(dòng)回復(fù)mq,如果設(shè)置為false要通過(guò)編程實(shí)現(xiàn)回復(fù)* 3、callback,消費(fèi)方法,當(dāng)消費(fèi)者接收到消息要執(zhí)行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);} }3.測(cè)試
打開(kāi)RabbitMQ的管理界面,觀察交換機(jī)綁定情況:
使用生產(chǎn)者發(fā)送若干條消息,每條消息都轉(zhuǎn)發(fā)到各個(gè)隊(duì)列,并且每個(gè)消費(fèi)者都接收到了消息。
4.思考
1、publish/subscribe與work queues有什么異同。
區(qū)別:
1)work queues不用定義交換機(jī),而publish/subscribe需要定義交換機(jī)。
2)publish/subscribe的生產(chǎn)方是面向交換機(jī)發(fā)送消息,work queues的生產(chǎn)方是面向隊(duì)列發(fā)送消息(底層使用默認(rèn)交換機(jī))。
3)publish/subscribe需要設(shè)置隊(duì)列和交換機(jī)的綁定,work queues不需要設(shè)置,實(shí)質(zhì)上work queues會(huì)將隊(duì)列綁定到默認(rèn)的交換機(jī) 。
相同點(diǎn):
所以兩者實(shí)現(xiàn)的發(fā)布/訂閱的效果是一樣的,多個(gè)消費(fèi)端監(jiān)聽(tīng)同一個(gè)隊(duì)列不會(huì)重復(fù)消費(fèi)消息
2、實(shí)質(zhì)工作用什么 publish/subscribe還是work queues。
建議使用 publish/subscribe,發(fā)布訂閱模式比工作隊(duì)列模式更強(qiáng)大,并且發(fā)布訂閱模式可以指定自己專用的交換
機(jī)
三.Routing
1.工作模式
路由模式:
1、每個(gè)消費(fèi)者監(jiān)聽(tīng)自己的隊(duì)列,并且設(shè)置routingkey。
2、生產(chǎn)者將消息發(fā)給交換機(jī),由交換機(jī)根據(jù)routingkey來(lái)轉(zhuǎn)發(fā)消息到指定的隊(duì)列。
2.代碼
聲明exchange_routing_inform交換機(jī)。
聲明兩個(gè)隊(duì)列并且綁定到此交換機(jī),綁定時(shí)需要指定routingkey
發(fā)送消息時(shí)需要指定routingkey
1)生產(chǎn)者
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Producer03_routing {//隊(duì)列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";private static final String ROUTINGKEY_EMAIL="inform_email";private static final String ROUTINGKEY_SMS="inform_sms";public static void main(String[] args) {//通過(guò)連接工廠創(chuàng)建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新連接connection = connectionFactory.newConnection();//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成channel = connection.createChannel();//聲明隊(duì)列,如果隊(duì)列在mq 中沒(méi)有則要?jiǎng)?chuàng)建//參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數(shù)明細(xì)* 1、queue 隊(duì)列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//聲明一個(gè)交換機(jī)//參數(shù):String exchange, String type/*** 參數(shù)明細(xì):* 1、交換機(jī)的名稱* 2、交換機(jī)的類(lèi)型* fanout:對(duì)應(yīng)的rabbitmq的工作模式是 publish/subscribe* direct:對(duì)應(yīng)的Routing 工作模式* topic:對(duì)應(yīng)的Topics工作模式* headers: 對(duì)應(yīng)的headers工作模式*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);//進(jìn)行交換機(jī)和隊(duì)列綁定//參數(shù):String queue, String exchange, String routingKey/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱* 2、exchange 交換機(jī)名稱* 3、routingKey 路由key,作用是交換機(jī)根據(jù)路由key的值將消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,在發(fā)布訂閱模式中調(diào)協(xié)為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform");//發(fā)送消息//參數(shù):String exchange, String routingKey, BasicProperties props, byte[] body/*** 參數(shù)明細(xì):* 1、exchange,交換機(jī),如果不指定將使用mq的默認(rèn)交換機(jī)(設(shè)置為"")* 2、routingKey,路由key,交換機(jī)根據(jù)路由key來(lái)將消息轉(zhuǎn)發(fā)到指定的隊(duì)列,如果使用默認(rèn)交換機(jī),routingKey設(shè)置為隊(duì)列的名稱* 3、props,消息的屬性* 4、body,消息內(nèi)容*/for(int i=0;i<5;i++){//發(fā)送消息的時(shí)候指定routingKeyString message = "send email inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//發(fā)送消息的時(shí)候指定routingKeyString message = "send sms inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS,null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//發(fā)送消息的時(shí)候指定routingKeyString message = "send inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//關(guān)閉連接//先關(guān)閉通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}} }2)消費(fèi)者
消費(fèi)者一:郵件
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer03_routing_email {//隊(duì)列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";private static final String ROUTINGKEY_EMAIL="inform_email";public static void main(String[] args) throws IOException, TimeoutException {//通過(guò)連接工廠創(chuàng)建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 參數(shù)明細(xì)* 1、queue 隊(duì)列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//聲明一個(gè)交換機(jī)//參數(shù):String exchange, String type/*** 參數(shù)明細(xì):* 1、交換機(jī)的名稱* 2、交換機(jī)的類(lèi)型* fanout:對(duì)應(yīng)的rabbitmq的工作模式是 publish/subscribe* direct:對(duì)應(yīng)的Routing 工作模式* topic:對(duì)應(yīng)的Topics工作模式* headers: 對(duì)應(yīng)的headers工作模式*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);//進(jìn)行交換機(jī)和隊(duì)列綁定//參數(shù):String queue, String exchange, String routingKey/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱* 2、exchange 交換機(jī)名稱* 3、routingKey 路由key,作用是交換機(jī)根據(jù)路由key的值將消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,在發(fā)布訂閱模式中調(diào)協(xié)為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);//實(shí)現(xiàn)消費(fèi)方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當(dāng)接收到消息后此方法將被調(diào)用* @param consumerTag 消費(fèi)者標(biāo)簽,用來(lái)標(biāo)識(shí)消費(fèi)者的,在監(jiān)聽(tīng)隊(duì)列時(shí)設(shè)置channel.basicConsume* @param envelope 信封,通過(guò)envelope* @param properties 消息屬性* @param body 消息內(nèi)容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機(jī)String exchange = envelope.getExchange();//消息id,mq在channel中用來(lái)標(biāo)識(shí)消息的id,可用于確認(rèn)消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內(nèi)容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監(jiān)聽(tīng)隊(duì)列//參數(shù):String queue, boolean autoAck, Consumer callback/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱* 2、autoAck 自動(dòng)回復(fù),當(dāng)消費(fèi)者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為tru表示會(huì)自動(dòng)回復(fù)mq,如果設(shè)置為false要通過(guò)編程實(shí)現(xiàn)回復(fù)* 3、callback,消費(fèi)方法,當(dāng)消費(fèi)者接收到消息要執(zhí)行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);} }消費(fèi)者二:短信
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer03_routing_sms {//隊(duì)列名稱private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";private static final String ROUTINGKEY_SMS="inform_sms";public static void main(String[] args) throws IOException, TimeoutException {//通過(guò)連接工廠創(chuàng)建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 參數(shù)明細(xì)* 1、queue 隊(duì)列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間*/channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//聲明一個(gè)交換機(jī)//參數(shù):String exchange, String type/*** 參數(shù)明細(xì):* 1、交換機(jī)的名稱* 2、交換機(jī)的類(lèi)型* fanout:對(duì)應(yīng)的rabbitmq的工作模式是 publish/subscribe* direct:對(duì)應(yīng)的Routing 工作模式* topic:對(duì)應(yīng)的Topics工作模式* headers: 對(duì)應(yīng)的headers工作模式*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);//進(jìn)行交換機(jī)和隊(duì)列綁定//參數(shù):String queue, String exchange, String routingKey/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱* 2、exchange 交換機(jī)名稱* 3、routingKey 路由key,作用是交換機(jī)根據(jù)路由key的值將消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,在發(fā)布訂閱模式中調(diào)協(xié)為空字符串*/channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);//實(shí)現(xiàn)消費(fèi)方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當(dāng)接收到消息后此方法將被調(diào)用* @param consumerTag 消費(fèi)者標(biāo)簽,用來(lái)標(biāo)識(shí)消費(fèi)者的,在監(jiān)聽(tīng)隊(duì)列時(shí)設(shè)置channel.basicConsume* @param envelope 信封,通過(guò)envelope* @param properties 消息屬性* @param body 消息內(nèi)容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機(jī)String exchange = envelope.getExchange();//消息id,mq在channel中用來(lái)標(biāo)識(shí)消息的id,可用于確認(rèn)消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內(nèi)容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監(jiān)聽(tīng)隊(duì)列//參數(shù):String queue, boolean autoAck, Consumer callback/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱* 2、autoAck 自動(dòng)回復(fù),當(dāng)消費(fèi)者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為tru表示會(huì)自動(dòng)回復(fù)mq,如果設(shè)置為false要通過(guò)編程實(shí)現(xiàn)回復(fù)* 3、callback,消費(fèi)方法,當(dāng)消費(fèi)者接收到消息要執(zhí)行的方法*/channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);} }3.測(cè)試
使用生產(chǎn)者發(fā)送若干條消息,交換機(jī)根據(jù)routingkey轉(zhuǎn)發(fā)消息到指定的隊(duì)列
4.思考
1、Routing模式和Publish/subscibe有什么區(qū)別?
Routing模式要求隊(duì)列在綁定交換機(jī)時(shí)要指定routingkey,消息會(huì)轉(zhuǎn)發(fā)到符合routingkey的隊(duì)列。
四.Topics
1.工作模式
路由模式:
1、每個(gè)消費(fèi)者監(jiān)聽(tīng)自己的隊(duì)列,并且設(shè)置帶統(tǒng)配符的routingkey。
2、生產(chǎn)者將消息發(fā)給broker,由交換機(jī)根據(jù)routingkey來(lái)轉(zhuǎn)發(fā)消息到指定的隊(duì)列。
2.代碼
案例:
根據(jù)用戶的通知設(shè)置去通知用戶,設(shè)置接收Email的用戶只接收Email,設(shè)置接收sms的用戶只接收sms,設(shè)置兩種
通知類(lèi)型都接收的則兩種通知都有效。
1)生產(chǎn)者
聲明交換機(jī),指定topic類(lèi)型
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Producer04_topics {//隊(duì)列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";private static final String ROUTINGKEY_EMAIL="inform.#.email.#";private static final String ROUTINGKEY_SMS="inform.#.sms.#";public static void main(String[] args) {//通過(guò)連接工廠創(chuàng)建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新連接connection = connectionFactory.newConnection();//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成channel = connection.createChannel();//聲明隊(duì)列,如果隊(duì)列在mq 中沒(méi)有則要?jiǎng)?chuàng)建//參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數(shù)明細(xì)* 1、queue 隊(duì)列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//聲明一個(gè)交換機(jī)//參數(shù):String exchange, String type/*** 參數(shù)明細(xì):* 1、交換機(jī)的名稱* 2、交換機(jī)的類(lèi)型* fanout:對(duì)應(yīng)的rabbitmq的工作模式是 publish/subscribe* direct:對(duì)應(yīng)的Routing 工作模式* topic:對(duì)應(yīng)的Topics工作模式* headers: 對(duì)應(yīng)的headers工作模式*/channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);//進(jìn)行交換機(jī)和隊(duì)列綁定//參數(shù):String queue, String exchange, String routingKey/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱* 2、exchange 交換機(jī)名稱* 3、routingKey 路由key,作用是交換機(jī)根據(jù)路由key的值將消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,在發(fā)布訂閱模式中調(diào)協(xié)為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);//發(fā)送消息//參數(shù):String exchange, String routingKey, BasicProperties props, byte[] body/*** 參數(shù)明細(xì):* 1、exchange,交換機(jī),如果不指定將使用mq的默認(rèn)交換機(jī)(設(shè)置為"")* 2、routingKey,路由key,交換機(jī)根據(jù)路由key來(lái)將消息轉(zhuǎn)發(fā)到指定的隊(duì)列,如果使用默認(rèn)交換機(jī),routingKey設(shè)置為隊(duì)列的名稱* 3、props,消息的屬性* 4、body,消息內(nèi)容*/for(int i=0;i<5;i++){//發(fā)送消息的時(shí)候指定routingKeyString message = "send email inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//發(fā)送消息的時(shí)候指定routingKeyString message = "send sms inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//發(fā)送消息的時(shí)候指定routingKeyString message = "send sms and email inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//關(guān)閉連接//先關(guān)閉通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}} }2)消費(fèi)者
消費(fèi)者一:email
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer04_topics_email {//隊(duì)列名稱private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";private static final String ROUTINGKEY_EMAIL="inform.#.email.#";public static void main(String[] args) throws IOException, TimeoutException {//通過(guò)連接工廠創(chuàng)建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 參數(shù)明細(xì)* 1、queue 隊(duì)列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//聲明一個(gè)交換機(jī)//參數(shù):String exchange, String type/*** 參數(shù)明細(xì):* 1、交換機(jī)的名稱* 2、交換機(jī)的類(lèi)型* fanout:對(duì)應(yīng)的rabbitmq的工作模式是 publish/subscribe* direct:對(duì)應(yīng)的Routing 工作模式* topic:對(duì)應(yīng)的Topics工作模式* headers: 對(duì)應(yīng)的headers工作模式*/channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);//進(jìn)行交換機(jī)和隊(duì)列綁定//參數(shù):String queue, String exchange, String routingKey/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱* 2、exchange 交換機(jī)名稱* 3、routingKey 路由key,作用是交換機(jī)根據(jù)路由key的值將消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,在發(fā)布訂閱模式中調(diào)協(xié)為空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);//實(shí)現(xiàn)消費(fèi)方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當(dāng)接收到消息后此方法將被調(diào)用* @param consumerTag 消費(fèi)者標(biāo)簽,用來(lái)標(biāo)識(shí)消費(fèi)者的,在監(jiān)聽(tīng)隊(duì)列時(shí)設(shè)置channel.basicConsume* @param envelope 信封,通過(guò)envelope* @param properties 消息屬性* @param body 消息內(nèi)容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機(jī)String exchange = envelope.getExchange();//消息id,mq在channel中用來(lái)標(biāo)識(shí)消息的id,可用于確認(rèn)消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內(nèi)容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監(jiān)聽(tīng)隊(duì)列//參數(shù):String queue, boolean autoAck, Consumer callback/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱* 2、autoAck 自動(dòng)回復(fù),當(dāng)消費(fèi)者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為tru表示會(huì)自動(dòng)回復(fù)mq,如果設(shè)置為false要通過(guò)編程實(shí)現(xiàn)回復(fù)* 3、callback,消費(fèi)方法,當(dāng)消費(fèi)者接收到消息要執(zhí)行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);} }消費(fèi)者二:sms
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer04_topics_sms {//隊(duì)列名稱private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";private static final String ROUTINGKEY_SMS="inform.#.sms.#";public static void main(String[] args) throws IOException, TimeoutException {//通過(guò)連接工廠創(chuàng)建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 參數(shù)明細(xì)* 1、queue 隊(duì)列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間*/channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//聲明一個(gè)交換機(jī)//參數(shù):String exchange, String type/*** 參數(shù)明細(xì):* 1、交換機(jī)的名稱* 2、交換機(jī)的類(lèi)型* fanout:對(duì)應(yīng)的rabbitmq的工作模式是 publish/subscribe* direct:對(duì)應(yīng)的Routing 工作模式* topic:對(duì)應(yīng)的Topics工作模式* headers: 對(duì)應(yīng)的headers工作模式*/channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);//進(jìn)行交換機(jī)和隊(duì)列綁定//參數(shù):String queue, String exchange, String routingKey/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱* 2、exchange 交換機(jī)名稱* 3、routingKey 路由key,作用是交換機(jī)根據(jù)路由key的值將消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,在發(fā)布訂閱模式中調(diào)協(xié)為空字符串*/channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);//實(shí)現(xiàn)消費(fèi)方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當(dāng)接收到消息后此方法將被調(diào)用* @param consumerTag 消費(fèi)者標(biāo)簽,用來(lái)標(biāo)識(shí)消費(fèi)者的,在監(jiān)聽(tīng)隊(duì)列時(shí)設(shè)置channel.basicConsume* @param envelope 信封,通過(guò)envelope* @param properties 消息屬性* @param body 消息內(nèi)容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機(jī)String exchange = envelope.getExchange();//消息id,mq在channel中用來(lái)標(biāo)識(shí)消息的id,可用于確認(rèn)消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內(nèi)容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監(jiān)聽(tīng)隊(duì)列//參數(shù):String queue, boolean autoAck, Consumer callback/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱* 2、autoAck 自動(dòng)回復(fù),當(dāng)消費(fèi)者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為tru表示會(huì)自動(dòng)回復(fù)mq,如果設(shè)置為false要通過(guò)編程實(shí)現(xiàn)回復(fù)* 3、callback,消費(fèi)方法,當(dāng)消費(fèi)者接收到消息要執(zhí)行的方法*/channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);} }3.測(cè)試
使用生產(chǎn)者發(fā)送若干條消息,交換機(jī)根據(jù)routingkey統(tǒng)配符匹配并轉(zhuǎn)發(fā)消息到指定的隊(duì)列。
4.思考
1、本案例的需求使用Routing工作模式能否實(shí)現(xiàn)?
使用Routing模式也可以實(shí)現(xiàn)本案例,共設(shè)置三個(gè) routingkey,分別是email、sms、all,email隊(duì)列綁定email和all,sms隊(duì)列綁定sms和all,這樣就可以實(shí)現(xiàn)上邊案例的功能,實(shí)現(xiàn)過(guò)程比topics復(fù)雜。
Topic模式更多加強(qiáng)大,它可以實(shí)現(xiàn)Routing、publish/subscirbe模式的功能。
五.Header模式
header模式與routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(鍵值對(duì))匹配隊(duì)列。
案例:
根據(jù)用戶的通知設(shè)置去通知用戶,設(shè)置接收Email的用戶只接收Email,設(shè)置接收sms的用戶只接收sms,設(shè)置兩種
通知類(lèi)型都接收的則兩種通知都有效。
1.生產(chǎn)者
隊(duì)列與交換機(jī)綁定的代碼與之前不同,如下:
Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_type", "email"); Map<String, Object> headers_sms = new Hashtable<String, Object>(); headers_sms.put("inform_type", "sms"); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms); String message = "email inform to user"+i; Map<String,Object> headers = new Hashtable<String, Object>(); headers.put("inform_type", "email");//匹配email通知消費(fèi)者綁定的header //headers.put("inform_type", "sms");//匹配sms通知消費(fèi)者綁定的header AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(headers); //Email通知 channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());2.消費(fèi)者
channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS); Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_email", "email"); //交換機(jī)和隊(duì)列綁定 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); //指定消費(fèi)隊(duì)列 channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);六.RPC
RPC即客戶端遠(yuǎn)程調(diào)用服務(wù)端的方法 ,使用MQ可以實(shí)現(xiàn)RPC的異步調(diào)用,基于Direct交換機(jī)實(shí)現(xiàn),流程如下:
1、客戶端即是生產(chǎn)者就是消費(fèi)者,向RPC請(qǐng)求隊(duì)列發(fā)送RPC調(diào)用消息,同時(shí)監(jiān)聽(tīng)RPC響應(yīng)隊(duì)列。
2、服務(wù)端監(jiān)聽(tīng)RPC請(qǐng)求隊(duì)列的消息,收到消息后執(zhí)行服務(wù)端的方法,得到方法返回的結(jié)果
3、服務(wù)端將RPC方法的結(jié)果發(fā)送到RPC響應(yīng)隊(duì)列
4、客戶端(RPC調(diào)用方)監(jiān)聽(tīng)RPC響應(yīng)隊(duì)列,接收到RPC調(diào)用結(jié)果。
總結(jié)
以上是生活随笔為你收集整理的学成在线--13.RabbitMQ工作模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 数组与矩阵的区别
- 下一篇: ComponentName知识