日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ简介以及应用

發布時間:2025/3/19 编程问答 17 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ简介以及应用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、簡要介紹

  • 開源AMQP實現,Erlang語言編寫,支持多種客戶端

  • 分布式、高可用、持久化、可靠、安全

  • 支持多種協議:AMQP、STOMP、MQTT、HTTP

  • 適用于多系統之間的業務解耦的消息中間件

二、基本概念

1、exchange:交換器,負責接收消息,轉發消息至綁定的隊列,有四種類型:

  • direct:完全匹配的路由

  • topic:模式匹配的路由

  • fanout:廣播模式

  • headers:鍵值對匹配路由

Exchange屬性:

  • 持久化:如果啟用,那么rabbit服務重啟之后仍然存在

  • 自動刪除:如果啟用,那么交換器將會在其綁定的隊列都被刪除掉之后自動刪除掉自身

2、Queue:隊列,rabbitmq的內部對象,用于存儲消息,其屬性類似于Exchange,同樣可以設置是否持久化、自動刪除等。

消費者從Queue中獲取消息并消費。多個消費者可以訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息并處理。

3、Binding:綁定,根據路由規則綁定交換器與隊列

4、Routing:路由鍵,路由的關鍵字

三、消息的可靠性

  • Message acknowledgment:消息確認,在消息確認機制下,收到回執才會刪除消息,未收到回執而斷開了連接,消息會轉發給其他消費者,如果忘記回執,會導致消息堆積,消費者重啟后會重復消費這些消息并重復執行業務邏輯。

  • Message durability:消息持久化,設置消息持久化可以避免絕大部分消息丟失,比如rabbitmq服務重啟,但是采用非持久化可以提升隊列的處理效率。如果要確保消息的持久化,那么消息對應的Exchange和Queue同樣要設置為持久化。

  • Prefetch count,每次發送給消費者消息的數量,默認為1

另外,如果需要可靠性業務,需要設置持久化和ack機制,如果系統高吞吐,可以設置為非持久化、noack、自動刪除機制。

四、簡單應用

模擬這樣一個業務場景,用戶下單成功后,需要給用戶增加積分,同時還需要給用戶發送下單成功的消息,這是在電商業務中很常見的一個業務場景。

如果系統是微服務架構,可能用戶下單功能在訂單服務,給用戶增加積分的功能在積分服務,給用戶發送通知消息的功能在通知服務,各個服務之間解耦,互不影響。那么要實現上述的業務場景,消息中間件rabbitmq是一個很好的選擇。

原因如下:

  • 高性能,它的實現語言是天生具備高并發高可用的erlang 語言

  • 支持消息的持久化,即使服務器掛了,也不會丟失消息

  • 消息應答(ack)機制,消費者消費完消息后發送一個消息應答,rabbitmq才會刪除消息,確保消息的可靠性

  • 支持高可用集群

  • 靈活的路由

實現思路:

用戶下單成功后,rabbitmq發送一條消息至EXCHANGE.ORDER_CREATE交換器,該交換器綁定了兩個隊列,QUEUE.ORDER_INCREASESCORE、QUEUE.ORDER_NOTIFY,消費者訂閱這兩個隊列分別用來處理增加積分、發送用戶通知。如果后續日志系統還需要記錄下單的相關日志,那么我們只需要再定義一個隊列并將其綁定到EXCHANGE.ORDER_CREATE即可。

下單發rabbitmq消息

package?com.robot.rabbitmq;import?com.rabbitmq.client.AMQP; import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;import?java.io.IOException; import?java.util.UUID; import?java.util.concurrent.TimeoutException;/***?@author:?會跳舞的機器人*?@date:?2017/10/13?10:46*?@description:?模擬用戶下單之后發送rabbitmq消息*/ public?class?OrderCreator?{//?交換器名稱private?static?final?String?EXCHANGE?=?"EXCHANGE.ORDER_CREATE";//?消息內容private?static?String?msg?=?"create?order?success";/***?模擬創建訂單后發送mq消息*/public?void?createOrder()?{System.out.println("下單成功,開始發送rabbitmq消息");ConnectionFactory?connectionFactory?=?new?ConnectionFactory();connectionFactory.setHost("192.168.12.44");connectionFactory.setPort(56720);connectionFactory.setUsername("baibei");connectionFactory.setPassword("baibei");Connection?connection;Channel?channel;try?{connection?=?connectionFactory.newConnection();channel?=?connection.createChannel();//?持久化boolean?durable?=?true;//?topic類型String?type?=?"topic";//?聲明交換器,如果交換器不存在則創建之channel.exchangeDeclare(EXCHANGE,?type,?durable);String?messgeId?=?UUID.randomUUID().toString();//?deliveryMode>=2表示設置消息持久化AMQP.BasicProperties?props?=?new?AMQP.BasicProperties.Builder().deliveryMode(2).messageId(messgeId).build();//?發布消息String?routingKey?=?"order_create";channel.basicPublish(EXCHANGE,?routingKey,?props,?msg.getBytes("utf-8"));connection.close();}?catch?(IOException?e)?{e.printStackTrace();}?catch?(TimeoutException?e)?{e.printStackTrace();}} }

積分系統訂閱消息

package?com.robot.rabbitmq;import?com.rabbitmq.client.AMQP; import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.Consumer; import?com.rabbitmq.client.Envelope; import?com.rabbitmq.client.ShutdownSignalException;import?java.io.IOException; import?java.util.concurrent.TimeoutException;/***?@author:?會跳舞的機器人*?@date:?2017/10/13?16:02*?@description:?rabbitmq消費者,模擬下單成功后給用戶增加積分*/ public?class?IncreaseScoreConsumer?implements?Consumer?{private?Connection?connection;private?Channel?channel;//?交換器名稱private?static?final?String?EXCHANGE?=?"EXCHANGE.ORDER_CREATE";//?增加積分隊列名稱private?static?final?String?QUEUENAME?=?"QUEUE.ORDER_INCREASESCORE";public?void?consume()?{//?初始化rabbitmq連接信息ConnectionFactory?connectionFactory?=?new?ConnectionFactory();connectionFactory.setHost("192.168.12.44");connectionFactory.setPort(56720);connectionFactory.setUsername("baibei");connectionFactory.setPassword("baibei");try?{connection?=?connectionFactory.newConnection();channel?=?connection.createChannel();//?聲明交換器channel.exchangeDeclare(EXCHANGE,?"topic",?true);//?聲明隊列channel.queueDeclare(QUEUENAME,?true,?false,?false,?null);//?交換器與隊列綁定并設置routingKeychannel.queueBind(QUEUENAME,?EXCHANGE,?"order_create");//?消費消息,callback是該類,關閉自動確認消息,在完成業務邏輯后手動確認確認channel.basicConsume(QUEUENAME,?false,?this);}?catch?(IOException?e)?{e.printStackTrace();}?catch?(TimeoutException?e)?{e.printStackTrace();}}public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{String?msg?=?new?String(body,?"UTF-8");System.out.println("《積分系統》收到訂單消息:"?+?msg?+?",給用戶增加積分......");//?手動確認消息channel.basicAck(envelope.getDeliveryTag(),?false);/***?channel.basicReject(envelope.getDeliveryTag(),?false);該方法會丟棄掉隊列中的這條消息*?channel.basicReject(envelope.getDeliveryTag(),?true);該方法會把消息重新放回隊列*?一般系統會設定一個重試次數,如果超過重試次數,則會丟棄消息,反之則會把消息再放入隊列*/}public?void?handleConsumeOk(String?consumerTag)?{}public?void?handleCancelOk(String?consumerTag)?{}public?void?handleCancel(String?consumerTag)?throws?IOException?{}public?void?handleShutdownSignal(String?consumerTag,?ShutdownSignalException?sig)?{}public?void?handleRecoverOk(String?consumerTag)?{}}

通知系統訂閱消息

package?com.robot.rabbitmq;import?com.rabbitmq.client.AMQP; import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.Consumer; import?com.rabbitmq.client.Envelope; import?com.rabbitmq.client.ShutdownSignalException;import?java.io.IOException; import?java.util.concurrent.TimeoutException;/***?@author:?會跳舞的機器人*?@date:?2017/10/13?16:20*?@description:?rabbitmq消費者,模擬下單成功后給用戶發送通知*/ public?class?NotifyConsumer?implements?Consumer?{private?Connection?connection;private?Channel?channel;//?交換器名稱private?static?final?String?EXCHANGE?=?"EXCHANGE.ORDER_CREATE";//?通知用戶下單成功通知隊列名稱private?static?final?String?QUEUENAME?=?"QUEUE.ORDER_NOTIFY";public?void?consume()?{//?初始化rabbitmq連接信息ConnectionFactory?connectionFactory?=?new?ConnectionFactory();connectionFactory.setHost("192.168.12.44");connectionFactory.setPort(56720);connectionFactory.setUsername("baibei");connectionFactory.setPassword("baibei");try?{connection?=?connectionFactory.newConnection();channel?=?connection.createChannel();//?聲明交換器channel.exchangeDeclare(EXCHANGE,?"topic",?true);//?聲明隊列channel.queueDeclare(QUEUENAME,?true,?false,?false,?null);//?交換器與隊列綁定并設置routingKeychannel.queueBind(QUEUENAME,?EXCHANGE,?"order_create");//?消費消息,callback是該類,關閉自動確認消息,在完成業務邏輯后手動確認確認channel.basicConsume(QUEUENAME,?false,?this);}?catch?(IOException?e)?{e.printStackTrace();}?catch?(TimeoutException?e)?{e.printStackTrace();}}public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{String?msg?=?new?String(body,?"UTF-8");System.out.println("《通知系統》收到訂單消息:"?+?msg?+?",開始給用戶發送通知......");//?手動確認消息channel.basicAck(envelope.getDeliveryTag(),?false);/***?channel.basicReject(envelope.getDeliveryTag(),?false);該方法會丟棄掉隊列中的這條消息*?channel.basicReject(envelope.getDeliveryTag(),?true);該方法會把消息重新放回隊列*?一般系統會設定一個重試次數,如果超過重試次數,則會丟棄消息,反之則會把消息再放入隊列*/}public?void?handleConsumeOk(String?consumerTag)?{}public?void?handleCancelOk(String?consumerTag)?{}public?void?handleCancel(String?consumerTag)?throws?IOException?{}public?void?handleShutdownSignal(String?consumerTag,?ShutdownSignalException?sig)?{}public?void?handleRecoverOk(String?consumerTag)?{} }

測試

package?com.robot.rabbitmq;/***?@author:?會跳舞的機器人*?@date:?2017/10/13?16:27*?@description:*/ public?class?Test?{public?static?void?main(String[]?args)?{IncreaseScoreConsumer?increaseScoreConsumer?=?new?IncreaseScoreConsumer();increaseScoreConsumer.consume();NotifyConsumer?notifyConsumer?=?new?NotifyConsumer();notifyConsumer.consume();OrderCreator?orderCreator?=?new?OrderCreator();for?(int?i?=?0;?i?<?3;?i++)?{orderCreator.createOrder();}} }

輸出:

下單成功,開始發送rabbitmq消息 《積分系統》收到訂單消息:create order success,給用戶增加積分...... 《通知系統》收到訂單消息:create order success,開始給用戶發送通知...... 下單成功,開始發送rabbitmq消息 《積分系統》收到訂單消息:create order success,給用戶增加積分...... 《通知系統》收到訂單消息:create order success,開始給用戶發送通知...... 下單成功,開始發送rabbitmq消息 《積分系統》收到訂單消息:create order success,給用戶增加積分...... 《通知系統》收到訂單消息:create order success,開始給用戶發送通知...

總結

以上是生活随笔為你收集整理的RabbitMQ简介以及应用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。