javascript
RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ
什么是RabbitMQ
1.1 MQ概述
MQ全稱 Message Queue(消息隊列),是在消息的傳輸過程中保存消息的容器。多用于分布式系統之間進行通信。
? MQ,消息隊列,存儲消息的中間件
? 分布式系統通信兩種方式:直接遠程調用 和 借助第三方 完成間接通信
? 發送方稱為生產者,接收方稱為消費者
1.2 MQ 的優勢和劣勢
- 優勢
- 應用解耦:提供了程序的可擴展性 系統的耦合性越高,容錯性就越低,可維護性就越低。
- 異步提速:提高了系統的性能 提升用戶體驗和系統吞吐量(單位時間內處理請求的數目)。
- 削峰填谷:提升了系統的穩定性
- 使用了 MQ 之后,限制消費消息的速度為1000,這樣一來,高峰期產生的數據勢必會被積壓在 MQ 中,高峰 就被“削”掉了,但是因為消息積壓,在高峰期過后的一段時間內,消費消息的速度還是會維持在1000,直 到消費完積壓的消息,這就叫做“填谷”。 使用MQ后,可以提高系統穩定性。
- 劣勢
- 增加了系統維護成本
- 系統可用性降低(忽略)
1.3 常見的 MQ 產品
Pulsar 最新流行(可以了解一下這個MQ產品)
1.4 RabbitMQ 簡介
AMQP,即 Advanced Message Queuing Protocol(高級消息隊列協議),是一個網絡協議,是應用層協議
的一個開放標準,為面向消息的中間件設計。基于此協議的客戶端與消息中間件可傳遞消息,并不受客戶端/中
間件不同產品,不同的開發語言等條件的限制。2006年,AMQP 規范發布。類比HTTP。
2007年,Rabbit 技術公司基于 AMQP 標準開發的 RabbitMQ 1.0 發布。RabbitMQ 采用 Erlang 語言開發。
Erlang 語言由 Ericson 設計,專門為開發高并發和分布式系統的一種語言,在電信領域使用廣泛。
RabbitMQ 基礎架構如下圖:
RabbitMQ 中的相關概念:
? Broker:接收和分發消息的應用,RabbitMQ Server就是 Message Broker
? Virtual host:出于多租戶和安全因素設計的,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似于網絡中的 namespace 概念。當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個用戶在自己的 vhost 創建 exchange/queue 等
? Connection:publisher/consumer 和 broker 之間的 TCP 連接
? Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection,在消息量大的時候建立 TCP Connection的開銷將是巨大的,效率也較低。Channel 是在 connection 內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創建單獨的 channel 進行通訊,AMQP method 包含了channel id 幫助客戶端和message broker 識別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級的 Connection極大減少了操作系統建立 TCP connection 的開銷
? Exchange:message 到達 broker 的第一站,根據分發規則,匹配查詢表中的 routing key,分發消息到queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
? Queue:消息最終被送到這里等待 consumer 取走
? Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查詢表中,用于 message 的分發依據
RabbitMQ 提供了 6 種工作模式
RabbitMQ 提供了 6 種工作模式:簡單模式、work queues、Publish/Subscribe 發布與訂閱模式、Routing
路由模式、Topics 主題模式、RPC 遠程調用模式(遠程調用,不太算 MQ;暫不作介紹)。
官網對應模式介紹:https://www.rabbitmq.com/getstarted.html
RabbitMQ的安裝
? RabbitMQ 官方地址:http://www.rabbitmq.com/
docker安裝RabbitMQ
#指定版本,該版本包含了web控制頁面 docker pull rabbitmq:management#方式一:默認guest 用戶,密碼也是 guest docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management#方式二:設置用戶名和密碼 docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root -p 15672:15672 -p 5672:5672 rabbitmq:managementRabbitMQ的6種工作模式代碼演示(代碼注釋有詳細解釋)
模式一,HelloWorld 簡單模式
需求:使用簡單模式完成消息傳遞
步驟:
① 創建工程(生成者、消費者)
② 分別添加依賴
③ 編寫生產者發送消息
④ 編寫消費者接收消息
導包
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency>編寫消費者發送消息
package com.fs.provider;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/* 提供方發送一個消息去RabbitMQ,等待消費者去消費這個消息執行成功后,登錄http://192.168.93.132:15672 去查看發現有一個叫hello_world隊列*/ public class HelloWorldProvider {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠對象ConnectionFactory connectionFactory = new ConnectionFactory();//設置5大參數(必須設置)connectionFactory.setHost("192.168.93.132");// ip 默認值 localhostconnectionFactory.setPort(5672); //端口 RabbitMQ代碼操作默認端口 5672connectionFactory.setVirtualHost("/fs");// 虛擬機 默認值/ 這個是我自己創建的Virtual Hosts 每個業務用不同的虛擬機,隔離connectionFactory.setUsername("xiaofu"); // 用戶名 默認 guestconnectionFactory.setPassword("xiaofu"); //密碼 默認 guest//獲取對應的連接Connection connection = connectionFactory.newConnection();//創建 channel 使用NIO同步非阻塞的方式通信Channel channel = connection.createChannel();//創建隊列 Queue/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)參數:1. queue:隊列名稱(自己定義,且全局唯一)2. durable:是否持久化,當mq重啟之后,還在.3. exclusive:有下面2個意思* 是否獨占。只能有一個消費者監聽這隊列* 當Connection關閉時,是否刪除隊列4. autoDelete:是否自動刪除。當沒有Consumer 消費端時,自動刪除掉5. arguments:參數。*///如果沒有一個名字叫hello_world的隊列,則會創建該隊列,如果有則不會創建channel.queueDeclare("hello_world",true,false,false,null);//制作發送的消息String message = "HelloWorld~~~RabbitMQ~~~";/*basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)參數:1. exchange:交換機名稱。簡單模式下交換機會使用默認的 "" 就是這個(AMQP default)2. routingKey:路由名稱 簡單模式下默認就是隊列名稱 消費端的的 Routing key 完全一致,才會接收到消息3. props:配置信息4. body:發送消息數據 字節數組*///發送消息到隊列channel.basicPublish("","hello_world",null,message.getBytes());//釋放資源,這里若不釋放,程序不會停止channel.close();connection.close();}}編寫消費者接收消息
package com.fs.consumer;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;/* 消息消費者當我們消息提供者發生一條消息到RabbitMQ的Queue隊列中,消費者一監聽到我們的Queue隊列中有一條消息未被消費,就會立馬取出來消費這條消息*/ public class HelloWorldConsumer {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠對象ConnectionFactory connectionFactory = new ConnectionFactory();//設置參數connectionFactory.setHost("192.168.93.132");// ip 默認值 localhostconnectionFactory.setPort(5672); //端口 RabbitMQ代碼操作默認端口 5672connectionFactory.setVirtualHost("/fs");// 虛擬機 默認值/ 這個是我自己創建的Virtual HostsconnectionFactory.setUsername("xiaofu"); // 用戶名 默認 questconnectionFactory.setPassword("xiaofu"); //密碼 默認 quest//獲取對應的連接Connection connection = connectionFactory.newConnection();//創建 channelChannel channel = connection.createChannel();//創建隊列 Queue 由于提供方創建了,我們這里就不需要創建了 // channel.queueDeclare("hello_world",true,false,false,null);//接收消息 消費消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {/*回調方法,當收到消息后,會自動執行該方法1. consumerTag:標識,消息的唯一ID2. envelope:獲取一些信息,交換機,路由key... 消息來自哪里3. properties:配置信息 就是我們發生消息傳遞的參數,我們這個HelloWorld案列傳遞的是null4. body:數據*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("Exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);System.out.println("body:"+new String(body));}};//基本模式/*basicConsume(String queue, boolean autoAck, Consumer callback)參數:1. queue:隊列名稱2. autoAck:是否自動確認3. callback:回調對象(就是上面創建的對象,用于消息的處理消費)*/channel.basicConsume("hello_world",true,defaultConsumer);//關閉資源? 不要} }測試結果
先執行提供者
在執行消費者
模式二,work queues
在一個隊列中如果有多個消費者,那么消費者之間對于同一個消息的關系是競爭的關系。
Work Queues 對于任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。例如:短信服務部署多個,只需要有一個節點成功發送即可。
提供方代碼編寫
package com.fs.provider;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** Work queues 工作隊列模式:* 與入門程序的簡單模式相比,多了一個或一些消費端,多個消費端共同消費同一個隊列中的消息。** 比如發送10個消息給隊列.有2個消費端共同消費這個隊列,那么就會平均去消費這個隊列中的消息,不會出現同時* 消費同一個消息.** 在一個隊列中如果有多個消費者,那么消費者之間對于同一個消息的關系是競爭的關系。*/ public class Producer_WorkQueues {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠對象ConnectionFactory connectionFactory = new ConnectionFactory();//設置參數connectionFactory.setHost("192.168.93.132");// ip 默認值 localhostconnectionFactory.setPort(5672); //端口 RabbitMQ代碼操作默認端口 5672connectionFactory.setVirtualHost("/fs");// 虛擬機 默認值/ 這個是我自己創建的Virtual HostsconnectionFactory.setUsername("xiaofu"); // 用戶名 默認 questconnectionFactory.setPassword("xiaofu"); //密碼 默認 quest//獲取對應的連接Connection connection = connectionFactory.newConnection();//創建 channelChannel channel = connection.createChannel();//5. 創建隊列Queuechannel.queueDeclare("work_queues",true,false,false,null);//發送10次消息,來讓2個消費者同時消費一個隊列for (int i = 1; i <= 10; i++) {String body = i+"hello rabbitmq~~~";//6. 發送消息channel.basicPublish("","work_queues",null,body.getBytes());}//7.釋放資源channel.close();connection.close();} }消費方代碼編寫
消費1
package com.fs.consumer;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;/* Queue隊列存在多個消費者時, 消費者獲取隊列中的消費時采用默認的輪詢方式*/ public class Consumer_WorkQueues1 {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠對象ConnectionFactory connectionFactory = new ConnectionFactory();//設置參數connectionFactory.setHost("192.168.93.132");// ip 默認值 localhostconnectionFactory.setPort(5672); //端口 RabbitMQ代碼操作默認端口 5672connectionFactory.setVirtualHost("/fs");// 虛擬機 默認值/ 這個是我自己創建的Virtual HostsconnectionFactory.setUsername("xiaofu"); // 用戶名 默認 questconnectionFactory.setPassword("xiaofu"); //密碼 默認 quest//獲取對應的連接Connection connection = connectionFactory.newConnection();//創建 channelChannel channel = connection.createChannel();//5. 創建隊列Queue//如果沒有一個名字叫hello_world的隊列,則會創建該隊列,如果有則不會創建,我們消費者已經創建了 // channel.queueDeclare("work_queues",true,false,false,null);// 消費回調對象Consumer consumer = new DefaultConsumer(channel){//回調方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("Exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));}};//消費,傳遞消費對象channel.basicConsume("work_queues",true,consumer);//關閉資源?不要} }消費2
package com.fs.consumer;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer_WorkQueues2 {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠對象ConnectionFactory connectionFactory = new ConnectionFactory();//設置參數connectionFactory.setHost("192.168.93.132");// ip 默認值 localhostconnectionFactory.setPort(5672); //端口 RabbitMQ代碼操作默認端口 5672connectionFactory.setVirtualHost("/fs");// 虛擬機 默認值/ 這個是我自己創建的Virtual HostsconnectionFactory.setUsername("xiaofu"); // 用戶名 默認 questconnectionFactory.setPassword("xiaofu"); //密碼 默認 quest//獲取對應的連接Connection connection = connectionFactory.newConnection();//創建 channelChannel channel = connection.createChannel();//如果沒有一個名字叫hello_world的隊列,則會創建該隊列,如果有則不會創建 // channel.queueDeclare("work_queues",true,false,false,null);// 消費回調對象Consumer consumer = new DefaultConsumer(channel){//回調方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("Exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));}};//消費,傳遞消費對象channel.basicConsume("work_queues",true,consumer);//關閉資源?不要} }測試
測試先運行消費方接受消息,然后運行提供方發送消息,就會發現2個消費方輪詢的消費了提供方發送的消息
模式三,Publish/Subscribe 發布與訂閱模式
在訂閱模型中,多了一個 Exchange 角色,而且過程略有變化:
? P:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)
? C:消費者,消息的接收者,會一直等待消息到來
? Queue:消息隊列,接收消息、緩存消息
? Exchange:交換機(X)。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:
? Fanout:廣播,將消息交給所有綁定到交換機的隊列
? Direct:定向,把消息交給符合指定routing key 的隊列
? Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與 Exchange 綁定,或者沒有符合路由規則的隊列,那么消息會丟失!
Fanout:廣播
編寫提供方代碼
package com.fs.provider;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** Pub/Sub 訂閱模式* Fanout:廣播,將消息交給所有綁定到交換機的隊列*/ public class Producer_PubSub {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠對象ConnectionFactory connectionFactory = new ConnectionFactory();//設置參數connectionFactory.setHost("192.168.93.132");// ip 默認值 localhostconnectionFactory.setPort(5672); //端口 RabbitMQ代碼操作默認端口 5672connectionFactory.setVirtualHost("/fs");// 虛擬機 默認值/ 這個是我自己創建的Virtual HostsconnectionFactory.setUsername("xiaofu"); // 用戶名 默認 questconnectionFactory.setPassword("xiaofu"); //密碼 默認 quest//獲取對應的連接Connection connection = connectionFactory.newConnection();//創建 channelChannel channel = connection.createChannel();/*exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)參數:1. exchange:交換機名稱2. type:交換機類型 點進源碼查看枚舉如下DIRECT("direct"),:定向FANOUT("fanout"),:扇形(廣播),發送消息到每一個與之綁定隊列。TOPIC("topic"),通配符的方式HEADERS("headers");參數匹配3. durable:是否持久化4. autoDelete:自動刪除,該交換機沒有隊列的時候就自動刪除5. internal:內部使用。 一般false6. arguments:參數*///定義交換機名稱String exchangeName = "test_fanout";//5. 創建交換機,指定交換機類型為FANOUT Fanout:廣播,將消息交給所有綁定到交換機的隊列channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);//6. 自定義隊列名稱String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";//創建隊列channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);//7. 綁定隊列和交換機/*queueBind(String queue, String exchange, String routingKey)參數:1. queue:隊列名稱2. exchange:交換機名稱3. routingKey:路由鍵,路由規則如果交換機的類型為fanout ,routingKey設置為"" 因為為廣播的方式,所以不用設置routingKey路由規則*/channel.queueBind(queue1Name, exchangeName, "");channel.queueBind(queue2Name, exchangeName, "");//自定義的消息String body = "日志信息:張三調用了findAll方法...日志級別:info...";//8. 發送消息channel.basicPublish(exchangeName, "", null, body.getBytes());//9. 釋放資源channel.close();connection.close();} }編寫消費房代碼
消費1
package com.fs.consumer;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer_PubSub1 {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠對象ConnectionFactory connectionFactory = new ConnectionFactory();//設置參數connectionFactory.setHost("192.168.93.132");// ip 默認值 localhostconnectionFactory.setPort(5672); //端口 RabbitMQ代碼操作默認端口 5672connectionFactory.setVirtualHost("/fs");// 虛擬機 默認值/ 這個是我自己創建的Virtual HostsconnectionFactory.setUsername("xiaofu"); // 用戶名 默認 questconnectionFactory.setPassword("xiaofu"); //密碼 默認 quest//獲取對應的連接Connection connection = connectionFactory.newConnection();//創建 channelChannel channel = connection.createChannel();//隊列1 的名稱 與提供方的隊列名一致String queue1Name = "test_fanout_queue1";//String queue2Name = "test_fanout_queue2";// 回調對象 接收消息Consumer consumer = new DefaultConsumer(channel){//回調方法,消費消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("Exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));//模擬消費隊列1 的任務是將日志打印到控制臺System.out.println("將日志信息打印到控制臺.....");}};//消費,使用隊列1,傳遞回調對象channel.basicConsume(queue1Name,true,consumer);//關閉資源?不要} }消費2
package com.fs.consumer;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer_PubSub2 {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠對象ConnectionFactory connectionFactory = new ConnectionFactory();//設置參數connectionFactory.setHost("192.168.93.132");// ip 默認值 localhostconnectionFactory.setPort(5672); //端口 RabbitMQ代碼操作默認端口 5672connectionFactory.setVirtualHost("/fs");// 虛擬機 默認值/ 這個是我自己創建的Virtual HostsconnectionFactory.setUsername("xiaofu"); // 用戶名 默認 questconnectionFactory.setPassword("xiaofu"); //密碼 默認 quest//獲取對應的連接Connection connection = connectionFactory.newConnection();//創建 channelChannel channel = connection.createChannel();//String queue1Name = "test_fanout_queue1";//隊列2 的名稱 與提供方的隊列名一致String queue2Name = "test_fanout_queue2";// 回調對象 接收消息Consumer consumer = new DefaultConsumer(channel){//回調方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("Exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));//模擬消費隊列2 的任務是將消息保存到數據庫System.out.println("將日志信息保存數據庫.....");}};//消費,使用隊列2channel.basicConsume(queue2Name,true,consumer);//關閉資源?不要} }測試運行
先執行提供方發送消息,后執行2個消費方消費消息
模式四,Routing路由模式 Direct
Routing 模式要求隊列在綁定交換機時要指定 routing key,消息會轉發到符合 routing key 的隊列。
提供端代碼編寫
package com.fs.provider;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** Pub/Sub 訂閱模式* Direct:定向,把消息交給符合指定routing key 的隊列*/ public class Producer_Routing {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠對象ConnectionFactory connectionFactory = new ConnectionFactory();//設置參數connectionFactory.setHost("192.168.93.132");// ip 默認值 localhostconnectionFactory.setPort(5672); //端口 RabbitMQ代碼操作默認端口 5672connectionFactory.setVirtualHost("/fs");// 虛擬機 默認值/ 這個是我自己創建的Virtual HostsconnectionFactory.setUsername("xiaofu"); // 用戶名 默認 questconnectionFactory.setPassword("xiaofu"); //密碼 默認 quest//獲取對應的連接Connection connection = connectionFactory.newConnection();//創建 channelChannel channel = connection.createChannel();//自定義交換機名稱String exchangeName = "test_direct";//5. 創建交換機,指定交換機類型 Direct:定向,把消息交給符合指定routing key 的隊列channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);//6. 創建隊列String queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);//7. 綁定隊列和交換機/*queueBind(String queue, String exchange, String routingKey)參數:1. queue:隊列名稱2. exchange:交換機名稱3. routingKey:路由鍵,綁定規則 路由規則因為這次指定的交換機為DIRECT類型,所以我們需要指定路由鍵*///隊列1綁定 error 的routingKeychannel.queueBind(queue1Name, exchangeName, "error");//隊列2綁定 info error warning 這三個routingKeychannel.queueBind(queue2Name, exchangeName, "info");channel.queueBind(queue2Name, exchangeName, "error");channel.queueBind(queue2Name, exchangeName, "warning");String body = "日志信息:張三調用了delete方法...出錯誤了。。。日志級別:error...";String body2 = "日志信息:張三調用了findAll方法.日志級別:info...";//8. 發送消息channel.basicPublish(exchangeName, "info", null, body2.getBytes());channel.basicPublish(exchangeName, "error", null, body.getBytes());//9. 釋放資源channel.close();connection.close();} }消費端代碼編寫
消費1
package com.fs.consumer;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer_Routing1 {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠對象ConnectionFactory connectionFactory = new ConnectionFactory();//設置參數connectionFactory.setHost("192.168.93.132");// ip 默認值 localhostconnectionFactory.setPort(5672); //端口 RabbitMQ代碼操作默認端口 5672connectionFactory.setVirtualHost("/fs");// 虛擬機 默認值/ 這個是我自己創建的Virtual HostsconnectionFactory.setUsername("xiaofu"); // 用戶名 默認 questconnectionFactory.setPassword("xiaofu"); //密碼 默認 quest//獲取對應的連接Connection connection = connectionFactory.newConnection();//創建 channelChannel channel = connection.createChannel();String queue1Name = "test_direct_queue1";//String queue2Name = "test_direct_queue2";// 回調對象 接收消息Consumer consumer = new DefaultConsumer(channel){//回調方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("Exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));System.out.println("將日志信息打印到控制臺.....");}};//消費,傳遞隊列與回調對象channel.basicConsume(queue1Name,true,consumer);//關閉資源?不要} }消費2
package com.fs.consumer;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer_Routing2 {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠對象ConnectionFactory connectionFactory = new ConnectionFactory();//設置參數connectionFactory.setHost("192.168.93.132");// ip 默認值 localhostconnectionFactory.setPort(5672); //端口 RabbitMQ代碼操作默認端口 5672connectionFactory.setVirtualHost("/fs");// 虛擬機 默認值/ 這個是我自己創建的Virtual HostsconnectionFactory.setUsername("xiaofu"); // 用戶名 默認 questconnectionFactory.setPassword("xiaofu"); //密碼 默認 quest//獲取對應的連接Connection connection = connectionFactory.newConnection();//創建 channelChannel channel = connection.createChannel();//String queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";// 回調對象 接收消息Consumer consumer = new DefaultConsumer(channel){//回調方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("Exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));//模擬調用了業務層的操作System.out.println("將日志信息存儲到數據庫.....");}};//消費channel.basicConsume(queue2Name,true,consumer);//關閉資源?不要} }測試運行
先執行提供方發送消息,后執行2個消費方消費消息
模式五,Topics 主題模式 通配符模式
? Topic 類型與 Direct 相比,都是可以根據 RoutingKey 把消息路由到不同的隊列。只不過 Topic 類型Exchange 可以讓隊列在綁定 Routing key 的時候使用通配符!
? Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
? 通配符規則:# 匹配一個或多個詞,* 匹配不多不少恰好1個詞,例如:item.# 能夠匹配 item.insert.abc或者 item.insert,item.* 只能匹配 item.insert
提供端代碼編寫
package com.fs.provider;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** Pub/Sub 發布訂閱者模式* <p>* Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列*/ public class Producer_Topics {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠對象ConnectionFactory connectionFactory = new ConnectionFactory();//設置參數connectionFactory.setHost("192.168.93.132");// ip 默認值 localhostconnectionFactory.setPort(5672); //端口 RabbitMQ代碼操作默認端口 5672connectionFactory.setVirtualHost("/fs");// 虛擬機 默認值/ 這個是我自己創建的Virtual HostsconnectionFactory.setUsername("xiaofu"); // 用戶名 默認 questconnectionFactory.setPassword("xiaofu"); //密碼 默認 quest//獲取對應的連接Connection connection = connectionFactory.newConnection();//創建 channelChannel channel = connection.createChannel();//自定義交換機名稱String exchangeName = "test_topic";//5. 創建交換機,指定交換機類型 Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);//6. 創建隊列String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);//7. 綁定隊列和交換機/*queueBind(String queue, String exchange, String routingKey)參數:1. queue:隊列名稱2. exchange:交換機名稱3. routingKey:路由鍵,綁定規則因為是TOPIC 規則 通配符*///= 系統的名稱.日志級別//通配符規則:# 匹配0個或多個詞,* 匹配不多不少恰好1個詞channel.queueBind(queue1Name, exchangeName, "#.error");channel.queueBind(queue1Name, exchangeName, "order.*");channel.queueBind(queue2Name, exchangeName, "*.*");//定義消息String body = "日志信息:張三調用了findAll方法...日志級別:info...";String body2 = "日志信息:張三調用了delete方法.執行錯誤..日志級別:error...";//8. 發送消息//這個消息只有隊列2能收到,因為只滿足 *.*channel.basicPublish(exchangeName, "goods.find", null, body.getBytes());//這個消息隊列1和隊列2 都能收到并消費,因為滿足通配符channel.basicPublish(exchangeName, "goods.error", null, body2.getBytes());//9. 釋放資源channel.close();connection.close();} }消費端代碼編寫
消費1
package com.fs.consumer;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer_Topic1 {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠對象ConnectionFactory connectionFactory = new ConnectionFactory();//設置參數connectionFactory.setHost("192.168.93.132");// ip 默認值 localhostconnectionFactory.setPort(5672); //端口 RabbitMQ代碼操作默認端口 5672connectionFactory.setVirtualHost("/fs");// 虛擬機 默認值/ 這個是我自己創建的Virtual HostsconnectionFactory.setUsername("xiaofu"); // 用戶名 默認 questconnectionFactory.setPassword("xiaofu"); //密碼 默認 quest//獲取對應的連接Connection connection = connectionFactory.newConnection();//創建 channelChannel channel = connection.createChannel();String queue1Name = "test_topic_queue1";//String queue2Name = "test_topic_queue2";// 回調對象 接收消息Consumer consumer = new DefaultConsumer(channel){//回調方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("Exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));System.out.println("將日志信息存入數據庫.......");}};//消費channel.basicConsume(queue1Name,true,consumer);//關閉資源?不要} }消費2
package com.fs.consumer;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer_Topic2 {public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠對象ConnectionFactory connectionFactory = new ConnectionFactory();//設置參數connectionFactory.setHost("192.168.93.132");// ip 默認值 localhostconnectionFactory.setPort(5672); //端口 RabbitMQ代碼操作默認端口 5672connectionFactory.setVirtualHost("/fs");// 虛擬機 默認值/ 這個是我自己創建的Virtual HostsconnectionFactory.setUsername("xiaofu"); // 用戶名 默認 questconnectionFactory.setPassword("xiaofu"); //密碼 默認 quest//獲取對應的連接Connection connection = connectionFactory.newConnection();//創建 channelChannel channel = connection.createChannel();//String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";//回調對象Consumer consumer = new DefaultConsumer(channel){//回調方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/* System.out.println("consumerTag:"+consumerTag);System.out.println("Exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));System.out.println("將日志信息打印控制臺.......");}};//消費channel.basicConsume(queue2Name,true,consumer);//關閉資源?不要} }測試運行
工作模式總結
簡單模式 HelloWorld
一個生產者、一個消費者,不需要設置交換機(使用默認的交換機)。
工作隊列模式 Work Queue
一個生產者、多個消費者(競爭關系),不需要設置交換機(使用默認的交換機)。
發布訂閱模式 Publish/subscribe
需要設置類型為 fanout 的交換機,并且交換機和隊列進行綁定,當發送消息到交換機后,交換機會將消
息發送到綁定的隊列。
路由模式 Routing
需要設置類型為 direct 的交換機,交換機和隊列進行綁定,并且指定 routing key,當發送消息到交換機
后,交換機會根據 routing key 將消息發送到對應的隊列。
通配符模式 Topic
需要設置類型為 topic 的交換機,交換機和隊列進行綁定,并且指定通配符方式的 routing key,當發送
消息到交換機后,交換機會根據 routing key 將消息發送到對應的隊列。
Spring 整合 RabbitMQ
? 使用 Spring 整合 RabbitMQ 將組件全部使用配置方式實現,簡化編碼
? Spring 提供 RabbitTemplate 簡化發送消息 API
? 使用監聽機制簡化消費者編碼
fs-spring-rabbitMQ-provider 消息提供方項目
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>fs-rabbitMQ-study</artifactId><groupId>com.fs</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>fs-spring-rabbitMQ-provider</artifactId><dependencies><!-- https://mvnrepository.com/artifact/org.springframework/spring-context --><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.2.8.RELEASE</version></dependency><!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit --><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.2.10.RELEASE</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><!-- https://mvnrepository.com/artifact/org.springframework/spring-test --><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>5.2.8.RELEASE</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build></project>rabbitmq.properties
rabbitmq.host=192.168.93.132 rabbitmq.port=5672 rabbitmq.username=xiaofu rabbitmq.password=xiaofu rabbitmq.virtual-host=/fsspring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--加載配置文件--><context:property-placeholder location="classpath:rabbitmq.properties"/><!-- 定義rabbitmq connectionFactory --><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"/><!--定義管理交換機、隊列--><rabbit:admin connection-factory="connectionFactory"/><!--定義持久化隊列,不存在則自動創建;不綁定到交換機則綁定到默認交換機默認交換機類型為direct,名字為:"",路由鍵為隊列的名稱--><!--id:bean的名稱name:queue的名稱auto-declare:自動創建auto-delete:自動刪除。 最后一個消費者和該隊列斷開連接后,自動刪除隊列exclusive:是否獨占durable:是否持久化--><rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/><!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~廣播;所有隊列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --><!--定義廣播交換機中的持久化隊列,不存在則自動創建--><rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/><!--定義廣播交換機中的持久化隊列,不存在則自動創建--><rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/><!--定義廣播類型交換機;并綁定上述兩個隊列--><rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true"><rabbit:bindings><rabbit:binding queue="spring_fanout_queue_1" /><rabbit:binding queue="spring_fanout_queue_2"/></rabbit:bindings></rabbit:fanout-exchange><!--<rabbit:direct-exchange name="aa" ><rabbit:bindings><!–direct 類型的交換機綁定隊列 key :路由key queue:隊列名稱–><rabbit:binding queue="spring_queue" key="xxx"></rabbit:binding></rabbit:bindings></rabbit:direct-exchange>--><!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一個單詞,#匹配多個單詞 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --><!--定義廣播交換機中的持久化隊列,不存在則自動創建--><rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/><!--定義廣播交換機中的持久化隊列,不存在則自動創建--><rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/><!--定義廣播交換機中的持久化隊列,不存在則自動創建--><rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/><rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true"><rabbit:bindings><rabbit:binding pattern="fs.*" queue="spring_topic_queue_star"/><rabbit:binding pattern="fs.#" queue="spring_topic_queue_well"/><rabbit:binding pattern="xf.#" queue="spring_topic_queue_well2"/></rabbit:bindings></rabbit:topic-exchange><!--定義rabbitTemplate對象操作可以在代碼中方便發送消息--><rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> </beans>ProducerTest 提供消息發送測試類
package com.fs;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest {//1.注入 RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testHelloWorld(){//2.發送消息rabbitTemplate.convertAndSend("spring_queue","hello world spring....");}/*** 發送fanout消息*/@Testpublic void testFanout(){//2.發送消息//因為是Fanout類型的交換機,所以發生的時候要指定交換機名稱,指定""路由規則,發送的消息rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout....");}/*** 發送topic消息*/@Testpublic void testTopics(){//2.發送消息//因為是Topics類型的交換機,所以發生的時候要指定交換機名稱,指定通配符路由規則,發送的消息rabbitTemplate.convertAndSend("spring_topic_exchange","fs.hehe.haha","spring topic....");} }fs-spring-rabbitMQ-consumer 消息消費方項目
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>fs-rabbitMQ-study</artifactId><groupId>com.fs</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>fs-spring-rabbitMQ-consumer</artifactId><dependencies><!-- https://mvnrepository.com/artifact/org.springframework/spring-context --><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.2.8.RELEASE</version></dependency><!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit --><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.2.10.RELEASE</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><!-- https://mvnrepository.com/artifact/org.springframework/spring-test --><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>5.2.8.RELEASE</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build></project>rabbitmq.properties(與提供方一模一樣)
spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--加載配置文件--><context:property-placeholder location="classpath:rabbitmq.properties"/><!-- 定義rabbitmq connectionFactory --><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"/><!-- 注入我們實現MessageListener接口的類--><bean id="springQueueListener" class="com.fs.rabbitmq.listener.SpringQueueListener"/><!--<bean id="fanoutListener1" class="com.fs.rabbitmq.listener.FanoutListener1"/><bean id="fanoutListener2" class="com.fs.rabbitmq.listener.FanoutListener2"/><bean id="topicListenerStar" class="com.fs.rabbitmq.listener.TopicListenerStar"/><bean id="topicListenerWell" class="com.fs.rabbitmq.listener.TopicListenerWell"/><bean id="topicListenerWell2" class="com.fs.rabbitmq.listener.TopicListenerWell2"/> --><!-- 注入連接工廠,消費隊列中的消息--><rabbit:listener-container connection-factory="connectionFactory" auto-declare="true"><rabbit:listener ref="springQueueListener" queue-names="spring_queue"/><!-- <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/><rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/><rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/><rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/><rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>--></rabbit:listener-container> </beans>SpringQueueListener 消息監聽
package com.fs.rabbitmq.listener;import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener;/* 創建一個類來實現MessageListener 消息監聽的接口 重寫onMessage方法 使用Message參數來消費*/ public class SpringQueueListener implements MessageListener {@Overridepublic void onMessage(Message message) {//打印消息System.out.println(new String(message.getBody()));} }ConsumerTest spring容器啟動測試
package com.fs;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml") public class ConsumerTest {@Testpublic void test1(){boolean flag = true;//讓代碼不停止,就一直開啟連接監聽我們的隊列中的消息while (true){}} }測試Spring 整合 RabbitMQ
先執行ConsumerTest的test1方法,讓那個消息監聽器一直監聽某個隊列,有消息就消費
然后執行消息提供方的測試方法發送消息
Springboot 整合RabbitMQ
生產端
org.springframework.boot
spring-boot-starter-amqp
消費端
6. 創建消費者SpringBoot工程
7. 引入start,依賴坐標
org.springframework.boot
spring-boot-starter-amqp
8. 編寫yml配置,基本信息配置
9. 定義監聽類,使用@RabbitListener注解完成隊列監聽。
父pom.xml
<!-- spring boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.3.2.RELEASE</version><type>pom</type><!-- import 導入父工程的配置--><scope>import</scope></dependency>fs-springboot-rabbitMQ-provider 消息提供端
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>fs-rabbitMQ-study</artifactId><groupId>com.fs</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>fs-springboot-rabbitMQ-provider</artifactId><dependencies><!-- spring-boot-starter-web spring-boot-starter-actuator綁定在一塊 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--2. rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies> </project>application.yml
# 配置RabbitMQ的基本信息 ip 端口 username password 虛擬機.. spring:rabbitmq:host: 192.168.93.132 # ipport: 5672username: xiaofupassword: xiaofuvirtual-host: /springbootProducerApplication 主啟動
package com.fs;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class);} }RabbitMQConfig 配置RabbitMQ類
package com.fs.rabbitmq.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /* java配置類 創建交換機與隊列,綁定關系 鏈式編程,很方便這些也可以在RabbitMQ的web管理界面去創建*/ @Configuration public class RabbitMQConfig {//定義交換機名稱與隊列名稱public static final String EXCHANGE_NAME = "boot_topic_exchange";public static final String QUEUE_NAME = "boot_queue";//1.交換機Exchange@Bean("bootExchange")public Exchange bootExchange(){//創建一個topic類型的交換機 ExchangeBuilder使用交換機構建對象return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//2.Queue 隊列@Bean("bootQueue")public Queue bootQueue(){//QueueBuilder 使用隊列構建對象//返回一個隊列,指定隊列名稱,也可以不使用參數 ,使用的參數withArgument,可以點進源碼查看 x-message-ttl ttl 設置消息過期時間,5000 就是5秒過期 // return QueueBuilder.durable(QUEUE_NAME).withArgument("x-message-ttl",5000).build();//不設置參數return QueueBuilder.durable(QUEUE_NAME).build();}//3. 隊列和交換機綁定關系 Binding/*1. 指定哪個隊列2. 指定哪個交換機3. routing key*/@Beanpublic Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){//BindingBuilder 使用綁定構建對象//返回bind綁定那個隊列,to位于那個交換機,with在那個路由規則,noargs不指定參數return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();}}ProducerTest 消息發送測試方法
package com.fs;import com.fs.rabbitmq.config.RabbitMQConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest @RunWith(SpringRunner.class) public class ProducerTest {//1.注入RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSend(){//發送消息,因為我們綁定的規則是 boot.# 所以發送"boot.haha" 消費方式能夠接受到的//第一個參數,交換機名稱,二參數 routingKey 三參數 消息rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello~~~");} }fs-springboot-rabbitMQ-consumer 消息消費端
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>fs-rabbitMQ-study</artifactId><groupId>com.fs</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>fs-springboot-rabbitMQ-consumer</artifactId><dependencies><!-- spring-boot-starter-web spring-boot-starter-actuator綁定在一塊 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--2. rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies> </project>application.yml
spring:rabbitmq:host: 192.168.93.132 #主機ipport: 5672 #端口username: xiaofupassword: xiaofuvirtual-host: /springbootpublisher-confirms: truepublisher-returns: truelistener:simple:acknowledge-mode: manualConsumerSpringbootApplication 主啟動
package com.fs;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class ConsumerSpringbootApplication {public static void main(String[] args) {SpringApplication.run(ConsumerSpringbootApplication.class, args);}}RabbimtMQListener 隊列監聽類
@RabbitListener(queues = {監聽的隊列,可以指定多個})
package com.fs.Queuelistener;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;/* 定義監聽類*/ @Component public class RabbimtMQListener {//@RabbitListener(queues = {監聽的隊列,可以指定多個})//監聽boot_queue 這個隊列,有消息就消費@RabbitListener(queues = "boot_queue")public void ListenerQueue(Message message, Channel channel){//System.out.println(message);//打印消息System.out.println(new String(message.getBody()));}}測試運行
先點擊消費端的主啟動,把消費端啟動起來,就會執行我們定義的監聽器,然后監聽隊列,這個隊列有消息發送就會被消費
然后在點擊我們提供端的測試方法發送消息
總結
以上是生活随笔為你收集整理的RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SpringCloud微服务架构,Con
- 下一篇: gradle idea java ssm