日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rabbitmq进阶一

發布時間:2025/3/8 编程问答 14 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq进阶一 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

上一篇文章有講到rabbitmq的安裝、web管理端和springboot簡單集成rabbitmq

本文重點介紹rabbitmq相關api的使用

按照官網常用的五種模式的順序:HelloWorld、Work queues、Publish/Subscribe、Routing、Topics

模式簡單介紹

HelloWorld

一個生產者,一個隊列,一個消費者。

一個demo,實際很少使用。

Work queues

在多個消費者之間分配任務,競爭消費模式。

Publish/Subscribe

發布訂閱模式,同時向多個消費者發送消息。

Routing

選擇性的接收消息

Topics

基于表達式接收消息

模式具體使用(rabbitmqclient)

HelloWorld

創建maven項目并且引入依賴

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version></dependency></dependencies>

創建工具類,用于處理連接和信道的創建,以及他們的關閉

package org.cc;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;public class ConnectionUtils {public static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory(); // connectionFactory.setHost("localhost");//默認主機:localhost // connectionFactory.setPort(5672);//默認端口5672 // connectionFactory.setUsername("guest");//默認用戶名:guest // connectionFactory.setPassword("guest");//默認密碼:guest // connectionFactory.setVirtualHost("/");//默認虛擬主機:/return connectionFactory.newConnection();}public static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {channel.close();connection.close();} }

創建消費者

package org.cc;import com.rabbitmq.client.*; import org.junit.Test;import java.io.IOException; import java.util.concurrent.TimeoutException;public class HelloWorldConsumer {@Testpublic void sendMsg() throws IOException, TimeoutException {Connection connection = ConnectionUtils.createConnection();Channel channel = connection.createChannel();/*聲明一個名稱是my helloworld queue,持久化,非獨享,非自動刪除的隊列durable – 是否持久化,為true時重啟rabbitmq服務,會保留原有的隊列exclusive – 是否獨享,為true時連接一旦斷開,會自動刪除隊列autoDelete 是否自動刪除,為true時一旦隊列被消費,會自動刪除隊列*///若隊列已存在,這些參數必須與隊列一致,若隊列不存在則創建channel.queueDeclare("my helloworld queue",true,false,false,null);channel.basicConsume("my helloworld queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("helloworld consumer接收到消息:"+new String(body));}});System.in.read();//保持消費者一直監聽隊列} }

創建生產者

package org.cc;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class HelloWorldProducer {@Testpublic void sendMsg() throws IOException, TimeoutException {Connection connection = ConnectionUtils.createConnection();Channel channel = connection.createChannel();/*聲明一個名稱是my helloworld queue,持久化,非獨享,非自動刪除的隊列durable – 是否持久化,為true時重啟rabbitmq服務,會保留原有的隊列exclusive – 是否獨享,為true時連接一旦斷開,會自動刪除隊列autoDelete 是否自動刪除,為true時一旦隊列被消費,會自動刪除隊列*///若隊列已存在,這些參數必須與隊列一致,若隊列不存在則創建channel.queueDeclare("my helloworld queue",true,false,false,null);channel.basicPublish("","my helloworld queue",null,"helloworld消息內容".getBytes(StandardCharsets.UTF_8));ConnectionUtils.closeConnection(connection,channel);} }

?若要保證rabbitmq重啟后消息仍然存在,生產者發送消息時需要設置props參數

channel.basicPublish("","my helloworld queue", MessageProperties.PERSISTENT_TEXT_PLAIN,"helloworld消息內容".getBytes(StandardCharsets.UTF_8));

?開啟手動ack,消費者接收到消息時,需要手動發送ack確認后消息才會真正從隊列中刪除

channel.basicConsume("my helloworld queue",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("helloworld consumer接收到消息:"+new String(body));channel.basicAck(envelope.getDeliveryTag(),false);}});

Work queues

創建消費者,與上面helloworld模式代碼基本一致,將原有的創建消費者的代碼重復一遍

package org.cc;import com.rabbitmq.client.*; import org.junit.Test;import java.io.IOException; import java.util.concurrent.TimeoutException;public class HelloWorldConsumer {@Testpublic void sendMsg() throws IOException, TimeoutException {Connection connection = ConnectionUtils.createConnection();Channel channel = connection.createChannel();/*聲明一個名稱是my helloworld queue,持久化,非獨享,非自動刪除的隊列durable – 是否持久化,為true時重啟rabbitmq服務,會保留原有的隊列exclusive – 是否獨享,為true時連接一旦斷開,會自動刪除隊列autoDelete 是否自動刪除,為true時一旦隊列被消費,會自動刪除隊列*///若隊列已存在,這些參數必須與隊列一致,若隊列不存在則創建channel.queueDeclare("my helloworld queue",true,false,false,null);channel.basicConsume("my helloworld queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("helloworld consumer接收到消息:"+new String(body));}});System.in.read();//保持消費者一直監聽隊列} }

創建生產者,與上面helloworld模式代碼基本一致,這里連續發送10條消息

package org.cc;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import org.junit.Test;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class WorkProducer {@Testpublic void sendMsg() throws IOException, TimeoutException {Connection connection = ConnectionUtils.createConnection();Channel channel = connection.createChannel();channel.queueDeclare("my work queue",true,false,false,null);for (int i = 0; i < 10; i++) {channel.basicPublish("","my work queue", MessageProperties.PERSISTENT_TEXT_PLAIN,("work消息內容"+i).getBytes(StandardCharsets.UTF_8));}ConnectionUtils.closeConnection(connection,channel);} }

?從消費者的控制臺可以看到兩個消費者輪流接收到消息

Publish/Subscribe

消費者

package org.cc;import com.rabbitmq.client.*; import org.junit.Test;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Subscriber {@Testpublic void receive() throws IOException, TimeoutException {Connection connection = ConnectionUtils.createConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("fanout exchange", BuiltinExchangeType.FANOUT);channel.queueDeclare("my fanout queue1",true,false,false,null);channel.queueBind("my fanout queue1","fanout exchange","");channel.basicConsume("my fanout queue1",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("my fanout queue1 consumer接收到消息:"+new String(body));}});Connection connection1 = ConnectionUtils.createConnection();Channel channel1 = connection1.createChannel();channel1.queueDeclare("my fanout queue2",true,false,false,null);channel.queueBind("my fanout queue2","fanout exchange","");channel1.basicConsume("my fanout queue2",true,new DefaultConsumer(channel1){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("my fanout queue2 consumer接收到消息:"+new String(body));}});System.in.read();//保持消費者一直監聽隊列} }

生產者

package org.cc;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import org.junit.Test;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class Publisher {@Testpublic void sendMsg() throws IOException, TimeoutException {Connection connection = ConnectionUtils.createConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("fanout exchange", BuiltinExchangeType.FANOUT);channel.basicPublish("fanout exchange","", MessageProperties.PERSISTENT_TEXT_PLAIN,"fanout exchange消息內容".getBytes(StandardCharsets.UTF_8));ConnectionUtils.closeConnection(connection,channel);} }

?隊列需要同交換機綁定,生產者向交換機發送消息

Routing

消費者?

package org.cc;import com.rabbitmq.client.*; import org.junit.Test;import java.io.IOException; import java.util.concurrent.TimeoutException;public class RoutingKeyConsumer {@Testpublic void receive() throws IOException, TimeoutException {Connection connection = ConnectionUtils.createConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("direct exchange", BuiltinExchangeType.DIRECT);channel.queueDeclare("my direct queue1",true,false,false,null);channel.queueBind("my direct queue1","direct exchange","info");channel.basicConsume("my direct queue1",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("my direct queue1 consumer接收到消息:"+new String(body));}});Connection connection1 = ConnectionUtils.createConnection();Channel channel1 = connection1.createChannel();channel1.queueDeclare("my direct queue2",true,false,false,null);channel.queueBind("my direct queue2","direct exchange","info");channel.queueBind("my direct queue2","direct exchange","error");channel1.basicConsume("my direct queue2",true,new DefaultConsumer(channel1){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("my direct queue2 consumer接收到消息:"+new String(body));}});System.in.read();//保持消費者一直監聽隊列} }

生產者

package org.cc;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import org.junit.Test;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class RoutingProducer {@Testpublic void sendMsg() throws IOException, TimeoutException {Connection connection = ConnectionUtils.createConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("direct exchange", BuiltinExchangeType.DIRECT);channel.basicPublish("direct exchange","info", MessageProperties.PERSISTENT_TEXT_PLAIN,"direct exchange info消息內容".getBytes(StandardCharsets.UTF_8));channel.basicPublish("direct exchange","error", MessageProperties.PERSISTENT_TEXT_PLAIN,"direct exchange error消息內容".getBytes(StandardCharsets.UTF_8));ConnectionUtils.closeConnection(connection,channel);} }

Topics

交換機路由消息給隊列時基于表達式,*匹配1個,#配置0個或1個或多個

例如:當隊列1的路由值設置user.*,隊列2的路由值設置user.#時,向交換機分別發送四條消息,消息的路由值分別為user.insert、user.insert.a、user.、user

此時隊列1會收到路由值為user.insert和user.的消息,隊列1能收到上面全部四條消息

消費者代碼

package org.cc;import com.rabbitmq.client.*; import org.junit.Test;import java.io.IOException; import java.util.concurrent.TimeoutException;public class TopicsConsumer {@Testpublic void receive() throws IOException, TimeoutException {Connection connection = ConnectionUtils.createConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("topic exchange", BuiltinExchangeType.TOPIC);channel.queueDeclare("my topic queue1",true,false,false,null);channel.queueBind("my topic queue1","topic exchange","user.*");channel.basicConsume("my topic queue1",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("my topic queue1 consumer接收到消息:"+new String(body));}});Connection connection1 = ConnectionUtils.createConnection();Channel channel1 = connection1.createChannel();channel1.queueDeclare("my topic queue2",true,false,false,null);channel.queueBind("my topic queue2","topic exchange","user.#");channel1.basicConsume("my topic queue2",true,new DefaultConsumer(channel1){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("my topic queue2 consumer接收到消息:"+new String(body));}});System.in.read();//保持消費者一直監聽隊列} }

生產者代碼

package org.cc;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import org.junit.Test;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class TopicsProducer {@Testpublic void sendMsg() throws IOException, TimeoutException {Connection connection = ConnectionUtils.createConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("topic exchange", BuiltinExchangeType.TOPIC);channel.basicPublish("topic exchange","user.insert", MessageProperties.PERSISTENT_TEXT_PLAIN,"topic exchange user.insert消息內容".getBytes(StandardCharsets.UTF_8));channel.basicPublish("topic exchange","user.insert.a", MessageProperties.PERSISTENT_TEXT_PLAIN,"topic exchange user.insert.a消息內容".getBytes(StandardCharsets.UTF_8));channel.basicPublish("topic exchange","user.", MessageProperties.PERSISTENT_TEXT_PLAIN,"topic exchange user.消息內容".getBytes(StandardCharsets.UTF_8));channel.basicPublish("topic exchange","user", MessageProperties.PERSISTENT_TEXT_PLAIN,"topic exchange user消息內容".getBytes(StandardCharsets.UTF_8));ConnectionUtils.closeConnection(connection,channel);} }

模式具體使用(springboot集成rabbitmq)

使用idea構建項目,選擇spring initializer,創建生產者項目springboot-rabbitmq-producer

??dependencies選擇如下

?application.properties設置如下

?使用同樣的方式創建消費者項目springboot-rabbitmq-consumer,將server.port設置為8081

當前springboot版本最新為2.6.3

HelloWorld

創建消費者并啟動消費者應用

package com.example.springbootrabbitmqconsumer.helloworld;import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;/*** springboot-rabbitmq-producer** @author v_choncheng* @description* @create 2022-02-15 14:42*/ @Component @RabbitListener(queuesToDeclare = @Queue("helloworld")) public class HelloWorldConsumer {@RabbitHandlerpublic void receive(String msg) {System.out.println("消費者接受到消息" + msg);}}

創建生產者

package com.example.springbootrabbitmqproducer.helloworld;import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** springboot-rabbitmq-producer** @author v_choncheng* @description* @create 2022-02-15 14:42*/ @Configuration public class HelloWorldProducer {@Beanpublic Queue createQueue() {return new Queue("helloworld");} }

生產者工程測試類中增加測試方法

?運行此測試方法后可以看到消費者接收到一條消息


Work queues

創建消費者并啟動消費者應用

package com.example.springbootrabbitmqconsumer.workqueues;import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;/*** springboot-rabbitmq-consumer** @author v_choncheng* @description* @create 2022-02-15 15:11*/ @Component public class WorkQueuesConsumer {@RabbitListener(queuesToDeclare = @Queue("workqueues"))public void receive1(String msg) {System.out.println("消費者1接受到消息" + msg);}@RabbitListener(queuesToDeclare = @Queue("workqueues"))public void receive2(String msg) {System.out.println("消費者2接受到消息" + msg);} }

生產者工程測試類中增加測試方法

運行此測試方法后可以看到消費者1、2輪流接收到消息


Publish/Subscribe

創建消費者并啟動消費者應用

package com.example.springbootrabbitmqconsumer.fanout;import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;/*** springboot-rabbitmq-consumer** @author v_choncheng* @description* @create 2022-02-15 15:23*/ @Component public class FanoutConsumer {@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(type = ExchangeTypes.FANOUT, name = "fanoutexchange"), value = @Queue("fanoutqueues1")))public void receive1(String msg) {System.out.println("消費者1接受到消息" + msg);}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(type = ExchangeTypes.FANOUT, name = "fanoutexchange"), value = @Queue("fanoutqueues2")))public void receive2(String msg) {System.out.println("消費者2接受到消息" + msg);} }

生產者工程測試類中增加測試方法

?運行此測試方法后可以看到消費者1、2同時接收到消息


Routing

創建消費者并啟動消費者應用

package com.example.springbootrabbitmqconsumer.routing;import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;/*** springboot-rabbitmq-consumer** @author v_choncheng* @description* @create 2022-02-15 15:38*/ @Component public class RoutingConsumer {@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(type = ExchangeTypes.DIRECT, name = "routingexchange"), value = @Queue("routingqueues1"), key = {"debug", "verbose", "notice", "warning"}))public void receive1(String msg) {System.out.println("消費者1接受到消息" + msg);}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(type = ExchangeTypes.DIRECT, name = "routingexchange"), value = @Queue("routingqueues2"), key = {"debug", "verbose"}))public void receive2(String msg) {System.out.println("消費者2接受到消息" + msg);} }

生產者工程測試類中增加測試方法

?運行此測試方法后可以看到消費者1接收四條消息、消費者2只接收到debug和verbose消息


Topics

創建消費者并啟動消費者應用

package com.example.springbootrabbitmqconsumer.topics;import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;/*** springboot-rabbitmq-consumer** @author v_choncheng* @description* @create 2022-02-15 15:38*/ @Component public class TopicsConsumer {@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(type = ExchangeTypes.TOPIC, name = "topicexchange"), value = @Queue("topicqueues1"), key = {"user.*"}))public void receive1(String msg) {System.out.println("消費者1接受到消息" + msg);}@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(type = ExchangeTypes.TOPIC, name = "topicexchange"), value = @Queue("topicqueues2"), key = {"user.#", "verbose"}))public void receive2(String msg) {System.out.println("消費者2接受到消息" + msg);} }

生產者工程測試類中增加測試方法

?運行此測試方法后可以看到消費者2接收四條消息、消費者1只接收到user.和user.insert消息

總結

以上是生活随笔為你收集整理的rabbitmq进阶一的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。