日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ 消息队列六种模式

發布時間:2024/9/30 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ 消息队列六种模式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RabbitMQ 的第一個程序

RabbitMQ-生產者|消費者

搭建環境

java client

生產者和消費者都屬于客戶端, rabbitMQ的java客戶端如下

創建 maven 工程

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version> </dependency>

AMQP協議的回顧

RabbitMQ支持的消息模型

第一種模型(直連)

在上圖的模型中,有以下概念:

  • P:生產者,也就是要發送消息的程序
  • C:消費者:消息的接受者,會一直等待消息到來。
  • queue:消息隊列,圖中紅色部分。類似一個郵箱,可以緩存消息;生產者向其中投遞消息,消費者從其中取出消息。

開發生產者

/*** 生產者* <p>* 直連模式** @author mxz*/ @Component public class Provider {public static void main(String[] args) throws IOException, TimeoutException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 獲取連接中通道Channel channel = connection.createChannel();// 通道綁定消息隊列// 參數1 隊列的名稱, 如果不存在則自動創建// 參數2 用來定義隊列是否需要持久化, true 持久化隊列(mq關閉時, 會存到磁盤中) false 不持久化(關閉即失)// 參數3 exclusive 是否獨占隊列 true 獨占隊列 false 不獨占// 參數4 autoDelete 是否在消費后自動刪除隊列 true 自動刪除 false 不刪除// 參數5 額外的附加參數channel.queueDeclare("hello", false, false, false, null);// 發布消息// 參數1 交換機名稱// 參數2 隊列名稱// 參數3 傳遞消息額外設置// 參數4 消息的具體內容channel.basicPublish("", "hello", null, "hello rabbitMQ".getBytes());RabbitMQUtils.closeConnectionAndChannel(channel, connection);} }

開發消費者

/*** 消費者** @author mxz*/ @Component public class Customer {public static void main(String[] args) throws IOException, TimeoutException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創建通道Channel channel = connection.createChannel();// 通道綁定對象channel.queueDeclare("hello", false, false, false, null);// 消費消息// 參數1 消息隊列的消息, 隊列名稱// 參數2 開啟消息的確認機制// 參數3 消息時的回調接口channel.basicConsume("hello", true, new DefaultConsumer(channel) {// 最后一個參數 消息隊列中取出的消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("new String(body)" + new String(body));}});// channel.close(); // connection.close();}}

工具類

/*** @author mxz*/ public class RabbitMQUtils {private static ConnectionFactory connectionFactory;// 重量級資源 類加載執行一次(即可)static {// 創建連接 mq 的連接工廠connectionFactory = new ConnectionFactory();// 設置 rabbitmq 主機connectionFactory.setHost("127.0.0.1");// 設置端口號connectionFactory.setPort(5672);// 設置連接哪個虛擬主機connectionFactory.setVirtualHost("/codingce");// 設置訪問虛擬主機用戶名密碼connectionFactory.setUsername("codingce");connectionFactory.setPassword("123456");}/*** 定義提供連接對象的方法** @return*/public static Connection getConnection() {try {return connectionFactory.newConnection();} catch (Exception e) {e.printStackTrace();}return null;}/*** 關閉通道和關閉連接工具方法** @param connection* @param channel*/public static void closeConnectionAndChannel(Channel channel, Connection connection) {try {// 先關 channelif (channel != null)channel.close();if (connection != null)connection.close();} catch (Exception e) {e.printStackTrace();}} }

第二種模型(work quene)

Work queues,也被稱為(Task queues),任務模型。當消息處理比較耗時的時候,可能生產消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。此時就可以使用work 模型:讓多個消費者綁定到一個隊列,共同消費隊列中的消息。隊列中的消息一旦消費,就會消失,因此任務是不會被重復執行的。

角色:

  • P:生產者:任務的發布者
  • C1:消費者-1,領取任務并且完成任務,假設完成速度較慢
  • C2:消費者-2:領取任務并完成任務,假設完成速度快

開發生產者

/*** 生產者* <p>* 任務模型 work quenue** @author mxz*/ @Component public class Provider {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 通過通道聲明隊列channel.queueDeclare("work", true, false, false, null);for (int i = 0; i < 10; i++) {// 生產消息channel.basicPublish("", "work", null, (" " + i + "work quenue").getBytes());}// 關閉資源RabbitMQUtils.closeConnectionAndChannel(channel, connection);} }

開發消費者-1

/*** 自動確認消費 autoAck true 12搭配測試* <p>* 消費者 1** @author mxz*/ @Component public class CustomerOne {public static void main(String[] args) throws IOException, TimeoutException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創建通道Channel channel = connection.createChannel();// 通道綁定對象channel.queueDeclare("work", true, false, false, null);// 消費消息// 參數1 消息隊列的消息, 隊列名稱// 參數2 開啟消息的確認機制// 參數3 消息時的回調接口channel.basicConsume("work", true, new DefaultConsumer(channel) {// 最后一個參數 消息隊列中取出的消息// 默認分配是平均的@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者-1" + new String(body));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});// channel.close(); // connection.close();}}

開發消費者-2

/*** 自動確認消費 autoAck true 12搭配測試* <p>* 消費者 2** @author mxz*/ @Component public class CustomerTwo {public static void main(String[] args) throws IOException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創建通道Channel channel = connection.createChannel();// 通道綁定對象channel.queueDeclare("work", true, false, false, null);channel.basicConsume("work", true, new DefaultConsumer(channel) {// 最后一個參數 消息隊列中取出的消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者-1" + new String(body));}});// channel.close(); // connection.close();}}

測試結果

總結:默認情況下,RabbitMQ將按順序將每個消息發送給下一個使用者。平均而言,每個消費者都會收到相同數量的消息。這種分發消息的方式稱為循環。

消息自動確認機制

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.

消費者3

/*** 能者多勞 34 搭配測試* <p>* 消費者 3** @author mxz*/ @Component public class CustomerThree {public static void main(String[] args) throws IOException, TimeoutException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創建通道Channel channel = connection.createChannel();// 每一次只能消費一個消息channel.basicQos(1);// 通道綁定對象channel.queueDeclare("work", true, false, false, null);// 參數1 隊列名稱 參數2(autoAck) 消息自動確認 true 消費者自動向 rabbitMQ 確認消息消費 false 不會自動確認消息// 若出現消費者宕機情況 消費者三可以進行消費channel.basicConsume("work", false, new DefaultConsumer(channel) {// 最后一個參數 消息隊列中取出的消息// 默認分配是平均的@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者-1" + new String(body));// 手動確認 參數1 確認隊列中channel.basicAck(envelope.getDeliveryTag(), false);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});// channel.close(); // connection.close();}}

消費者4

/*** 能者多勞 34 搭配測試* <p>* 消費者 4** @author mxz*/ @Component public class CustomerFour {public static void main(String[] args) throws IOException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創建通道Channel channel = connection.createChannel();// 每一次只能消費一個消息channel.basicQos(1);// 通道綁定對象channel.queueDeclare("work", true, false, false, null);channel.basicConsume("work", false, new DefaultConsumer(channel) {// 最后一個參數 消息隊列中取出的消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者-1" + new String(body));// 手動確認 參數1 手動確認channel.basicAck(envelope.getDeliveryTag(), false);}});// channel.close(); // connection.close();}}

消費者3

/*** 能者多勞 34 搭配測試* <p>* 消費者 3** @author mxz*/ @Component public class CustomerThree {public static void main(String[] args) throws IOException, TimeoutException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創建通道Channel channel = connection.createChannel();// 每一次只能消費一個消息channel.basicQos(1);// 通道綁定對象channel.queueDeclare("work", true, false, false, null);// 參數1 隊列名稱 參數2(autoAck) 消息自動確認 true 消費者自動向 rabbitMQ 確認消息消費 false 不會自動確認消息// 若出現消費者宕機情況 消費者三可以進行消費channel.basicConsume("work", false, new DefaultConsumer(channel) {// 最后一個參數 消息隊列中取出的消息// 默認分配是平均的@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者-1" + new String(body));// 手動確認 參數1 確認隊列中channel.basicAck(envelope.getDeliveryTag(), false);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});// channel.close(); // connection.close();}}

消費者4

/*** 能者多勞 34 搭配測試* <p>* 消費者 4** @author mxz*/ @Component public class CustomerFour {public static void main(String[] args) throws IOException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創建通道Channel channel = connection.createChannel();// 每一次只能消費一個消息channel.basicQos(1);// 通道綁定對象channel.queueDeclare("work", true, false, false, null);channel.basicConsume("work", false, new DefaultConsumer(channel) {// 最后一個參數 消息隊列中取出的消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者-1" + new String(body));// 手動確認 參數1 手動確認channel.basicAck(envelope.getDeliveryTag(), false);}});// channel.close(); // connection.close();}}

第三種模型(fanout)

fanout 扇出 也稱為廣播

在廣播模式下,消息發送流程是這樣的:

  • 可以有多個消費者
  • 每個消費者有自己的queue(隊列)
  • 每個隊列都要綁定到Exchange(交換機)
  • 生產者發送的消息,只能發送到交換機,交換機來決定要發給哪個隊列,生產者無法決定。
  • 交換機把消息發送給綁定過的所有隊列
  • 隊列的消費者都能拿到消息。實現一條消息被多個消費者消費

開發開發生產者

/*** 生產者* <p>* 任務模型 fanout** @author mxz*/ @Component public class Provider {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 將通道聲明指定交換機 參數1 交換機名稱 參數2 代表交換機類型 fanout 廣播類型channel.exchangeDeclare("logs", "fanout");// 發送消息channel.basicPublish("logs", "", null, "fanout type message".getBytes());// 關閉資源RabbitMQUtils.closeConnectionAndChannel(channel, connection);} }

開發消費者

  • 消費者 1
/*** 消費者 1* <p>* 任務模型 fanout** @author mxz*/ public class CustomerOne {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 通道綁定交換機channel.exchangeDeclare("logs", "fanout");// 臨時隊列String queue = channel.queueDeclare().getQueue();// 綁定交換機隊列channel.queueBind(queue, "logs", "");// 消費消息channel.basicConsume(queue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者1 " + new String(body));}});}}
  • 消費者 2
/*** 消費者 2* <p>* 任務模型 fanout** @author mxz*/ public class CustomerTwo {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 通道綁定交換機channel.exchangeDeclare("logs", "fanout");// 臨時隊列String queue = channel.queueDeclare().getQueue();// 綁定交換機隊列channel.queueBind(queue, "logs", "");// 消費消息channel.basicConsume(queue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者2 " + new String(body));}});}}
  • 消費者 3
/*** 消費者 3* <p>* 任務模型 fanout** @author mxz*/ public class CustomerThree {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 通道綁定交換機channel.exchangeDeclare("logs", "fanout");// 臨時隊列String queue = channel.queueDeclare().getQueue();// 綁定交換機隊列channel.queueBind(queue, "logs", "");// 消費消息channel.basicConsume(queue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者3 " + new String(body));}});}}

測試結果

第四種模型(Routing)

Routing 之訂閱模型-Direct(直連)

在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。

在Direct模型下:

  • 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
  • 消息的發送方在 向 Exchange發送消息時,也必須指定消息的 RoutingKey。
  • Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息

流程:

圖解:

  • P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。
  • X:Exchange(交換機),接收生產者的消息,然后把消息遞交給 與routing key完全匹配的隊列
  • C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
  • C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息

開發生產者

/*** @author mxz*/ public class Provider {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 通過通道聲明交換機 參數1 交換機名稱 參數2 路由模式channel.exchangeDeclare("logs_direct", "direct");// 發送消息String routingKey = "error";channel.basicPublish("logs_direct", routingKey, null, ("這是 direct 模式發布基于 route_key [" + routingKey + "]").getBytes());// 關閉資源RabbitMQUtils.closeConnectionAndChannel(channel, connection);} }

開發消費者

  • 消費者1
/*** 消費者 1** @author mxz*/ @Component public class CustomerOne {public static void main(String[] args) throws IOException, TimeoutException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創建通道Channel channel = connection.createChannel();// 創建一個臨時隊列String queue = channel.queueDeclare().getQueue();// 基于 route_key 綁定隊列交換機channel.queueBind(queue, "logs_direct", "error");// 消費消息channel.basicConsume(queue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者1: " + new String(body));}});// channel.close(); // connection.close();}}
  • 消費者2
/*** 消費者 2** @author mxz*/ @Component public class CustomerTwo {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 聲明交換機channel.exchangeDeclare("logs_direct", "direct");// 創建一個臨時隊列String queue = channel.queueDeclare().getQueue();// 臨時隊列和綁定交換機channel.queueBind(queue, "logs_direct", "info");channel.queueBind(queue, "logs_direct", "error");channel.queueBind(queue, "logs_direct", "warning");// 消費消息channel.basicConsume(queue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者2:" + new String(body));}});}}

Routing 之訂閱模型-Topic

Topic類型的Exchange與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符!這種模型Routingkey 一般都是由一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert

# 統配符* (star) can substitute for exactly one word. 匹配不多不少恰好1個詞# (hash) can substitute for zero or more words. 匹配一個或多個詞 # 如:audit.# 匹配audit.irs.corporate或者 audit.irs 等audit.* 只能匹配 audit.irs

開發生產者

/*** 生產者* <p>** @author mxz*/ @Component public class Provider {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 聲明交換機以及交換機類型channel.exchangeDeclare("topics", "topic");// 路由keyString routeKey = "user.save";channel.basicPublish("topics", routeKey, null, ("這里是 topic 動態路由模型, routeKey:[" + routeKey + "]").getBytes());// 關閉資源RabbitMQUtils.closeConnectionAndChannel(channel, connection);} }

開發消費者

  • 消費者
/*** @author mxz*/ public class CustomerOne {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 聲明交換機以及交換機類型channel.exchangeDeclare("topics", "topic");// 創建一個臨時隊列String queue = channel.queueDeclare().getQueue();// 綁定隊列和交換機 動態通配符 route keychannel.queueBind(queue, "topics", "user.*");// 消費消息channel.basicConsume(queue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者1:" + new String(body));}});}}
  • 消費者
/*** @author mxz*/ public class CustomerTwo {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 聲明交換機以及交換機類型channel.exchangeDeclare("topics", "topic");// 創建一個臨時隊列String queue = channel.queueDeclare().getQueue();// 綁定隊列和交換機 動態通配符 route keychannel.queueBind(queue, "topics", "user.#");// 消費消息channel.basicConsume(queue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者2:" + new String(body));}});}}

文章已上傳gitee https://gitee.com/codingce/hexo-blog
項目地址: https://github.com/xzMhehe/codingce-java

文章已上傳gitee https://gitee.com/codingce/hexo-blog
項目地址github: https://github.com/xzMhehe/codingce-java

總結

以上是生活随笔為你收集整理的RabbitMQ 消息队列六种模式的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。