日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ 消息队列入门

發布時間:2023/12/31 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ 消息队列入门 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

什么是 RabbitMQ

MQ(Message Queue)消息隊列

消息隊列中間件,是分布式系統中的重要組件;主要解決異步處理、應用解耦、流量削峰等問題,從而實現高性能,高可用,可伸縮和最終一致性的架構。

使用較多的消息隊列產品:RabbitMQ,RocketMQ,ActiveMQ,ZeroMQ,Kafka 等。

異步處理

用戶注冊后,需要發送驗證郵箱和手機驗證碼。

將注冊信息寫入數據庫,發送驗證郵件,發送手機,三個步驟全部完成后,返回給客戶端。

傳統:

客戶端 <-> 注冊信息寫入數據庫 -> 發送注冊郵件 -> 發送注冊短信

現在:

客戶端 <-> 注冊信息寫入數據庫 -> 寫入消息隊列 -> 異步 [發送注冊郵件,發送注冊短信]
應用解耦

場景:訂單系統需要通知庫存系統。

如果庫存系統異常,則訂單調用庫存失敗,導致下單失敗。

原因:訂單系統和庫存系統耦合度太高。

傳統:

用戶 <-> 訂單系統 - 調用庫存接口 -> 庫存系統

現在:

用戶 <-> 訂單系統 - 寫入 -> 消息隊列 <- 訂閱 - 庫存系統

訂單系統:用戶下單后,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶,下單成功。

庫存系統:訂閱下單的消息,獲取下單信息,庫存系統根據下單信息,再進行庫存操作。

假如:下單的時候,庫存系統不能正常運行,也不會影響下單,因為下單后,訂單系統寫入消息隊列就不再關心其他的后續操作了,實現了訂單系統和庫存系統的應用解耦。

所以,消息隊列是典型的“生產者-消費者“模型。

生產者不斷的向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。

因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的入侵,這樣就實現了生產者和消費者的解耦。

流量削峰

搶購,秒殺等業務,針對高并發的場景。

因為流量過大,暴增會導致應用掛掉,為解決這個問題,在前端加入消息隊列。

用戶的請求,服務器接收后,首先寫入消息隊列,如果超過隊列的長度,就拋棄,發送一個結束的頁面;而請求成功的就是進入隊列的用戶。

背景知識介紹

AMQP 高級消息隊列協議

Advanced Message Queuing Protocol 是一個提供統一消息服務的應用層標準高級消息隊列協議。

協議:數據在傳輸的過程中必須要遵守的規則。

基于此協議的客戶端可以與消息中間件傳遞消息。

并不受產品、開發語言等條件的限制。

JMS

Java Message Server 是 Java 消息服務應用程序接口,一種規范,和 JDBC 擔任的角色類似。

JMS 是一個 Java 平臺中關于面向消息中間件的 API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。

二者的聯系

JMS 是定義了統一接口,統一消息操作;AMQP 通過協議統一數據交互格式。

JMS 必須是 Java 語言;AMQP 只是協議,與語言無關。

Erlang 語言

Erlang 是一種通用的面向并發的編程語言,目的是創造一種可以應對大規模并發活動的編程語言和運行環境。

最初是專門為通信應用設計的,比如控制交換機或者變換協議等,因此非常適合構建分布式,實時軟并行計算系統。

Erlang 運行時環境是一個虛擬機,有點像 Java 的虛擬機,這樣代碼一經編譯,同樣可以隨處運行。

為什么選擇 RabbitMQ

RabbitMQ 由 Erlang 開發,AMQP 的最佳搭檔,安裝部署簡單,上手門檻低。

企業級消息隊列,經過大量實踐考驗的高可靠,大量成功的應用案例,例如阿里、網易等一線大廠都有使用。

有強大的 WEB 管理頁面。

強大的社區支持,為技術進步提供動力。

支持消息持久化、支持消息確認機制、靈活的任務分發機制等,支持功能非常豐富。

集群擴展很容易,并且可以通過增加節點實現成倍的性能提升。

總結:如果希望使用一個可靠性高、功能強大、易于管理的消息隊列系統那么就選擇 RabbitMQ;如果想用一個性能高,但偶爾丟點數據,可以使用 Kafka 或者 ZeroMQ。

Kafka 和 ZeroMQ 的性能比 RabbitMQ 好很多。

RabbitMQ 各組件功能

Publisher --> Exchange --banding--> Queue --> Connection --> Consumer|-------------------------------| | |-------------------------| | | | |------------------| | | | | |Exchange --> Queue| | | | | |------------------| | | | | Virtual Host | | | |-------------------------| | | Broker | |-------------------------------|Broker 包含 Virtual Host Virtual Host 包含 Exchange 和 QueueConnection 包含多個 Channel

Broker - 消息隊列服務器實體。

Virtual Host - 虛擬主機:

  • 標識一批交換機、消息隊列和相關對象,形成的整體。
  • 虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。
  • 每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。
  • VHost 是 AMQP 概念的基礎,RabbitMQ 默認的 vhost 是 /,必須在鏈接時指定。

Exchange - 交換器(路由):用來接收生產者發送的消息并將這些消息通過路由發給服務器中的隊列。

Banding - 綁定。

Queue - 消息隊列:

  • 用來保存消息直到發送給消費者。
  • 它是消息的容器,也是消息的終點。
  • 一個消息可投入一個或多個隊列。
  • 消息一直在隊列里面,等待消費者連接到這個隊列將其取走。

Banding - 綁定:用于消息隊列和交換機之間的關聯。

Channel - 通道(信道):

  • 多路復用連接中的一條獨立的雙向數據流通道。
  • 信道是建立在真實的 TCP 連接內的虛擬鏈接。
  • AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,都是通過信道完成的。
  • 因為對于操作系統來說,建立和銷毀 TCP 連接都是非常昂貴的開銷,所以引入了信道的概
    念,用來復用 TCP 連接。

Connection - 網絡連接,比如一個 TCP 連接。

Publisher - 消息的生產者,也是一個向交換器發布消息的客戶端應用程序。

Consumer - 消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。

Message - 消息:

  • 消息是不具名的,它是由消息頭和消息體組成。
  • 消息體是不透明的,而消息頭則是由一系列的可選屬性組成,這些屬性包括 routing-key(路由鍵)、priority(優先級)、delivery-mode(消息可能需要持久性存儲[消息的路由模式])等。

使用 RabbitMQ

想要安裝 RabbitMQ,必須先安裝 erlang 語言環境;類似安裝 tomcat,必須先安裝 JDK。

查看匹配的版本:https://www.rabbitmq.com/which-erlang.html

RabbitMQ 安裝啟動

Erlang 下載:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang

Socat 下載:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

RabbitMQ 下載:https://www.rabbitmq.com/install-rpm.html#downloads

安裝

啟動 Linux 系統(192.168.186.128),傳輸相關的三個 rpm 到 /opt 目錄下,然后在 /opt 目錄下按順序執行安裝命令:

rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm
啟動后臺管理插件
rabbitmq-plugins enable rabbitmq_management
啟動 RabbitMQ
systemctl start rabbitmq-server.service systemctl status rabbitmq-server.service systemctl restart rabbitmq-server.service systemctl stop rabbitmq-server.service
查看進程
ps -ef | grep rabbitmq
測試
  • 防火墻開放對應的端口號
  • firewall-cmd --zone=public --add-port=15672/tcp --permanent firewall-cmd --zone=public --add-port=5671/tcp --permanent firewall-cmd --zone=public --add-port=5672/tcp --permanent firewall-cmd --zone=public --add-port=25672/tcp --permanent firewall-cmd --reload

    2)瀏覽器輸入:http://192.168.186.128:15672

    3)默認帳號和密碼是 guest,而 guest 用戶默認不允許遠程連接

    創建賬號:

    rabbitmqctl add_user renda 123456

    設置用戶角色:

    rabbitmqctl set_user_tags renda administrator

    設置用戶權限:

    rabbitmqctl set_permissions -p "/" renda ".*" ".*" ".*"

    查看當前用戶和角色:

    rabbitmqctl list_users

    修改用戶密碼:

    rabbitmqctl change_password renda NewPassword

    管理界面介紹:

    • Overview - 概覽

    • Connections - 查看鏈接情況

    • Channels - 信道(通道)情況

    • Exchanges - 交換機(路由)情況,默認4類7個

    • Queues - 消息隊列情況

    • Admin - 管理員列表

    • RabbitMQ 提供給編程語言客戶端鏈接的端口 - 5672;RabbitMQ 管理界面的端口 15672;RabbitMQ 集群的端口 - 25672。

    RabbitMQ 快速入門

    依賴
    <!-- 指定編碼及版本 --> <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.encoding>UTF-8</maven.compiler.encoding><java.version>1.11</java.version><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target> </properties><dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version><scope>compile</scope></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.9</version></dependency> </dependencies>
    日志依賴 log4j(可選項)
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.File=rebbitmq.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n log4j.rootLogger=debug, stdout,file
    創建連接

    先在 RabbitMQ 管理界面 Admin -> Virtual Hosts -> Add a new virtual host 創建虛擬主機 (Name: /renda, Description: 張人大, Tags: administrator);

    然后編寫連接的代碼:

    public class ConnectionUtil {public static Connection getConnection() throws Exception{// 1.創建連接工廠ConnectionFactory factory = new ConnectionFactory();// 2.在工廠對象中設置 MQ 的連接信息(ip, port, vhost, username, password)factory.setHost("192.168.186.128");factory.setPort(5672);factory.setVirtualHost("/renda");factory.setUsername("renda");factory.setPassword("123456");// 3.通過工廠獲得與 MQ 的連接return factory.newConnection();}public static void main(String[] args) throws Exception {Connection connection = getConnection();System.out.println("Connection: " + connection);connection.close();}}

    RabbitMQ 模式

    RabbitMQ 提供了 6 種消息模型,但是第 6 種其實是 RPC,并不是 MQ。

    在線手冊:https://www.rabbitmq.com/getstarted.html

    5 種消息模型,大體分為兩類:

    • 1 和 2 屬于點對點。
    • 3、4、5 屬于發布訂閱模式(一對多)。

    點對點模式 - P2P(Point to Point)模式:

    • 包含三個角色:消息隊列 queue,發送者 sender,接收者 receiver。

    • 每個消息發送到一個特定的隊列中,接收者從中獲得消息。

    • 隊列中保留這些消息,直到他們被消費或超時。

    • 如果希望發送的每個消息都會被成功處理,那需要 P2P。

    點對點模式特點:

    • 每個消息只有一個消費者,一旦消費,消息就不在隊列中了。
    • 發送者和接收者之間沒有依賴性,發送者發送完成,不管接收者是否運行,都不會影響消息發送到隊列中。
  • 接收者成功接收消息之后需向對象應答成功(確認)。
  • 發布訂閱模式 - publish / subscribe 模式:

    • Pub / Sub 模式包含三個角色:交換機 exchange,發布者 publisher,訂閱者 subcriber。
    • 多個發布者將消息發送交換機,系統將這些消息傳遞給多個訂閱者。
    • 如果希望發送的消息被多個消費者處理,可采用 Pub / Sub。

    發布訂閱模式特點:

    • 每個消息可以有多個訂閱者。
    • 發布者和訂閱者之間在時間上有依賴,對于某個交換機的訂閱者,必須創建一個訂閱后,才能消費發布者的消息。
  • 為了消費消息,訂閱者必須保持運行狀態。
  • 簡單模式

    RabbitMQ 本身只是接收,存儲和轉發消息,并不會對信息進行處理;類似郵局,處理信件的應該是收件人而不是郵局。

    生產者 P
    public class Sender {public static void main(String[] args) throws Exception {String msg = "Hello, 你好 Renda";// 1.獲得連接Connection connection = ConnectionUtil.getConnection();// 2.在連接中創建通道(信道)Channel channel = connection.createChannel();// 3.創建消息隊列 (1,2,3,4,5)/*參數 1: 隊列的名稱參數 2: 隊列中的數據是否持久化參數 3: 是否排外(是否支持擴展,當前隊列只能自己用,不能給別人用)參數 4: 是否自動刪除(當隊列的連接數為 0 時,隊列會銷毀,不管隊列是否還存保存數據)參數 5: 隊列參數(沒有參數為 null)*/channel.queueDeclare("queue1", false, false, false, null);// 4.向指定的隊列發送消息 (1,2,3,4)/*參數 1: 交換機名稱,當前是簡單模式,也就是 P2P 模式,沒有交換機,所以名稱為 ""參數 2: 目標隊列的名稱參數 3: 設置消息的屬性(沒有屬性則為 null)參數 4: 消息的內容 (只接收字節數組)*/channel.basicPublish("", "queue1", null, msg.getBytes());System.out.println("發送:" + msg);// 5.釋放資源channel.close();connection.close();}}

    啟動生產者,即可前往管理端查看隊列中的信息,會有一條信息沒有處理。

    消費者 C
    public class Receiver {public static void main(String[] args) throws Exception {// 1.獲得連接Connection connection = ConnectionUtil.getConnection();// 2.獲得通道(信道)Channel channel = connection.createChannel();// 3.從信道中獲得消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 交付處理(收件人信息,包裹上的快遞標簽,協議的配置,消息)@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// body 就是從隊列中獲取的消息String s = new String(body);System.out.println("獲取消息為:" + s);}};// 4.監聽隊列 true: 自動消息確認channel.basicConsume("queue1", true, consumer);}}

    啟動消費者,前往管理端查看隊列中的信息,所有信息都已經處理和確認,顯示 0。

    消息確認機制 ACK

    通過剛才的案例可以看出,消息一旦被消費,消息就會立刻從隊列中移除。

    如果消費者接收消息后,還沒執行操作就拋異常宕機導致消費失敗,但是 RabbitMQ 無從得知,這樣消息就丟失了。

    因此,RabbitMQ 有一個 ACK 機制,當消費者獲取消息后,會向 RabbitMQ 發送回執 ACK,告知消息已經被接收。

    ACK - Acknowledge character 即是確認字符,在數據通信中,接收站發給發送站的一種傳輸類控制字符,表示發來的數據已確認接收無誤。在使用 http 請求時,http 的狀態碼 200 就是表示服務器執行成功。

    整個過程就像快遞員將包裹送到你手里,并且需要你的簽字,并拍照回執。

    不過這種回執 ACK 分為兩種情況:

    • 自動 ACK - 消息接收后,消費者立刻自動發送 ACK,類似快遞放在快遞柜。
    • 手動 ACK - 消息接收后,不會發送 ACK,需要手動調用,類似快遞必須本人簽收。

    兩種情況如何選擇,需要看消息的重要性:

    • 如果消息不太重要,丟失也沒有影響,自動 ACK 會比較方便。
    • 如果消息非常重要,最好消費完成手動 ACK;如果自動 ACK 消費后,RabbitMQ 就會把消息從隊列中刪除,而此時消費者拋異常宕機,那么消息就永久丟失了。

    修改啟動手動 ACK 消息確認:

    // 監聽隊列 false: 手動消息確認 channel.basicConsume("queue1", false, consumer);

    啟動生產者和消費者,前往管理端查看隊列中的信息,會有一條信息沒有確認(Unacked)。

    手動 ACK 消息確認解決問題:

    public class ReceiverAck {public static void main(String[] args) throws Exception {// 1.獲得連接Connection connection = ConnectionUtil.getConnection();// 2.獲得通道(信道)final Channel channel = connection.createChannel();// 3.從信道中獲得消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 交付處理(收件人信息,包裹上的快遞標簽,協議的配置,消息)@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// body就是從隊列中獲取的消息String s = new String(body);System.out.println("獲取消息為:" + s);// 手動確認(收件人信息,是否同時確認多個消息)channel.basicAck(envelope.getDeliveryTag(), false);}};// 4.監聽隊列 false: 手動消息確認channel.basicConsume("queue1", false, consumer);}}
    工作隊列模式

    簡單模式,一個消費者來處理消息,如果生產者生產消息過快過多,而消費者的能力有限,就會產生消息在隊列中堆積(生活中的滯銷)。

    當運行許多消費者程序時,消息隊列中的任務會被眾多消費者共享,但其中某一個消息只會被一個消費者獲取(100 支肉串 20 個人吃,但是其中的某支肉串只能被一個人吃)。

    生產者 P
    public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("test_work_queue",false,false,false,null);for(int i = 1;i<=100;i++) {String msg = "Message --> " + i;channel.basicPublish("", "test_work_queue", null, msg.getBytes());System.out.println(msg);}channel.close();connection.close();}}
    消費者 1
    public class Receiver1 {// 統計獲取的信息的數量static int counter = 1;public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();// queueDeclare() 此方法有雙重作用,如果隊列不存在,就創建;如果隊列存在,則獲取channel.queueDeclare("test_work_queue", false, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("Receiver 1: " + s + ". Total Message Count: " + counter++);// 模擬網絡延遲 200 毫秒try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}// 手動確認(收件人信息,是否同時確認多個消息)channel.basicAck(envelope.getDeliveryTag(), false);}};// 4.監聽隊列 false:手動消息確認channel.basicConsume("test_work_queue", false, consumer);}}
    消費者 2
    public class Receiver2 {// 統計獲取的信息的數量static int counter = 1;public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();// queueDeclare() 此方法有雙重作用,如果隊列不存在,就創建;如果隊列存在,則獲取channel.queueDeclare("test_work_queue", false, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("Receiver 2: " + s + ". Total Message Count: " + counter++);// 模擬網絡延遲 900 毫秒try {Thread.sleep(900);} catch (InterruptedException e) {e.printStackTrace();}// 手動確認(收件人信息,是否同時確認多個消息)channel.basicAck(envelope.getDeliveryTag(), false);}};// 4.監聽隊列 false:手動消息確認channel.basicConsume("test_work_queue", false, consumer);}}
    能者多勞

    先運行 2 個消費者,排隊等候消費(取餐),再運行生產者開始生產消息(烤肉串)。

    由運行結果可以看到,雖然兩個消費者的消費速度不一致(線程休眠時間),但是消費的數量卻是一致的,各消費 50 個消息。

    • 例如:工作中,A 編碼速率高,B 編碼速率低,兩個人同時開發一個項目,A 10 天完成,B 30 天完成,A 完成自己的編碼部分,就無所事事了,等著 B 完成就可以了,這樣是不可以的,應該遵循“能者多勞”。
    • 效率高的多干點,效率低的少干點。

    為了克服這個問題,可以使用設置為 prefetchCount = 1 的 basicQos 方法。這告訴RabbitMQ 一次不要給一個 worker 發送一條以上的消息。或者,換句話說,在 worker 處理并確認前一個消息之前,不要向它發送新消息。相反,它將把它分派到下一個不繁忙的 worker。

    在消費者 1 和消費者 2 中加上 channel.basicQos(1):

    ... // queueDeclare() 此方法有雙重作用,如果隊列不存在,就創建;如果隊列存在,則獲取 channel.queueDeclare("test_work_queue", false, false, false, null); // 開啟一次接受一條消息。可以理解為:快遞一個一個送,送完一個再送下一個,速度快的送件就多 channel.basicQos(1); ...

    能者多勞必須要配合手動的 ACK 機制才生效。

    如何避免消息堆積?
    • Workqueue,多個消費者監聽同一個隊列。
    • 接收到消息后,通過線程池,異步消費。
    發布/訂閱模式

    工作隊列背后的假設是,每個任務都被準確地交付給一個工作者;“發布/訂閱”模式將一個消息傳遞給多個消費者。

    生活中的案例:眾多粉絲關注一個視頻主,視頻主發布視頻,所有粉絲都可以得到視頻通知。

    生產者 P 發送信息給路由 X,路由 X 將信息轉發給綁定路由 X 的隊列;隊列將信息通過信道發送給消費者,最后消費者進行消費。整個過程,必須先創建路由。

    路由在生產者程序中創建。

    路由沒有存儲消息的能力,當生產者將信息發送給路由后,消費者還沒有運行,所以沒有隊列,路由并不知道將信息發送給誰。

    運行程序的順序:

    • 執行一次 MessageSender,聲明了路由。
  • 執行 MessageReceiver1 和 MessageReceiver2,綁定到路由。
  • 再次執行 MessageSender,發送消息給路由。
  • 生產者
    public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明路由(路由名,路由類型)// fanout:不處理路由鍵(只需要將隊列綁定到路由上,發送到路由的消息都會被轉發到與該路由綁定的所有隊列上)channel.exchangeDeclare("test_exchange_fanout", "fanout");String msg = "Hello,Renda";channel.basicPublish("test_exchange_fanout", "", null, msg.getBytes());System.out.println("Publisher:" + msg);channel.close();connection.close();}}
    消費者 1
    public class Receiver1 {private static final String RECEIVER_QUEUE = "test_exchange_fanout_queue_1";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);// 綁定路由(關注)channel.queueBind(RECEIVER_QUEUE, "test_exchange_fanout", "");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("Subscriber 1: " + s);}};// 4.監聽隊列 true: 自動消息確認channel.basicConsume(RECEIVER_QUEUE, true, consumer);}}
    消費者 2
    public class Receiver2 {private static final String RECEIVER_QUEUE = "test_exchange_fanout_queue_2";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);// 綁定路由(關注)channel.queueBind(RECEIVER_QUEUE, "test_exchange_fanout", "");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("Subscriber 2: " + s);}};// 4.監聽隊列 true: 自動消息確認channel.basicConsume(RECEIVER_QUEUE, true, consumer);}}
    路由模式

    路由會根據類型進行定向(direct)分發消息給不同的隊列;每種類型可以對應多個消費者。

    運行程序的順序:

    • 先運行一次 Sender(創建路由器)。
    • 有了路由器之后,在創建兩個 Receiver1 和 Receiver2,進行隊列綁定。
    • 再次運行 Sender,發出消息。
    生產者
    public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明路由 (路由名,路由類型)// direct:根據路由鍵進行定向分發消息channel.exchangeDeclare("test_exchange_direct", "direct");String msg = "Register New User: userid=S101";channel.basicPublish("test_exchange_direct", "insert", null, msg.getBytes());System.out.println(msg);channel.close();connection.close();}}
    消費者 1
    public class Receiver1 {private static final String RECEIVER_QUEUE = "test_exchange_direct_queue_1";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);// 綁定路由(如果路由鍵的類型是 添加,刪除,修改 的話,綁定到這個隊列 1 上)channel.queueBind(RECEIVER_QUEUE, "test_exchange_direct", "insert");channel.queueBind(RECEIVER_QUEUE, "test_exchange_direct", "update");channel.queueBind(RECEIVER_QUEUE, "test_exchange_direct", "delete");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("Cosumer 1: " + s);}};// 4.監聽隊列 true: 自動消息確認channel.basicConsume(RECEIVER_QUEUE, true, consumer);}}
    消費者 2
    public class Receiver2 {private static final String RECEIVER_QUEUE = "test_exchange_direct_queue_2";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);// 綁定路由(如果路由鍵的類型是 添加,刪除,修改 的話,綁定到這個隊列 2 上)channel.queueBind(RECEIVER_QUEUE, "test_exchange_direct", "insert");channel.queueBind(RECEIVER_QUEUE, "test_exchange_direct", "update");channel.queueBind(RECEIVER_QUEUE, "test_exchange_direct", "delete");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("Cosumer 2: " + s);}};// 4.監聽隊列 true: 自動消息確認channel.basicConsume(RECEIVER_QUEUE, true, consumer);}}
    通配符模式

    通配符模式是和路由模式差不多,唯獨的區別就是路由鍵支持模糊匹配。

    匹配符號:

    • * - 只能匹配一個詞(正好一個詞,多一個不行,少一個也不行)。
    • # - 匹配 0 個或更多個詞。

    案例:

    Q1 綁定了路由鍵 `*.orange.*` Q2 綁定了路由鍵 `*.*.rabbit``lazy.#`quick.orange.rabbit # Q1 Q2 lazy.orange.elephant # Q1 Q2 quick.orange.fox # Q1 lazy.brown.fox # Q2 lazy.pink.rabbit # Q2 quick.brown.fox # 無 orange # 無 quick.orange.male.rabbit # 無
    生產者
    public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明路由 (路由名,路由類型)// topic:模糊匹配的定向分發channel.exchangeDeclare("test_exchange_topic", "topic");String msg = "price-off promotion";channel.basicPublish("test_exchange_topic", "product.price", null, msg.getBytes());System.out.println("Provider: " + msg);channel.close();connection.close();}}
    消費者 1
    public class Receiver1 {private static final String RECEIVER_QUEUE = "test_exchange_topic_queue_1";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);// 綁定路由(綁定用戶相關的消息)channel.queueBind(RECEIVER_QUEUE, "test_exchange_topic", "user.#");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("Consumer 1: " + s);}};// 4.監聽隊列 true: 自動消息確認channel.basicConsume(RECEIVER_QUEUE, true, consumer);}}
    消費者 2
    public class Receiver2 {private static final String RECEIVER_QUEUE = "test_exchange_topic_queue_2";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);// 綁定路由(綁定用戶相關的消息)channel.queueBind(RECEIVER_QUEUE, "test_exchange_topic", "product.#");channel.queueBind(RECEIVER_QUEUE, "test_exchange_topic", "order.#");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("Consumer 2: " + s);}};// 4.監聽隊列 true: 自動消息確認channel.basicConsume(RECEIVER_QUEUE, true, consumer);}}

    持久化

    消息的可靠性是 RabbitMQ 的一大特色,那么 RabbitMQ 是如何避免消息丟失?

    消費者的 ACK 確認機制,可以防止消費者丟失消息。

    萬一在消費者消費之前,RabbitMQ 服務器宕機了,那消息也會丟失。

    想要將消息持久化,那么路由和隊列都要持久化才可以。

    生產者
    public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明路由 (路由名,路由類型,持久化)// topic:模糊匹配的定向分發channel.exchangeDeclare("test_exchange_topic", "topic", true);String msg = "price-off promotion";// 信道持久化channel.basicPublish("test_exchange_topic", "product.price", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());System.out.println("Provider: " + msg);channel.close();connection.close();}}
    消費者
    public class Receiver1 {private static final String RECEIVER_QUEUE = "test_exchange_topic_queue_1";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列 (第二個參數為 true:支持持久化)channel.queueDeclare(RECEIVER_QUEUE, true, false, false, null);// 綁定路由(綁定用戶相關的消息)channel.queueBind(RECEIVER_QUEUE, "test_exchange_topic", "user.#");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("Consumer 1: " + s);}};// 4.監聽隊列 true: 自動消息確認channel.basicConsume(RECEIVER_QUEUE, true, consumer);}}

    Spring 整合 RabbitMQ

    五種消息模型,在企業中應用最廣泛的就是定向匹配 topics。

    Spring AMQP 是基于 Spring 框架的 AMQP 消息解決方案,提供模板化的發送和接收消息的抽象層,提供基于消息驅動的 POJO 的消息監聽等,簡化了對于 RabbitMQ 相關程序的開發。

    生產端工程

    依賴 pom.xml

    <dependencies><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.0.1.RELEASE</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version><scope>compile</scope></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.9</version></dependency> </dependencies>

    spring-rabbitmq-producer.xml

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!-- 1.配置連接 --><rabbit:connection-factory id="connectionFactory"host="192.168.186.128"port="5672"username="renda"password="123456"virtual-host="/renda"publisher-confirms="true"/><!-- 2.配置隊列 --><rabbit:queue name="test_spring_queue_1"/><!-- 3.配置 rabbitAdmin: 主要用于在 java 代碼中對隊列的管理,用來創建,綁定,刪除隊列與交換機,發送消息等 --><rabbit:admin connection-factory="connectionFactory"/><!-- 4.配置交換機,topic 類型 --><rabbit:topic-exchange name="spring_topic_exchange"><rabbit:bindings><!-- 綁定隊列 --><rabbit:binding pattern="msg.#" queue="test_spring_queue_1"/></rabbit:bindings></rabbit:topic-exchange><!-- 5.配置 json 轉換的工具 --><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/><!-- 6.配置 rabbitmq 的模版 --><rabbit:template id="rabbitTemplate"connection-factory="connectionFactory"exchange="spring_topic_exchange"message-converter="jsonMessageConverter"/></beans>

    發消息 com.renda.test.Sender:

    public class Sender {public static void main(String[] args) {// 1.創建 spring 容器ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");// 2.從 spring 容器中獲得 rabbit 模版對象RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);// 3.發消息Map<String, String> map = new HashMap<String, String>();map.put("name", "張人大");map.put("email", "123456789@qq.com");rabbitTemplate.convertAndSend("msg.user", map);System.out.println("Message Sent...");context.close();}}
    消費端工程

    依賴與生產者一致

    spring-rabbitmq-consumer.xml

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context.xsd"><!--1.配置連接--><rabbit:connection-factoryid="connectionFactory"host="192.168.186.128"port="5672"username="renda"password="123456"virtual-host="/renda"/><!-- 2.配置隊列 --><rabbit:queue name="test_spring_queue_1"/><!-- 3.配置 rabbitAdmin: 主要用于在 java 代碼中對隊列的管理,用來創建,綁定,刪除隊列與交換機,發送消息等 --><rabbit:admin connection-factory="connectionFactory"/><!-- 4.注解掃描包 springIOC --><context:component-scan base-package="com.renda.listener"/><!-- 5.配置監聽 --><rabbit:listener-container connection-factory="connectionFactory"><rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/></rabbit:listener-container></beans>

    消費者:

    MessageListener 接口用于 spring 容器接收到消息后處理消息;

    如果需要使用自己定義的類型來實現處理消息時,必須實現該接口,并重寫 onMessage() 方法;

    當 spring 容器接收消息后,會自動交由 onMessage 進行處理。

    com.renda.listener.ConsumerListener:

    @Component public class ConsumerListener implements MessageListener {/*** jackson 提供序列化和反序列中使用最多的類,用來轉換 json 的*/private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void onMessage(Message message) {// 將 message對象轉換成 jsonJsonNode jsonNode = null;try {jsonNode = MAPPER.readTree(message.getBody());String name = jsonNode.get("name").asText();String email = jsonNode.get("email").asText();System.out.println("Message From Queue:{" + name + ", " + email + "}");} catch (IOException e) {e.printStackTrace();}}}

    啟動項目 com.renda.test.TestRunner:

    public class TestRunner {public static void main(String[] args) throws IOException {// 獲得容器ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-consumer.xml");// 讓程序一直運行,別終止System.in.read();}}

    消息成功確認機制

    在實際場景下,有的生產者發送的消息是必須保證成功發送到消息隊列中,需要事務機制和發布確認機制。

    事務機制

    AMQP 協議提供的一種保證消息成功投遞的方式,通過信道開啟 transactional 模式;

    利用信道的三個方法來實現以事務方式發送消息,若發送失敗,通過異常處理回滾事務,確保消息成功投遞

    • channel.txSelect() - 開啟事務

    • channel.txCommit() - 提交事務

    • channel.txRollback() - 回滾事務

    Spring 已經對上面三個方法進行了封裝,所以這里使用原始的代碼演示。

    生產者
    public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("test_transaction", "topic");// 開啟事務channel.txSelect();try {channel.basicPublish("test_transaction", "product.price", null, "Item 1: price-off".getBytes());// 模擬出錯// System.out.println(1 / 0);channel.basicPublish("test_transaction", "product.price", null, "Item 2: price-off".getBytes());// 提交事務(一起成功)channel.txCommit();System.out.println("Producer: All Messages Sent");} catch (Exception e) {System.out.println("All Messages Rollback");// 事務回滾(一起失敗)channel.txRollback();e.printStackTrace();} finally {channel.close();connection.close();}}}
    消費者
    public class Receiver {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("test_transaction_queue", false, false, false, null);channel.queueBind("test_transaction_queue", "test_transaction", "product.#");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("Consumer: " + s);}};// 4.監聽隊列 true:自動消息確認channel.basicConsume("test_transaction_queue", true, consumer);}}
    Confirm 發布確認機制

    RabbitMQ 為了保證消息的成功投遞,采用通過 AMQP 協議層面提供事務機制的方案,但是采用事務會大大降低消息的吞吐量。

    開啟事務性能最大損失超過 250 倍。

    事務效率低下原因:100 條消息,前 99 條成功,如果第 100 條失敗,那么 99 條消息要全部撤銷回滾。

    更加高效的解決方式是采用 Confirm 模式,而 Confirm 模式則采用補發第 100 條的措施來完成 100 條消息的送達。

    在 Spring 中應用

    resources\spring\spring-rabbitmq-producer.xml

    ... <!-- 6.配置 rabbitmq 的模版 --> <rabbit:template id="rabbitTemplate"connection-factory="connectionFactory"exchange="spring_topic_exchange"message-converter="jsonMessageConverter"confirm-callback="messageConfirm"/><!-- 7.確認機制的處理類 --> <bean id="messageConfirm" class="com.renda.confirm.MessageConfirm"/> ...

    消息確認處理類 com.renda.confirm.MessageConfirm:

    public class MessageConfirm implements RabbitTemplate.ConfirmCallback {/*** @param correlationData 消息相關的數據對象(封裝了消息的唯一 id)* @param b 消息是否確認成功* @param s 異常信息*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if (b) {System.out.println("Successfully Confirmed Message");} else {System.out.println("Fail to Confirm Message, error: " + s);// 如果本條消息一定要發送到隊列中,例如下訂單消息,可以采用補發// 1.采用遞歸(限制遞歸的次數)// 2.redis + 定時任務(jdk 的 timer,或者定時任務框架 Quartz)}} }

    resources\log4j.properties

    log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%nlog4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.File=rabbitmq.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%nlog4j.rootLogger=debug, stdout,file

    發送消息 com.renda.test.Sender:

    ... // 3.發消息 Map<String, String> map = new HashMap<String, String>(); map.put("name", "張人大"); map.put("email", "123456789@qq.com"); // 模擬發送消息失敗 // rabbitTemplate.convertAndSend("fuck", "msg.user", map); rabbitTemplate.convertAndSend("msg.user", map); System.out.println("Message Sent..."); ...

    消費端限流

    RabbitMQ 服務器積壓了成千上萬條未處理的消息,然后隨便打開一個消費者客戶端,就會出現這樣的情況:巨量的消息瞬間全部噴涌推送過來,但是單個客戶端無法同時處理這么多數據,就會被壓垮崩潰。

    所以,當數據量特別大的時候,對生產端限流肯定是不科學的,因為有時候并發量就是特別大,有時候并發量又特別少,這是用戶的行為 - 是無法約束的。

    應該對消費端限流,用于保持消費端的穩定。

    RabbitMQ 提供了一種 QoS(Quality of Service,服務質量)服務質量保證功能;

    即在非自動確認消息的前提下,如果一定數目的消息未被確認前,不再進行消費新的消息。

    生產者 com.renda.test.Sender 使用循環發出多條消息:

    ... for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("msg.user", map);System.out.println("Message Sent..."); } ...

    RabbitMQ 的管理頁面可以看到生產了 10 條堆積未處理的消息。

    消費者進行限流處理:

    resources\spring\spring-rabbitmq-consumer.xml

    ... 5.配置監聽 --> <!--prefetch="3":一次性消費的消息數量。會告訴 RabbitMQ 不要同時給一個消費者推送多于 N 個消息,一旦有 N 個消息還沒有 ack,則該 consumer 將阻塞,直到消息被 ack。--> <!-- acknowledge-mode: manual 手動確認--> <rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual"><rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/> </rabbit:listener-container> ...

    com.renda.listener.ConsumerListener

    @Component public class ConsumerListener extends AbstractAdaptableMessageListener {/*** jackson 提供序列化和反序列中使用最多的類,用來轉換 json 的*/private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void onMessage(Message message, Channel channel) throws Exception {// 將 message對象轉換成 json // JsonNode jsonNode = MAPPER.readTree(message.getBody()); // String name = jsonNode.get("name").asText(); // String email = jsonNode.get("email").asText(); // System.out.println("Message From Queue:{" + name + ", " + email + "}");String str = new String(message.getBody());System.out.println("str = " + str);/*** 手動確認消息(參數1,參數2)* 參數 1:RabbitMQ 想該 channel 投遞的這條消息的唯一標識 ID,此 ID 是一個單調遞增的正整數。* 參數 2:為了減少網絡流量,手動確認可以被批量處理;當該參數為 true 時,則可以一次性確認小于等于 msgId 值的所有消息。*/long msgId = message.getMessageProperties().getDeliveryTag();channel.basicAck(msgId, true);Thread.sleep(3000);System.out.println("Rest for 3 seconds and then continue for more messages...");} }

    每次最多只確認接收 3 條消息,直到消息隊列為空。

    過期時間 TTL

    Time To Live - 生存時間、還能活多久,單位毫秒。

    在這個周期內,消息可以被消費者正常消費,超過這個時間,則自動刪除(其實是被稱為 dead message 并投入到死信隊列,無法消費該消息)。

    RabbitMQ 可以對消息和隊列設置 TTL:

    • 通過隊列設置,隊列中所有消息都有相同的過期時間。
    • 對消息單獨設置,每條消息的 TTL 可以不同(更顆粒化)。
    設置隊列 TTL

    RabbitMQ 管理端刪除掉 test_spring_queue_1 隊列。

    resources\spring\spring-rabbitmq-producer.xml

    <!-- 對隊列中的消息設置過期時間 --> <rabbit:queue name="test_spring_queue_1" auto-declare="true"><rabbit:queue-arguments><entry key="x-message-ttl" value-type="long" value="5000"/></rabbit:queue-arguments> </rabbit:queue>

    5 秒之后,消息自動刪除。

    設置消息 TTL

    RabbitMQ 管理端刪除掉 test_spring_queue_1 隊列。

    設置某條消息的 TTL,只需要在創建發送消息時指定即可。

    resources\spring\spring-rabbitmq-producer.xml

    <rabbit:queue name="test_spring_queue_1"/>

    com.renda.test.Sender2

    public class Sender2 {public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);// 創建消息配置對象MessageProperties messageProperties = new MessageProperties();// 設置消息過期時間messageProperties.setExpiration("6000");// 創建消息Message message = new Message("This Message will be deleted in 6000 ms".getBytes(), messageProperties);// 發消息rabbitTemplate.convertAndSend("msg.user", message);System.out.println("Message Sent...");context.close();}}

    如果同時設置了 queue 和 message 的 TTL 值,則只有二者中較小的才會起作用。

    死信隊列

    DLX(Dead Letter Exchanges)死信交換機 / 死信郵箱,當消息在隊列中由于某些原因沒有被及時消費而變成死信(dead message)后,這些消息就會被分發到 DLX 交換機中,而綁定 DLX 交換機的隊列,稱之為:“死信隊列”。

    消息沒有被及時消費的原因:

    • 消息被拒絕(basic.reject / basic.nack)并且不再重新投遞 requeue=false。
    • 消息超時未消費。
    • 達到最大隊列長度。
    my_exchange 交換機 --- 沒有及時消費的消息 ---> dlx_exchange 死信交換機my_exchange -- 路由鍵 dlx_ttl --> test_ttl_queue 消息過期 my_exchange -- 路由鍵 dlx_max --> test_max_queue 達到最大隊列長度沒有及時消費的消息:[test_ttl_queue, test_max_queue]test_ttl_queue -- 過期的消息 --> dlx_exchange test_max_queue -- 被擠出的消息 --> dlx_exchangedlx_exchange -- 路由鍵 dlx_ttl --> dlx_queue 死信隊列 dlx_exchange -- 路由鍵 dlx_max --> dlx_queue 死信隊列

    resources\spring\spring-rabbitmq-producer-dlx.xml

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!-- 配置連接 --><rabbit:connection-factory id="connectionFactory"host="192.168.186.128"port="5672"username="renda"password="123456"virtual-host="/renda"publisher-confirms="true"/><!-- 配置 rabbitAdmin: 主要用于在 java 代碼中對隊列的管理,用來創建,綁定,刪除隊列與交換機,發送消息等 --><rabbit:admin connection-factory="connectionFactory"/><!-- 配置 rabbitmq 的模版 --><rabbit:template id="rabbitTemplate"connection-factory="connectionFactory"exchange="spring_topic_exchange"/><!-- 聲明死信隊列 --><rabbit:queue name="dlx_queue"/><!-- 聲明定向的死信交換機 --><rabbit:direct-exchange name="dlx_exchange"><rabbit:bindings><rabbit:binding key="dlx_ttl" queue="dlx_queue"/><rabbit:binding key="dlx_max" queue="dlx_queue"/></rabbit:bindings></rabbit:direct-exchange><!-- 聲明測試過期的消息隊列 --><rabbit:queue name="test_ttl_queue"><rabbit:queue-arguments><!-- 設置隊列的過期時間 TTL --><entry key="x-message-ttl" value-type="long" value="10000"/><!-- 消息如果超時,將消息投遞給死信交換機 --><entry key="x-dead-letter-exchange" value="dlx_exchange"/></rabbit:queue-arguments></rabbit:queue><!-- 聲明測試超出長度的消息隊列 --><rabbit:queue name="test_max_queue"><rabbit:queue-arguments><!-- 設置隊列的額定長度 (本隊列最多裝 2 個消息) --><entry key="x-max-length" value-type="long" value="2"/><!-- 消息如果超出長度,將消息投遞給死信交換機 --><entry key="x-dead-letter-exchange" value="dlx_exchange"/></rabbit:queue-arguments></rabbit:queue><!-- 聲明定向的測試消息的交換機 --><rabbit:direct-exchange name="my_exchange"><rabbit:bindings><rabbit:binding key="dlx_ttl" queue="test_ttl_queue"/><rabbit:binding key="dlx_max" queue="test_max_queue"/></rabbit:bindings></rabbit:direct-exchange></beans>

    發消息進行測試

    public class SendDLX {public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml");RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);// 測試超時// rabbitTemplate.convertAndSend("dlx_ttl", "Overtime: Close".getBytes());// 測試超過最大長度rabbitTemplate.convertAndSend("dlx_max", "OverSize: 1".getBytes());rabbitTemplate.convertAndSend("dlx_max", "OverSize: 2".getBytes());rabbitTemplate.convertAndSend("dlx_max", "OverSize: 3".getBytes());System.out.println("Message Sent...");context.close();}}

    延遲隊列

    延遲隊列 = TTL + 死信隊列的合體。

    死信隊列只是一種特殊的隊列,里面的消息仍然可以消費。

    在電商開發部分中,都會涉及到延時關閉訂單,此時延遲隊列正好可以解決這個問題。

    生產者

    沿用上面死信隊列案例的超時測試,超時時間改為訂單關閉時間即可。

    消費者

    resources\spring\spring-rabbitmq-consumer.xml

    ... <!-- 監聽死信隊列--> <rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual"><rabbit:listener ref="consumerListener" queue-names="dlx_queue"/> </rabbit:listener-container> ...

    RabbitMQ 集群

    RabbitMQ 有 3 種模式,其中 2 種是集群模式。

    單一模式:即單機情況不做集群,就單獨運行一個 RabbitMQ 而已。

    普通模式:默認模式,以兩個節點(A、B)為例來進行說明:

    • 當消息進入 A 節點的 Queue 后,Consumer 從 B 節點消費時,RabbitMQ 會在 A 和 B 之間創建臨時通道進行消息傳輸,把 A 中的消息實體取出并經過通過交給 B 發送給 Consumer。
    • 當 A 故障后,B 就無法取到 A 節點中未消費的消息實體;如果做了消息持久化,那么得等 A 節點恢復,然后才可被消費;如果沒有持久化的話,就會產生消息丟失的現象。

    鏡像模式 - 經典的 Mirror 鏡像模式,保證數據不丟失:

    • 高可靠性解決方案,主要就是實現數據的同步,一般來講是 2 - 3 個節點實現數據同步。
    • 對于 100% 數據可靠性解決方案,一般是采用 3 個節點。
    • 在實際工作中也是用得最多的,并且實現非常的簡單,一般互聯網大廠都會構建這種鏡像集群模式。

    另外,還有主備模式,遠程模式,多活模式等等。

    集群搭建

    前置條件:準備兩臺 linux(192.168.186.128 和 192.168.186.129),并安裝好 RabbitMQ。

  • 修改映射文件 vim /etc/hosts 。
  • 1 號服務器:

    127.0.0.1 A localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 A localhost localhost.localdomain localhost6 localhost6.localdomain6192.168.186.128 A 192.168.186.129 B

    2 號服務器:

    127.0.0.1 A localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 A localhost localhost.localdomain localhost6 localhost6.localdomain6192.168.186.128 A 192.168.186.129 B

    修改完 hosts 文件后,需要重啟 Linux 服務器 reboot,否則配置不生效。

  • 相互通信,cookie 必須保持一致,同步 RabbitMQ 的 cookie 文件:跨服務器拷貝 .erlang.cookie(隱藏文件,使用 ls -all 顯示)。
  • scp /var/lib/rabbitmq/.erlang.cookie 192.168.186.129:/var/lib/rabbitmq/

    修改 cookie 文件,要重啟 linux 服務器 reboot。

  • 防火墻開放 epmd 端口 4369,啟動 RabbitMQ 服務。
  • firewall-cmd --zone=public --add-port=4369/tcp --permanent firewall-cmd --reload systemctl start rabbitmq-server
  • 加入集群節點,節點 A 加入 節點 B,或者節點 B 加入節點 A 都可以:
  • [root@A ~]# rabbitmqctl stop_app Stopping rabbit application on node rabbit@A ... [root@A ~]# rabbitmqctl join_cluster rabbit@B Clustering node rabbit@A with rabbit@B [root@A ~]# rabbitmqctl start_app Starting node rabbit@A ...
  • 查看節點狀態:
  • rabbitmqctl cluster_status
  • 查看管理端
  • 搭建集群結構之后,之前創建的交換機、隊列、用戶都屬于單一結構,在新的集群環境中是不能用的。

    所以在新的集群中重新手動添加用戶即可(任意節點添加,所有節點共享)。

    [root@A ~]# rabbitmqctl add_user renda 123456 Adding user "renda" ... [root@A ~]# rabbitmqctl set_user_tags renda administrator Setting tags for user "renda" to [adminstrator] ... [root@A ~]# rabbitmqctl set_permissions -p "/" renda ".*" ".*" ".*" Setting permissions for user "renda" in vhost "/" ... [root@A ~]# rabbitmqctl list_users Listing users ... user tags renda [administrator] guest [administrator]

    訪問 http://192.168.186.128:15672 和 http://192.168.186.129:15672,兩個節點共享用戶。

    注意:當節點脫離集群還原成單一結構后,交換機,隊列和用戶等數據都會重新回來。

    此時,RabbitMQ 的集群搭建完畢,但是默認采用的模式為“普通模式”,可靠性不高。

    鏡像模式

    將所有隊列設置為鏡像隊列,即隊列會被復制到各個節點,各個節點狀態一致。

    語法:set_policy {NAME} {PATTERN} {DEFINITION}

    NAME - 策略名,可自定義

    PATTERN - 隊列的匹配模式(正則表達式)

    • ^ 可以使用正則表達式,比如 ^queue_ 表示對隊列名稱以 queue_ 開頭的所有隊列進行鏡像,而 ^ 會匹配所有的隊列。

    DEFINITION - 鏡像定義,包括三個部分 ha-mode, ha-params, ha-sync-mode

    • ha-mode - high available 高可用模式,指鏡像隊列的模式,有效值為 all/exactly/nodes;當前策略模式為 all,即復制到所有節點,包含新增節點。all 表示在集群中所有的節點上進行鏡像;exactly 表示在指定個數的節點上進行鏡像,節點的個數由 ha-params 指定;nodes 表示在指定的節點上進行鏡像,節點名稱通過 ha-params 指定。
    • ha-params - ha-mode 模式需要用到的參數。
    • ha-sync-mode - 進行隊列中消息的同步方式,有效值為 automatic 和 manual。
    [root@A ~]# rabbitmqctl set_policy policy_renda "^" '{"ha-mode":"all"}' Setting policy "policy_renda" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...

    通過管理端 Admin -> Policies -> Add / update a policy 設置鏡像策略。

    設置好鏡像模式后,在節點 A 增加了隊列后,節點 B 也可以看到新增的隊列。

    在 RabbitMQ 管理界面 Admin -> Virtual Hosts -> Add a new virtual host 創建虛擬主機 /renda;

    使用 Spring 整合的 RabbitMQ 重新測試發送和接受消息;在其中一個節點使用命令 rabbitmqctl stop_app 停掉,再測試,仍然可以發送和接受消息。

    HAProxy 實現鏡像隊列的負載均衡

    雖然在程序中訪問 A 服務器,可以實現消息的同步,但都是 A 服務器在接收消息,A 太累;是否可以負載均衡,A 和 B 輪流接收消息,再鏡像同步。

    HAProxy 簡介

    HA - High Available 高可用,Proxy - 代理。

    HAProxy 是一款提供高可用性,負載均衡,并且基于 TCP 和 HTTP 應用的代理軟件。

    HAProxy 完全免費。

    HAProxy 可以支持數以萬計的并發連接。

    HAProxy 可以簡單又安全的整合進架構中,同時還保護 Web 服務器不被暴露到網絡上。

    生產者 -- 投遞消息 --> HAProxy 消費者 -- 訂閱消息 --> HAProxyHAProxy ---> [MQ Node 1, MQ Node 2, MQ Node 3]
    HAProxy 與 Nginx

    OSI - Open System Interconnection 開放式系統互聯,是把網絡通信的工作分為 7 層,分別是物理層、數據鏈路層、網絡層、傳輸層、會話層、表示層和應用層。

    Nginx 的優點:

    • 工作在 OSI 第 7 層,可以針對 http 應用做一些分流的策略。
    • Nginx 對網絡的依賴非常小,理論上能 ping 通就就能進行負載功能,屹立至今的絕對優勢。
    • Nginx 安裝和配置比較簡單,測試起來比較方便。
    • Nginx 不僅僅是一款優秀的負載均衡器 / 反向代理軟件,它同時也是功能強大的 Web 應用服務器。

    HAProxy 的優點:

    • 工作在網絡 4 層和 7 層,支持 TCP 與 Http 協議。
    • 它僅僅就只是一款負載均衡軟件;單純從效率上來講 HAProxy 更會比 Nginx 有更出色的負載均衡速度,在并發處理上也是優于 Nginx 的。
    • 支持 8 種負載均衡策略 ,支持心跳檢測。

    性能上 HAProxy 勝,但是功能性和便利性上 Nginx 勝。

    對于 Http 協議,HAProxy 處理效率比 Nginx 高;所以,沒有特殊要求的時候或者一般場景,建議使用 Haproxy 來做 Http 協議負載;如果是 Web 應用,建議使用 Nginx。

    需要結合使用場景的特點來進行合理地選擇。

    安裝和配置

    HAProxy 下載:http://www.haproxy.org/download/1.8/src/haproxy-1.8.12.tar.gz

    上傳到第三臺 Linux 服務器(192.168.186.130)中并解壓:

    tar -zxvf haproxy-1.8.12.tar.gz

    make 時需要使用 TARGET 指定內核及版本:

    [root@localhost haproxy-1.8.12]# uname -r 3.10.0-229.el7.x86_64

    查看目錄下的 README 文件 less /opt/haproxy-1.8.12/README 可知需要根據內核版本選擇編譯參數:

    ... To build haproxy, you have to choose your target OS amongst the following ones and assign it to the TARGET variable :- linux22 for Linux 2.2- linux24 for Linux 2.4 and above (default)- linux24e for Linux 2.4 with support for a working epoll (> 0.21)- linux26 for Linux 2.6 and above- linux2628 for Linux 2.6.28, 3.x, and above (enables splice and tproxy)- solaris for Solaris 8 or 10 (others untested)- freebsd for FreeBSD 5 to 10 (others untested)- netbsd for NetBSD- osx for Mac OS/X- openbsd for OpenBSD 5.7 and above- aix51 for AIX 5.1 ...

    進入目錄,編譯和安裝:

    cd /opt/haproxy-1.8.12/ make TARGET=linux2628 PREFIX=/usr/local/haproxy make install PREFIX=/usr/local/haproxy

    安裝成功后,查看版本:

    [root@localhost haproxy-1.8.12]# /usr/local/haproxy/sbin/haproxy -v HA-Proxy version 1.8.12-8a200c7 2018/06/27 Copyright 2000-2018 Willy Tarreau <willy@haproxy.org>

    配置啟動文件,復制 haproxy 文件到 /usr/sbin 目錄下 ,復制 haproxy 腳本,到 /etc/init.d 目錄下:

    cp /usr/local/haproxy/sbin/haproxy /usr/sbin/ cp /opt/haproxy-1.8.12/examples/haproxy.init /etc/init.d/haproxy chmod 755 /etc/init.d/haproxy

    創建系統賬號:

    useradd -r haproxy

    haproxy.cfg 配置文件需要自行創建:

    mkdir /etc/haproxy vim /etc/haproxy/haproxy.cfg

    添加配置信息到 haproxy.cfg:

    # 全局配置 global# 設置日志log 127.0.0.1 local0 info# 當前工作目錄chroot /usr/local/haproxy# 用戶與用戶組user haproxygroup haproxy# 運行進程 IDuid 99gid 99# 守護進程啟動daemon# 最大連接數maxconn 4096# 默認配置 defaults# 應用全局的日志配置log global# 默認的模式 mode {tcp|http|health},TCP 是 4 層,HTTP 是 7 層,health 只返回 OKmode tcp# 日志類別 tcplogoption tcplog# 不記錄健康檢查日志信息option dontlognull# 3 次失敗則認為服務不可用retries 3# 每個進程可用的最大連接數maxconn 2000# 連接超時timeout connect 5s# 客戶端超時 30 秒,ha 就會發起重新連接timeout client 30s# 服務端超時 15 秒,ha 就會發起重新連接timeout server 15s# 綁定配置 listen rabbitmq_clusterbind 192.168.186.130:5672# 配置 TCP 模式mode tcp# 簡單的輪詢balance roundrobin# RabbitMQ 集群節點配置,每隔 5 秒對 mq 集群做檢查,2 次正確證明服務可用,3 次失敗證明服務不可用server A 192.168.186.128:5672 check inter 5000 rise 2 fall 3server B 192.168.186.129:5672 check inter 5000 rise 2 fall 3# haproxy 監控頁面地址 listen monitorbind 192.168.186.130:8100mode httpoption httplogstats enable# 監控頁面地址 http://192.168.186.130:8100/monitorstats uri /monitorstats refresh 5s

    啟動 HAProxy:

    service haproxy start

    開放對應的防火墻端口:

    firewall-cmd --zone=public --add-port=5672/tcp --permanent firewall-cmd --zone=public --add-port=8100/tcp --permanent firewall-cmd --reload

    訪問監控中心:http://192.168.186.130:8100/monitor

    項目發消息,只需要將服務器地址修改為 192.168.186.130 即可,其余不變。

    這樣,所有的請求都會交給 HAProxy,然后它會負載均衡地發給每個 RabbitMQ 服務器。

    KeepAlived 搭建高可用的 HAProxy 集群

    如果 HAProxy 服務器宕機,RabbitMQ 服務器就不可用了,所以對 HAProxy 也要做高可用的集群。

    概述

    Keepalived 是 Linux 的輕量級別的高可用熱備解決方案。

    Keepalived 的作用是檢測服務器的狀態,它根據 TCP / IP 參考模型的第三層、第四層、第五層交換機制檢測每個服務節點的狀態,如果有一臺 web 服務器宕機,或工作出現故障,Keepalived 將檢測到,并將有故障的服務器從系統中剔除,同時使用其他服務器代替該服務器的工作,當服務器工作正常后 Keepalived 自動將服務器加入到服務器群中,這些工作全部自動完成,不需要人工干涉,需要人工做的只是修復故障的服務器。

    Keepalived 基于 VRRP - Virtual Router Redundancy Protocol 虛擬路由冗余協議協議;VRRP 是一種主備(主機和備用機)模式的協議,通過 VRRP 可以在網絡發生故障時透明的進行設備切換而不影響主機之間的數據通信。

    兩臺主機之間生成一個虛擬的 ip,稱為漂移 ip,漂移 ip 由主服務器承擔,一但主服務器宕機,備份服務器就會搶奪漂移 ip,繼續工作,有效的解決了群集中的單點故障。

    KeepAlived 將多臺路由器設備虛擬成一個設備,對外提供統一 ip(Virtual IP)。

    生產者 -- 投遞消息 --> KeepAlived 消費者 -- 訂閱消息 --> KeepAlivedHAProxy 1 --> 主機 1 --> KeepAlived 虛擬 IP HAProxy 2 --> 主機 2 --> KeepAlived 虛擬 IP
    安裝 KeepAlived

    修改映射文件 vim /etc/hosts 。

    3 號服務器 192.168.186.130:

    127.0.0.1 C localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 C localhost localhost.localdomain localhost6 localhost6.localdomain6192.168.186.128 A 192.168.186.129 B 192.168.186.130 C 192.168.186.131 D

    4 號服務器 192.168.186.131:

    127.0.0.1 D localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 D localhost localhost.localdomain localhost6 localhost6.localdomain6192.168.186.128 A 192.168.186.129 B 192.168.186.130 C 192.168.186.131 D

    修改完 hosts 文件后,需要重啟 Linux 服務器 reboot,否則配置不生效。

    重新啟動后,需要啟動 haproxy:

    service haproxy start

    主機 C 和主機 D 都安裝 keepalived:

    yum install -y keepalived

    主機 C 修改配置文件(刪掉內容,重新創建):

    rm -rf /etc/keepalived/keepalived.conf vim /etc/keepalived/keepalived.conf ! Configuration File for keepalivedglobal_defs {# 非常重要,標識本機的 hostnamerouter_id C }vrrp_script chk_haproxy {# 執行的腳本位置script "/etc/keepalived/haproxy_check.sh"# 檢測時間間隔interval 2# 如果條件成立則權重減 20weight -20 }vrrp_instance VI_1 {# 非常重要,標識主機,備用機 131 改為 BACKUPstate MASTER# 非常重要,網卡名(ifconfig 查看)interface ens33# 非常重要,自定義,虛擬路由 ID 號(主備節點要相同)virtual_router_id 66# 優先級(0-254),一般主機的大于備機priority 100# 主備信息發送間隔,兩個節點必須一致,默認 1 秒advert_int 1# 認證匹配,設置認證類型和密碼,MASTER 和 BACKUP 必須使用相同的密碼才能正常通信authentication {auth_type PASSauth_pass 1111}track_script {# 檢查 haproxy 健康狀況的腳本chk_haproxy}# 簡稱 “VIP”virtual_ipaddress {# 非常重要,虛擬 ip,可以指定多個,以后連接 mq 就用這個虛擬ip192.168.186.66/24} } # 虛擬 ip 的詳細配置 virtual_server 192.168.186.66 5672 {# 健康檢查間隔,單位為秒delay_loop 6# lvs 調度算法 rr|wrr|lc|wlc|lblc|sh|dhlb_algo rr# 負載均衡轉發規則。一般包括 DR, NAT, TUN 3 種lb_kind NAT# 轉發協議,有 TCP 和 UDP 兩種,一般用 TCPprotocol TCP# 本機的真實 ipreal_server 192.168.186.130 5672 {# 默認為 1, 失效為 0weight 1} }

    主機 C 創建執行腳本 vim /etc/keepalived/haproxy_check.sh

    #!/bin/bash COUNT=`ps -C haproxy --no-header |wc -l` if [ $COUNT -eq 0 ];then/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfgsleep 2if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];thenkillall keepalivedfi fi

    Keepalived 組之間的心跳檢查并不能察覺到 HAproxy 負載是否正常,所以需要使用此腳本。在 Keepalived 主機上,開啟此腳本檢測 HAproxy 是否正常工作,如正常工作,記錄日志。如進程不存在,則嘗試重啟 HAproxy ,2 秒后檢測,如果還沒有,則關掉主機的 Keepalived ,此時備 Keepalived 檢測到主 Keepalived 掛掉,接管 VIP,繼續服務。

    主機 C 給腳本文件增加執行權限:

    chmod +x /etc/keepalived/haproxy_check.sh

    此時,安裝完畢,按照上面的步驟就可以安裝第二臺主機 D 了(服務器 hostname 和 ip 注意要修改)。

    service keepalived start | stop | status | restart

    啟動 keepalived(兩臺都啟動):

    service keepalived start

    查看狀態:

    ps -ef | grep haproxy ps -ef | grep keepalived

    查看 ip 情況 ip addr 或 ip a。

    啟動 keepalived 前的情況:

    [root@C keepalived]# ip a 1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00inet 127.0.0.1/8 scope host lovalid_lft forever preferred_lft foreverinet6 ::1/128 scope host valid_lft forever preferred_lft forever 2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP qlen 1000link/ether 00:0c:29:ac:93:50 brd ff:ff:ff:ff:ff:ffinet 192.168.186.130/24 brd 192.168.186.255 scope global ens33valid_lft forever preferred_lft foreverinet6 fe80::20c:29ff:feac:9350/64 scope link valid_lft forever preferred_lft forever

    啟動 keepalived 后的情況:

    [root@C keepalived]# ip a 1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00inet 127.0.0.1/8 scope host lovalid_lft forever preferred_lft foreverinet6 ::1/128 scope host valid_lft forever preferred_lft forever 2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP qlen 1000link/ether 00:0c:29:ac:93:50 brd ff:ff:ff:ff:ff:ffinet 192.168.186.130/24 brd 192.168.186.255 scope global ens33valid_lft forever preferred_lft foreverinet 192.168.186.66/24 scope global secondary ens33valid_lft forever preferred_lft foreverinet6 fe80::20c:29ff:feac:9350/64 scope link valid_lft forever preferred_lft forever

    可以看到 ens33 網卡還多綁定了一個 IP 地址。

    常見的網絡錯誤:子網掩碼、網關等信息要一致。

    測試 vip 和端口一起是否能提供服務

    在 192.168.186.128,A 服務器上測試。

    在服務器 A 執行 curl 192.168.186.130:5672 和 curl 192.168.186.66:5672 都能正常返回 AMPQ,說明安裝成功。

    測試 ip 漂移的規則

    使用 ip addr 或 ip a 查看虛擬 ip。

    剛開始時,C 和 D 都啟動了 KeepAlived;C 是主機,所以虛擬 ip 在主機 C,表現為主機 C 顯示 inet 192.168.186.66/24,而備機 D 不顯示。

    然后,停止主機 C 的 keepalived service keepalived stop,虛擬 ip 漂移到 D 節點,D 節點執行 ip a 可以看到 inet 192.168.186.66/24,而主機 C 卻不顯示。

    接著,重新啟動 C 節點的 Keepalived,虛擬 ip 依舊在 D 節點,并不會由于 C 的回歸而回歸。

    最后,停止 D 的 Keepalived,虛擬 ip 再漂移回 C 節點。

    測試項目發消息
    消費者或生產者 -- 漂移 IP 66 --> KeepAlived 服務 --> [HAProxy 服務器C 130, HAProxy 服務器D 131]HAProxy 服務器C 130 -- 負載均衡 --> [MQ 服務器A 128, MQ 服務器B 129] HAProxy 服務器D 130 -- 負載均衡 --> [MQ 服務器A 128, MQ 服務器B 129]

    測試單個 RabbitMQ 服務器:將服務器地址修改為 192.168.186.128,其余不變。

    測試 HAProxy 實現多個 RabbitMQ 服務器負載均衡:將服務器地址修改為 192.168.186.130,其余不變。

    測試 KeepAlived 實現的高可用的 HAProxy 集群:將服務器地址修改為 KeepAlived 的虛擬 IP 192.168.186.66,其余不變。

    想了解更多,歡迎關注我的微信公眾號:Renda_Zhang

    總結

    以上是生活随笔為你收集整理的RabbitMQ 消息队列入门的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。