日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ中7种消息队列和保姆级代码演示!

發布時間:2025/3/11 编程问答 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ中7种消息队列和保姆级代码演示! 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

blog.csdn.net/qq_32828253/article/details/110450249

七種模式介紹與應用場景

簡單模式(Hello World)

做最簡單的事情,一個生產者對應一個消費者,RabbitMQ相當于一個消息代理,負責將A的消息轉發給B

應用場景: 將發送的電子郵件放到消息隊列,然后郵件服務在隊列中獲取郵件并發送給收件人

工作隊列模式(Work queues)

在多個消費者之間分配任務(競爭的消費者模式),一個生產者對應多個消費者,一般適用于執行資源密集型任務,單個消費者處理不過來,需要多個消費者進行處理

應用場景: 一個訂單的處理需要10s,有多個訂單可以同時放到消息隊列,然后讓多個消費者同時處理,這樣就是并行了,而不是單個消費者的串行情況

訂閱模式(Publish/Subscribe)

一次向許多消費者發送消息,一個生產者發送的消息會被多個消費者獲取,也就是將消息將廣播到所有的消費者中。

應用場景: 更新商品庫存后需要通知多個緩存和多個數據庫,這里的結構應該是:

  • 一個fanout類型交換機扇出兩個個消息隊列,分別為緩存消息隊列、數據庫消息隊列

  • 一個緩存消息隊列對應著多個緩存消費者

  • 一個數據庫消息隊列對應著多個數據庫消費者

路由模式(Routing)

有選擇地(Routing key)接收消息,發送消息到交換機并且要指定路由key ,消費者將隊列綁定到交換機時需要指定路由key,僅消費指定路由key的消息

應用場景: 如在商品庫存中增加了1臺iphone12,iphone12促銷活動消費者指定routing key為iphone12,只有此促銷活動會接收到消息,其它促銷活動不關心也不會消費此routing key的消息

主題模式(Topics)

根據主題(Topics)來接收消息,將路由key和某模式進行匹配,此時隊列需要綁定在一個模式上,#匹配一個詞或多個詞,*只匹配一個詞。

應用場景: 同上,iphone促銷活動可以接收主題為iphone的消息,如iphone12、iphone13等

遠程過程調用(RPC)

如果我們需要在遠程計算機上運行功能并等待結果就可以使用RPC,具體流程可以看圖。應用場景:需要等待接口返回數據,如訂單支付

發布者確認(Publisher Confirms)

與發布者進行可靠的發布確認,發布者確認是RabbitMQ擴展,可以實現可靠的發布。在通道上啟用發布者確認后,RabbitMQ將異步確認發送者發布的消息,這意味著它們已在服務器端處理。

應用場景: 對于消息可靠性要求較高,比如錢包扣款

代碼演示

代碼中沒有對后面兩種模式演示,有興趣可以自己研究

簡單模式

import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Sender?{private?final?static?String?QUEUE_NAME?=?"simple_queue";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();//?聲明隊列// queue:隊列名// durable:是否持久化// exclusive:是否排外??即只允許該channel訪問該隊列???一般等于true的話用于一個隊列只能有一個消費者來消費的場景// autoDelete:是否自動刪除??消費完刪除// arguments:其他屬性channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//消息內容String?message?=?"simplest?mode?message";channel.basicPublish("",?QUEUE_NAME,?null,?message.getBytes());System.out.println("[x]Sent?'"?+?message?+?"'");//最后關閉通關和連接channel.close();connection.close();} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver?{private?final?static?String?QUEUE_NAME?=?"simplest_queue";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{//?獲取連接ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} }

工作隊列模式

import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver1?{private?final?static?String?QUEUE_NAME?=?"queue_work";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?同一時刻服務器只會發送一條消息給消費者channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver2?{private?final?static?String?QUEUE_NAME?=?"queue_work";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?同一時刻服務器只會發送一條消息給消費者channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Sender?{private?final?static?String?QUEUE_NAME?=?"queue_work";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();//?聲明隊列channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);for?(int?i?=?0;?i?<?100;?i++)?{String?message?=?"work?mode?message"?+?i;channel.basicPublish("",?QUEUE_NAME,?null,?message.getBytes());System.out.println("[x]?Sent?'"?+?message?+?"'");Thread.sleep(i?*?10);}channel.close();connection.close();} }

發布訂閱模式

import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;public?class?Receive1?{private?static?final?String?EXCHANGE_NAME?=?"logs";public?static?void?main(String[]?argv)?throws?Exception?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");String?queueName?=?channel.queueDeclare().getQueue();channel.queueBind(queueName,?EXCHANGE_NAME,?"");System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");//?訂閱消息的回調函數DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+?message?+?"'");};//?消費者,有消息時出發訂閱回調函數channel.basicConsume(queueName,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;public?class?Receive2?{private?static?final?String?EXCHANGE_NAME?=?"logs";public?static?void?main(String[]?argv)?throws?Exception?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");String?queueName?=?channel.queueDeclare().getQueue();channel.queueBind(queueName,?EXCHANGE_NAME,?"");System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");//?訂閱消息的回調函數DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received2?'"?+?message?+?"'");};//?消費者,有消息時出發訂閱回調函數channel.basicConsume(queueName,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;public?class?Sender?{private?static?final?String?EXCHANGE_NAME?=?"logs";public?static?void?main(String[]?argv)?throws?Exception?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");String?message?=?"publish?subscribe?message";channel.basicPublish(EXCHANGE_NAME,?"",?null,?message.getBytes("UTF-8"));System.out.println("?[x]?Sent?'"?+?message?+?"'");channel.close();connection.close();} }

路由模式

import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver1?{private?final?static?String?QUEUE_NAME?=?"queue_routing";private?final?static?String?EXCHANGE_NAME?=?"exchange_direct";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?指定路由的key,接收key和key2channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"key");channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"key2");channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});}} import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver2?{private?final?static?String?QUEUE_NAME?=?"queue_routing2";private?final?static?String?EXCHANGE_NAME?=?"exchange_direct";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?僅接收key2channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"key2");channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Sender?{private?final?static?String?EXCHANGE_NAME?=?"exchange_direct";private?final?static?String?EXCHANGE_TYPE?=?"direct";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();//?交換機聲明channel.exchangeDeclare(EXCHANGE_NAME,?EXCHANGE_TYPE);//?只有routingKey相同的才會消費String?message?=?"routing?mode?message";channel.basicPublish(EXCHANGE_NAME,?"key2",?null,?message.getBytes());System.out.println("[x]?Sent?'"?+?message?+?"'"); //????????channel.basicPublish(EXCHANGE_NAME,?"key",?null,?message.getBytes()); //????????System.out.println("[x]?Sent?'"?+?message?+?"'");channel.close();connection.close();} }

主題模式

import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver1?{private?final?static?String?QUEUE_NAME?=?"queue_topic";private?final?static?String?EXCHANGE_NAME?=?"exchange_topic";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?可以接收key.1channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"key.*");channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver2?{private?final?static?String?QUEUE_NAME?=?"queue_topic2";private?final?static?String?EXCHANGE_NAME?=?"exchange_topic";private?final?static?String?EXCHANGE_TYPE?=?"topic";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?*號代表單個單詞,可以接收key.1channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"*.*");//?#號代表多個單詞,可以接收key.1.2channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"*.#");channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Sender?{private?final?static?String?EXCHANGE_NAME?=?"exchange_topic";private?final?static?String?EXCHANGE_TYPE?=?"topic";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,?EXCHANGE_TYPE);String?message?=?"topics?model?message?with?key.1";channel.basicPublish(EXCHANGE_NAME,?"key.1",?null,?message.getBytes());System.out.println("[x]?Sent?'"?+?message?+?"'");String?message2?=?"topics?model?message?with?key.1.2";channel.basicPublish(EXCHANGE_NAME,?"key.1.2",?null,?message2.getBytes());System.out.println("[x]?Sent?'"?+?message2?+?"'");channel.close();connection.close();} }

四種交換機介紹

  • 直連交換機(Direct exchange): 具有路由功能的交換機,綁定到此交換機的時候需要指定一個routing_key,交換機發送消息的時候需要routing_key,會將消息發送道對應的隊列

  • 扇形交換機(Fanout exchange): 廣播消息到所有隊列,沒有任何處理,速度最快

  • 主題交換機(Topic exchange): 在直連交換機基礎上增加模式匹配,也就是對routing_key進行模式匹配,*代表一個單詞,#代表多個單詞

  • 首部交換機(Headers exchange): 忽略routing_key,使用Headers信息(一個Hash的數據結構)進行匹配,優勢在于可以有更多更靈活的匹配規則

總結

這么多種隊列模式中都有其應用場景,大家可以根據應用場景示例中進行選擇

參考

  • RabbitMQ官方教程

  • 官方教程源碼

總結

以上是生活随笔為你收集整理的RabbitMQ中7种消息队列和保姆级代码演示!的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 色com| 日韩中文一区二区三区 | av免费国产| 中文字幕av亚洲精品一部二部 | 欧美hdse | 涩涩视频软件 | 深夜啪啪| 久久午夜片 | 波多野结衣电车 | 日韩卡一卡二 | 毛片综合| 91看片在线观看 | 久久久ww | 免费成年人视频在线观看 | 久久精品国产亚洲AV无码男同 | 日韩美女在线 | 欧美精品日韩在线观看 | 欧美精品一级在线观看 | 欧美成人免费在线视频 | 国产人成在线 | 亚洲视频在线一区二区 | 52av在线| 99re在线视频观看 | 日本一区免费看 | 丰满的人妻hd高清日本 | 久久久精品影视 | 亚洲精品18p | 亚洲成人伊人 | 日韩不卡av| 欧美 日韩 国产 成人 在线观看 | 三上悠亚ssⅰn939无码播放 | 激情在线视频 | 日韩免费av一区二区 | 日韩成人精品在线观看 | 神马久久久久久久久 | 怡红院男人的天堂 | 美女视频三区 | 国产人妖在线观看 | 国语对白一区 | 中文字幕亚洲欧美日韩 | 色婷婷91 | 爱av在线 | 中文字幕在线观看网址 | 在线观看福利视频 | 午夜av大片 | 精品少妇人妻一区二区黑料社区 | 亚洲精品乱码久久久久久久 | 人妻精品久久久久中文 | 丰满少妇被猛烈进入高清播放 | 亚洲v日韩v综合v精品v | 国产亚洲欧美在线 | 欧美播放 | 欧美大片黄色 | 性欧美一级 | 亚洲天堂av影院 | 91黄瓜视频 | 色吧婷婷 | 国产又大又粗又硬 | 1024欧美| 国内视频一区二区 | 日韩中文一区二区三区 | 国内黄色一级片 | 在线观看网站污 | 久久精品99久久久 | 国产精品一区在线观看 | 国产成人精品av久久 | 久久久无码人妻精品无码 | 激情欧美亚洲 | 午夜伊人网 | 黑人性视频 | 蜜桃网av| 亚州av片| 日本一级三级三级三级 | 欧美性高潮视频 | 久久亚洲日本 | 91久久精品日日躁夜夜躁欧美 | 夜夜欢视频 | 日本美女久久 | 欧美怡红院一区二区三区 | 未满十八岁禁止进入 | 精品无码久久久久国产 | 男人的av| 奇米影视中文字幕 | 亚洲丝袜中文字幕 | 久草青青视频 | 中文字幕在线播放 | 一级高清毛片 | 国产激情视频网站 | 亚洲国产一区二区三区在线观看 | 漂亮少妇高潮午夜精品 | japanese在线观看 | 中文字幕一区二区三区人妻 | 欧美大片a| 成人免费视频国产免费麻豆 | 成年人免费看的视频 | 88av网站| 亚洲13p| 亚洲精品中文字幕 | 97久久人国产精品婷婷 |