日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【分布式核心技术】RabbitMQ技术入门

發布時間:2023/12/31 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【分布式核心技术】RabbitMQ技术入门 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

修改時間:2020年3月10日
作者:pp_x
郵箱:pp_x12138@163.com

文章目錄

  • 什么是RabbitMQ
    • MQ(MessageQueue)消息隊列
      • 異步處理
      • 應用解耦
      • 流量削峰
    • 背景知識
      • AMQP高級消息隊列協議
      • JMS
      • 二者的聯系
      • Erlang語言
    • Rabbit的優勢
    • Rabbit組件功能
  • RabbitMQ的使用
    • RabbitMQ的安裝和啟動
      • 下載地址
      • 安裝
      • 啟動后臺管理插件
      • 啟動RabbitMQ
      • 查看進程
      • 測試
    • 快速入門
    • pom依賴
    • RabbitMQ模式
      • 簡單模式
        • 生產者
        • 消費者
        • 消息確認機制ACK
      • 工作隊列模式
        • 生產者
        • 消費者1
        • 消費者2
        • 測試
        • 能者多勞原則
        • 面試題
      • 發布訂閱模式
        • 生產者
        • 消費者1
        • 消費者2
      • 路由模式
        • 生產者
        • 消費者1
        • 消費者2
        • 運行程序的順序
      • 通配符模式
        • 生產者
        • 消費者1
        • 消費者2
    • 持久化
      • 生產者
      • 消費者
    • Spring整合RabbitMQ
      • 生產者工程
      • 消費端工程
    • 消息成功確認機制
      • 事務機制
        • 生產者
      • Confirm發布確認機制
        • spring中使用confirm
    • 消費端限流
    • 過期時間
      • 設置隊列TTL
      • 設置消息TTL
    • 死信隊列
    • 延遲隊列
      • 生產者
      • 消費者
  • RabbitMQ集群
    • 集群搭建
    • 鏡像模式
    • HAProxy實現鏡像隊列的負載均衡
      • HAProxy簡介
      • HAProxy與Nginx
      • 安裝和配置
    • KeepAlived搭建高可用的HAProxy集群
      • KeepAlived概述
      • 安裝KeepAlived
      • 測試 ip 漂移的規則

什么是RabbitMQ

MQ(MessageQueue)消息隊列

  • 消息隊列中間件,是分布式系統中的重要組件
  • 主要解決,異步處理,應用解耦,流量削峰等問題
  • 從而實現高性能,高可用,可伸縮和最終一致性的架構
  • 使用較多的消息隊列產品:RabbitMQ,RocketMQ,ActiveMQ,ZeroMQ,Kafka

異步處理

  • 用戶注冊后,需要發送驗證郵箱和手機驗證碼;
  • 將注冊信息寫入數據庫,發送驗證郵件,發送手機,三個步驟全部完成后,返回給客戶端

應用解耦

  • 場景:訂單系統需要通知庫存系統
  • 如果庫存系統異常,則訂單調用庫存失敗,導致下單失敗
    • 原因:訂單系統和庫存系統耦合性太大
  • 訂單系統:用戶下單后,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶,下單成功
  • 庫存系統:訂閱下單的消息,獲取下單信息,庫存系統根據下單信息,再進行庫存操作;
  • 假如:下單的時候,庫存系統不能正常運行,也不會影響下單,因為下單后,訂單系統寫入消息隊列就不再關心其他的后續操作了,實現了訂單系統和庫存系統的應用解耦;
  • 所以說,消息隊列是典型的:生產者消費者模型
  • 因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的入侵,這樣就實現了生產者和消費者的解耦

流量削峰

  • 搶購,秒殺等業務,針對高并發的場景
  • 因為流量過大,暴增會導致應用掛掉,為解決這個問題,在前端加入消息隊列
  • 用戶的請求首先寫入消息隊列,當隊列達到規定長度時,將不在接收消息的寫入,即秒殺成功的就是進入消息隊列的

背景知識

AMQP高級消息隊列協議

  • 即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議
  • 協議:數據在傳輸的過程中必須要遵守的規則
  • 基于此協議的客戶端可以與消息中間件傳遞消息
  • 并不受產品、開發語言等條件的限制

JMS

  • Java Message Server,Java消息服務應用程序接口, 一種規范,和JDBC擔任的角色類似
  • 一個Java平臺中關于面向消息中間件的API用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信

二者的聯系

  • JMS是定義了統一接口,統一消息操作;AMQP通過協議統一數據交互格式
  • JMS必須是java語言;AMQP只是協議,與語言無關

Erlang語言

  • Erlang(['?:l??])是一種通用的面向并發的編程語言,它由瑞典電信設備制造商愛立信所轄CS-Lab開發,目的是創造一種可以應對大規模并發活動的編程語言和運行環境
  • 最初是由愛立信專門為通信應用設計的,比如控制交換機或者變換協議等,因此非常適合構建分布式,實時軟并行計算系統
  • Erlang運行時環境是一個虛擬機,有點像Java的虛擬機,這樣代碼一經編譯,同樣可以隨處運行

Rabbit的優勢

  • Erlang開發,AMQP的最佳搭檔,安裝部署簡單,上手門檻低
  • 企業級消息隊列,經過大量實踐考驗的高可靠,大量成功的應用案例,例如阿里、網易等一線大廠都有使用
  • 有強大的WEB管理頁面
  • 強大的社區支持,為技術進步提供動力
  • 支持消息持久化、支持消息確認機制、靈活的任務分發機制等,支持功能非常豐富
  • 集群擴展很容易,并且可以通過增加節點實現成倍的性能提升

Rabbit組件功能

  • Broker:消息隊列服務器實體
  • Virtual Host:虛擬主機
    • 標識一批交換機、消息隊列和相關對象,形成的整體
    • 虛擬主機是共享相同的身份認證和加密環境的獨立服務器域
    • 每個vhost本質上就是一個mini版的RabbitMQ服務器,擁有自己的隊列、交換器、綁定和權
      限機制
    • vhost是AMQP概念的基礎,RabbitMQ默認的vhost是 /,必須在鏈接時指定
  • Exchange:交換器(路由)
    • 用來接收生產者發送的消息并將這些消息路由給服務器中的隊列
  • Queue:消息隊列
    • 用來保存消息直到發送給消費者。
    • 它是消息的容器,也是消息的終點
    • 一個消息可投入一個或多個隊列
    • 消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
  • Banding:綁定,用于消息隊列和交換機之間的關聯。
  • Channel:通道(信道)
    • 多路復用連接中的一條獨立的雙向數據流通道
    • 信道是建立在真實的TCP連接內的虛擬鏈接
    • AMQP命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,都是通過信道完成的
    • 因為對于操作系統來說,建立和銷毀TCP連接都是非常昂貴的開銷,所以引入了信道的概念,用來復用TCP連接。
  • Connection:網絡連接,比如一個TCP連接。
  • Publisher:消息的生產者,也是一個向交換器發布消息的客戶端應用程序。
  • Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
  • Message:消息
    • 消息是不具名的,它是由消息頭和消息體組成
    • 消息體是不透明的,而消息頭則是由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(優先級)、delivery-mode(消息可能需要持久性存儲[消息的路由模式])等。

RabbitMQ的使用

  • 想要安裝RabbitMQ,必須先安裝erlang語言環境,類似安裝tomcat,必須先安裝J
  • 查看匹配的版本:https://www.rabbitmq.com/which-erlang.html

RabbitMQ的安裝和啟動

下載地址

  • erlang下載:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang
  • socat下載:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
  • RabbitMQ下載:https://www.rabbitmq.com/install-rpm.html#downloads

安裝

[root@localhost opt]# rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm [root@localhost opt]# rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm [root@localhost opt]# rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm

啟動后臺管理插件

[root@localhost opt]# rabbitmq-plugins enable rabbitmq_management

啟動RabbitMQ

[root@localhost opt]# systemctl start rabbitmq-server.service [root@localhost opt]# systemctl status rabbitmq-server.service [root@localhost opt]# systemctl restart rabbitmq-server.service [root@localhost opt]# systemctl stop rabbitmq-server.service

查看進程

[root@localhost opt]# ps -ef | grep rabbitmq

測試

  • 關閉防火墻:systemctl stop firewalld
  • 瀏覽器輸入:http://ip:15672
  • 默認帳號密碼:guest,guest用戶默認不允許遠程連接
    • 創建賬號\設置用戶角色\設置用戶權限\
    [root@localhost opt]# rabbitmqctl add_user ppx 123456 [root@localhost opt]# rabbitmqctl set_user_tags ppx administrator [root@localhost opt]# rabbitmqctl set_permissions -p "/" ppx".*" ".*" ".*"
    • 查看當前用戶和角色\修改密碼
    [root@localhost opt]# rabbitmqctl list_users [root@localhost opt]# rabbitmqctl change_password ppx 123123
  • 端口:
    • 5672:RabbitMQ提供給編程語言客戶端鏈接的端口
    • 15672:RabbitMQ管理界面的端口
    • 25672:RabbitMQ集群的端口

快速入門

pom依賴

RabbitMQ模式

  • RabbitMQ提供了6種消息模型,但是第6種其實是RPC,并不是MQ,因此我們只學習前5種
  • 在線手冊:https://www.rabbitmq.com/getstarted.html
  • 5種消息模型,大體分為兩類
    • p2p(點對點) :
    • 發布訂閱模式
  • 點對點模式:P2P(point to point)模式包含三個角色
    • 消息隊列(queue),發送者(sender),接收者(receiver)
    • 每個消息發送到一個特定的隊列中,接收者從中獲得消息
    • 隊列中保留這些消息,直到他們被消費或超時
    • 特點:
      • 每個消息只有一個消費者,一旦消費,消息就不在隊列中了
      • 發送者和接收者之間沒有依賴性,發送者發送完成,不管接收者是否運行,都不會影響消息發送到隊列中
      • 接收者成功接收消息之后需向對象應答成功(確認)
    • 如果希望發送的每個消息都會被成功處理,那需要P2P
  • 發布訂閱模式 :publish(Pub)/subscribe(Sub)
    • pub/sub模式包含三個角色:交換機(exchange),發布者(publisher),訂閱者(subcriber)
    • 多個發布者將消息發送交換機,系統將這些消息傳遞給多個訂閱者
    • 特點:
      • 每個消息可以有多個訂閱者
      • 發布者和訂閱者之間在時間上有依賴,對于某個交換機的訂閱者,必須創建一個訂閱后,才能消費發布者的消息
      • 為了消費消息,訂閱者必須保持運行狀態;類似于,看電視直播
    • 如果希望發送的消息被多個消費者處理,可采用發布訂閱模式

簡單模式

RabbitMQ是一個消息代理:它接收和轉發消息。你可以把它想象成一個郵局:當你把你想要寄的郵件放到一個郵箱里,你可以確定郵遞員先生或女士最終會把郵件送到你的收件人那里。在這個類比中,RabbitMQ是一個郵箱、一個郵局和一個郵遞員

  • RabbitMQ本身只是接收,存儲和轉發消息,并不會對信息進行處理!

生產者

public class Sender {public static void main(String[] args) throws Exception {String msg = "ppx,你好";//1、獲得鏈接Connection connection = ConnectionUtil.getConnection();//2、創建信道Channel channel = connection.createChannel();//3、創建消息隊列/*** 參數一:隊列的名稱* 參數二:隊列中的數據是否持久化* 參數三:是否排外(是否支持擴展, 是否當前隊列只能自己用)* 參數四:是否自動刪除(當隊列的連接數為0時,隊列會銷毀,不管隊列中是否保存數據)* 參數五:隊列參數(沒有參數即為null)*/channel.queueDeclare("queue1",false,false,false,null);//4、向指定的隊列發送消息/*** 參數一:交換機名稱,由于當前是簡單模式也就是p2p模式 無交換機* 參數二:目標隊列的名稱* 參數三:設置消息的屬性(無消息,沒有屬性則為空)* 參數四:消息的內容*/channel.basicPublish("","queue1",null,msg.getBytes());System.out.println("發送了"+msg);//5、釋放資源channel.close();connection.close();} }

消費者

public class Recer {public static void main(String[] args) throws Exception {//1、獲得鏈接Connection connection = ConnectionUtil.getConnection();//2、獲得信道Channel channel = connection.createChannel();//3、從信道中獲得消息數據DefaultConsumer consumer = new DefaultConsumer(channel){@Override//交付處理(收件人信息,請求頭(包裹上的快遞標簽),協議的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//body就是隊列中獲取的字節數組(消息)String str = new String(body);System.out.println("接收到:"+str);}};//4、監聽隊列 (true 自動消息確認 ) //一直在監聽channel.basicConsume("queue1",true,consumer);} }

消息確認機制ACK

  • 通過剛才的案例可以看出,消息一旦被消費,消息就會立刻從隊列中移除
  • RabbitMQ如何得知消息被消費者接收?
    • 如果消費者接收消息后,還沒執行操作就拋異常宕機導致消費失敗,但是RabbitMQ無從得知,這樣消息就丟失了
    • 因此,RabbitMQ有一個ACK機制,當消費者獲取消息后,會向RabbitMQ發送回執ACK,告知消息已經被接收
    • ACK:(Acknowledge character)即是確認字符,在數據通信中,接收站發給發送站的一種傳輸類控制字符。表示發來的數據已確認接收無誤我們在使用http請求時,http的狀態碼200就是告訴我們服務器執行成功
    • 這種回執ACK分為兩種情況
      • 自動ACK:消息接收后,消費者立刻自動發送ACK(快遞放在快遞柜)
      • 手動ACK:消息接收后,不會發送ACK,需要手動調用(快遞必須本人簽收)
    • 兩種情況如何選擇,需要看消息的重要性:
      • 如果消息不太重要,丟失也沒有影響,自動ACK會比較方便
      • 如果消息非常重要,最好消費完成手動ACK,如果自動ACK消費后,RabbitMQ就會把消息從隊列中刪除,如果此時消費者拋異常宕機,那么消息就永久丟失了
  • 修改手動消息確認
// false:手動消息確認 channel.basicConsume("queue1", false, consumer);

public class RecerAck {public static void main(String[] args) throws Exception {//1、獲得鏈接Connection connection = ConnectionUtil.getConnection();//2、獲得信道Channel channel = connection.createChannel();//3、從信道中獲得消息數據DefaultConsumer consumer = new DefaultConsumer(channel){@Override//交付處理(收件人信息,請求頭(包裹上的快遞標簽),協議的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//body就是隊列中獲取的字節數組(消息)String str = new String(body);System.out.println("接收到:"+str);//手動確認(收件人信息 是否同時確認多個消息 )channel.basicAck(envelope.getDeliveryTag(),false);}};//4、監聽隊列 (true 自動消息確認 ) //一直在監聽channel.basicConsume("queue1",false,consumer);} }

工作隊列模式

  • 按簡單模式來說,一個消費者來處理消息,如果生產者生產消息過快過多,而消費者的能
    力有限,就會產生消息在隊列中堆積(生活中的滯銷)

生產者

public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("queue2",false,false,false,null);for (int i = 0;i<=100;i++){String msg = "ppx,你好"+i;channel.basicPublish("","queue2",null,msg.getBytes());System.out.println("發送了"+msg);}channel.close();connection.close();} }

消費者1

ublic class Recer1 {static int i = 1;public static void main(String[] args) throws Exception {//1、獲得鏈接Connection connection = ConnectionUtil.getConnection();//2、獲得信道Channel channel = connection.createChannel();//此方法有雙重作用,如果隊列不存在就創建一個隊列,如果存在就獲取隊列channel.queueDeclare("queue2",false,false,false,null);//3、從信道中獲得消息數據DefaultConsumer consumer = new DefaultConsumer(channel){@Override//交付處理(收件人信息,請求頭(包裹上的快遞標簽),協議的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//body就是隊列中獲取的字節數組(消息)String str = new String(body);System.out.println("【一號】接收到:"+str+"總共接收了["+i+++"]個");//模擬網絡延遲try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}//手動確認(收件人信息 是否同時確認多個消息 )channel.basicAck(envelope.getDeliveryTag(),false);}};//4、監聽隊列 (true 自動消息確認 ) //一直在監聽channel.basicConsume("queue2",false,consumer);} }

消費者2

public class Recer2 {static int i = 1;public static void main(String[] args) throws Exception {//1、獲得鏈接Connection connection = ConnectionUtil.getConnection();//2、獲得信道Channel channel = connection.createChannel();//此方法有雙重作用,如果隊列不存在就創建一個隊列,如果存在就獲取隊列channel.queueDeclare("queue2",false,false,false,null);//3、從信道中獲得消息數據DefaultConsumer consumer = new DefaultConsumer(channel){@Override//交付處理(收件人信息,請求頭(包裹上的快遞標簽),協議的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//body就是隊列中獲取的字節數組(消息)String str = new String(body);System.out.println("【二號】接收到:"+str+"總共接收了["+i+++"]個");//模擬網絡延遲try {Thread.sleep(900);} catch (InterruptedException e) {e.printStackTrace();}//手動確認(收件人信息 是否同時確認多個消息 )channel.basicAck(envelope.getDeliveryTag(),false);}};//4、監聽隊列 (true 自動消息確認 ) //一直在監聽channel.basicConsume("queue2",false,consumer);} }

測試

  • 先運行兩個消費者,再運行生產者,消費者需要能開啟隊列,如果有隊列則獲取,如果沒有則創建,如果不聲明隊列會報異常
  • 會發現雖然兩個消費者消費速度不一致,但是消費數量一致

能者多勞原則

引用自官網
您可能已經注意到分派仍然不能完全按照我們的要求工作。例如,如果有兩個員工,當所有奇怪的消息都很重,甚至消息都很輕時,一個員工會一直很忙,而另一個人幾乎什么工作都不做。好吧,RabbitMQ對此一無所知,它仍然會均勻地分派消息。 這是因為RabbitMQ只在消息進入隊列時發送消息。它不查看用戶未確認消息的數量。它只是盲目地將每條第n個消息分派給第n個消費者。 為了克服這個問題,我們可以使用設置為prefetchCount = 1的basicQos方法。這告訴RabbitMQ一次不要給一個worker發送一條以上的消息。或者,換句話說,在worker處理并確認前一個消息之前,不要向它發送新消息。相反,它將把它分派到下一個不繁忙的worker。

  • 能者多勞必須要配合手動的ACK機制才生效
// 可以理解為:快遞一個一個送,送完一個再送下一個,速度快的送件就多 channel.basicQos(1);

面試題

  • 如何避免消息堆積
    • 采用workqueue模式,多個消費者監聽同一個隊列
    • 接到消息后,通過線程池,異步消費

發布訂閱模式

在上一篇教程中,我們創建了一個工作隊列。工作隊列背后的假設是,每個任務都被準確地交付給一個工作者。在這一部分中,我們將做一些完全不同的事情——將消息傳遞給多個消費者。此模式稱為“發布/訂閱”。
為了演示這個模式,我們將構建一個簡單的日志記錄系統。它將由兩個程序組成——第一個將發送日志消息,第二個將接收和打印它們。在我們的日志系統中,接收程序的每一個正在運行的副本都將獲得消息。這樣我們就可以運行一個接收器并將日志指向磁盤;與此同時,我們可以運行另一個接收器并在屏幕上看到日志。基本上,發布的日志消息將廣播到所有接收方。

  • 如同生活中的抖音快手訂閱號等
  • x可以代表視頻up主,紅色可以視為粉絲隊列,binding視為關注
  • p生產者發送消息給x路由,x將信息轉發給綁定的x隊列
  • X隊列將信息通過信道發送給消費者,從而進行消費
  • 整個過程,必須先創建路由
    • 路由在生產者程序中創建
    • 因為路由沒有存儲消息的能力,當生產者將信息發送給路由后,消費者還沒有運行,所以沒有隊列,路由并不知道將信息發送給誰
    • 運行程序的順序
    • MessageSender(開啟路由)
    • MessageReceiver1和MessageReceiver2(待接收消息)
    • MessageSender (發送消息)

生產者

public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//聲明路由(路由名 路由類型)//fanout:不處理路由鍵 只需要將隊列綁定到路由上 發送到路由的消息都會被轉發到與該路由綁定到所有的隊列上channel.exchangeDeclare("test_exchange_fanout","fanout"); //隊列由消費者聲明,因為不確定有幾個隊列 //channel.queueDeclare("ps_queue",false,false,false,null);String msg = "ppx,你好";//此模式路由鍵鍵名為空channel.basicPublish("test_exchange_fanout","",null,msg.getBytes());System.out.println("生產者:"+msg);channel.close();connection.close();} }

消費者1

public class Recer1 {public static void main(String[] args) throws Exception {//1、獲得鏈接Connection connection = ConnectionUtil.getConnection();//2、獲得信道Channel channel = connection.createChannel();//聲明隊列channel.queueDeclare("test_exchange_fanout_queue1",false,false,false,null);//綁定路由 關注博主 (隊列名、路由名、路由鍵)channel.queueBind("test_exchange_fanout_queue1","test_exchange_fanout","");//3、從信道中獲得消息數據DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String str = new String(body);System.out.println("消費者1:"+str);}};channel.basicConsume("test_exchange_fanout_queue1",true,consumer);} }

消費者2

public class Recer2 {public static void main(String[] args) throws Exception {//1、獲得鏈接Connection connection = ConnectionUtil.getConnection();//2、獲得信道Channel channel = connection.createChannel();//聲明隊列channel.queueDeclare("test_exchange_fanout_queue2",false,false,false,null);//綁定路由 關注博主channel.queueBind("test_exchange_fanout_queue2","test_exchange_fanout","");//3、從信道中獲得消息數據DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String str = new String(body);System.out.println("消費者2:"+str);}};channel.basicConsume("test_exchange_fanout_queue2",true,consumer);} }

路由模式

  • 路由模式和發布訂閱模式類似,區別是會定向發送
  • 可以理解為是快遞公司的分揀中心,整個小區,東面的樓小張送貨,西面的樓小王送貨

生產者

public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//聲明路由(路由名 路由類型)//direct:根據路由鍵進行定向分發消息channel.exchangeDeclare("test_exchange_direct","direct"); // channel.queueDeclare("ps_queue",false,false,false,null);String msg = "[用戶注冊],[userId = 1001]";//insert 路由鍵鍵名channel.basicPublish("test_exchange_direct","select",null,msg.getBytes());System.out.println("【用戶系統】:"+msg);channel.close();connection.close();} }

消費者1

public class Recer1 {public static void main(String[] args) throws Exception {//1、獲得鏈接Connection connection = ConnectionUtil.getConnection();//2、獲得信道Channel channel = connection.createChannel();//聲明隊列channel.queueDeclare("test_exchange_direct_queue1",false,false,false,null);//綁定路由 關注博主 如果路由鍵時添加刪除修改的話 綁定到此隊列上 給定對應的路由鍵,只有路由鍵和發送者發送消息一致才會接收到消息channel.queueBind("test_exchange_direct_queue1","test_exchange_direct","insert");channel.queueBind("test_exchange_direct_queue1","test_exchange_direct","update");channel.queueBind("test_exchange_direct_queue1","test_exchange_direct","delete");//3、從信道中獲得消息數據DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String str = new String(body);System.out.println("消費者1:"+str);}};channel.basicConsume("test_exchange_direct_queue1",true,consumer);} }

消費者2

public class Recer2 {public static void main(String[] args) throws Exception {//1、獲得鏈接Connection connection = ConnectionUtil.getConnection();//2、獲得信道Channel channel = connection.createChannel();//聲明隊列channel.queueDeclare("test_exchange_direct_queue2",false,false,false,null);//綁定路由 關注博主channel.queueBind("test_exchange_direct_queue2","test_exchange_direct","select");//3、從信道中獲得消息數據DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String str = new String(body);System.out.println("消費者1:"+str);}};channel.basicConsume("test_exchange_direct_queue2",true,consumer);} }

運行程序的順序

  • 先運行一次sender(創建路由器),
  • 有了路由器之后,在創建兩個Recer1和Recer2,進行隊列綁定
  • 再次運行sender,發出消息

通配符模式

  • 和路由模式幾乎一樣,路由模式定向匹配,此模式模糊匹配
  • 匹配符號
    • *:只能匹配一個詞(正好一個詞,多一個不行,少一個也不行)
    • #:匹配0個或更多個詞
  • 官網案例
    • Q1綁定了路由鍵 .orange. Q2綁定了路由鍵 ..rabbit 和 lazy.#
    • 下面生產者的消息會被發送給哪個隊列?
    quick.orange.rabbit # Q1 Q2 lazy.orange.elephant # Q1 Q2 quick.orange.fox # Q1 lazy.brown.fox # Q2 lazy.pink.rabbit # Q2 quick.brown.fox # 無 orange # 無 quick.orange.male.rabbit # 無

生產者

public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明路由(路由名,路由類型,持久化)// topic:模糊匹配的定向分發channel.exchangeDeclare("test_exchange_topic", "topic", true);String msg = "商品降價";channel.basicPublish("test_exchange_topic", "product.price", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());System.out.println("[用戶系統]:" + msg);channel.close();connection.close();} }

消費者1

public class Recer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列( 第二個參數為true:支持持久化)channel.queueDeclare("test_exchange_topic_queue_1",true,false,false,null);// 綁定路由(綁定 用戶相關 的消息)channel.queueBind("test_exchange_topic_queue_1", "test_exchange_topic", "user.#");DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("【消費者1】 = " + s);}};// 4.監聽隊列 true:自動消息確認channel.basicConsume("test_exchange_topic_queue_1", true,consumer);} }

消費者2

public class Recer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare("test_transaction_queue",false,false,false,null);// 綁定路由(綁定 商品和訂單相關 的消息)channel.queueBind("test_transaction_queue", "test_transaction", "product.#");DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("【消費者】 = " + s);}};channel.basicConsume("test_transaction_queue", true,consumer);} }

持久化

  • 消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何避免消息丟失?
    • 消費者的ACK確認機制,可以防止消費者丟失消息
    • 萬一在消費者消費之前,RabbitMQ服務器宕機了,那消息也會丟失
  • 想要將消息持久化,那么 路由和隊列都要持久化 才可以

生產者

// 聲明路由(路由名,路由類型,持久化) channel.exchangeDeclare("test_exchange_topic", "topic",true); // 發送消息(第三個參數作用是讓消息持久化) channel.basicPublish("test_exchange_topic", "product.price", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

消費者

// 聲明隊列( 第二個參數為true:支持持久化) channel.queueDeclare("test_exchange_topic_queue_1",true,false,false,null)

Spring整合RabbitMQ

  • 五種消息模型,在企業中應用最廣泛的就是最后一種:定向匹配topic
  • Spring AMQP 是基于 Spring 框架的AMQP消息解決方案,提供模板化的發送和接收消息的抽象層,提供基于消息驅動的 POJO的消息監聽等,簡化了我們對于RabbitMQ相關程序的開發。

生產者工程

  • 依賴
<dependencies><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.0.1.RELEASE</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version><scope>compile</scope></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.9</version></dependency></dependencies>
  • spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--配置連接--><rabbit:connection-factory id="connectionFactory"host="192.168.227.128"port="5672"username="ppx"password="123456"virtual-host="/lagou"/><!--配置隊列--><rabbit:queue name="test_spring_queue_1"></rabbit:queue><!--配置rabbitAdmin:主要用于在Java代碼中對隊列的管理,用來創建,綁定,刪除隊列與交換機,發送消息等--><rabbit:admin connection-factory="connectionFactory"/><!--配置交換機 topic--><rabbit:topic-exchange name="spring_topic_exchange"><rabbit:bindings><!--綁定隊列--><rabbit:binding pattern="msg.#" queue="test_spring_queue_1"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!--配置json轉換工具--><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/><!--配置rabbit模板接收發送消息--><rabbit:template id="rabbitTemplate"connection-factory="connectionFactory"exchange="spring_topic_exchange"message-converter="jsonMessageConverter"/> </beans>
  • 發消息
public class Sender {public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-produce.xml");//從容器中獲取rabbit模板對象RabbitTemplate template = context.getBean(RabbitTemplate.class);//3、發消息HashMap<String, String> map = new HashMap<String, String>();map.put("name","皮皮瀟123");map.put("phone","123465"); // template.convertAndSend("msg.user",map);template.convertAndSend("msg.user",map);System.out.println("消息已發出");context.close();} }

消費端工程

  • 依賴
與生產端一致
  • spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context.xsd"><!--配置連接--><rabbit:connection-factory id="connectionFactory" host="192.168.227.128" port="5672" username="ppx" password="123456" virtual-host="/lagou"/><!--配置隊列--><rabbit:queue name="test_spring_queue_1"></rabbit:queue><!--配置rabbitAdmin:主要用于在Java代碼中對隊列的管理,用來創建,綁定,刪除隊列與交換機,發送消息等--><rabbit:admin connection-factory="connectionFactory"/><!--注解掃描--><context:component-scan base-package="listener"/><!--配置監聽--><rabbit:listener-container connection-factory="connectionFactory"><rabbit:listener ref="conListener" queue-names="test_spring_queue_1"/></rabbit:listener-container> </beans>
  • MessageListener接口用于spring容器接收到消息后處理消息
  • 如果需要使用自己定義的類型 來實現 處理消息時,必須實現該接口,并重寫onMessage()方法
  • 當spring容器接收消息后,會自動交由onMessage進行處理
  • 處理消息類
/*** 監聽消費者*/ @Component public class ConsumerListener implements MessageListener {//jackson提供序列化和反序列化中使用最多的類 ,用來轉換jsonprivate static final ObjectMapper MAPPER = new ObjectMapper();@Override//message是生產者發送過來的msgpublic void onMessage(Message message) {//將message對象轉換成jsontry {JsonNode jsonNode = MAPPER.readTree(message.getBody());String name = jsonNode.get("name").asText();String phone = jsonNode.get("phone").asText();System.out.println("從隊列中獲取的name:"+name+" phone:"+phone);} catch (IOException e) {e.printStackTrace();}} }
  • 啟動項目
public class TestRunner {public static void main(String[] args) throws IOException {//獲得容器ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-produce.xml");//讓程序一直運行System.in.read();} }

消息成功確認機制

  • 在實際場景下,有的生產者發送的消息是必須保證成功發送到消息隊列中,那么如何保證成功投遞呢
    • 事務機制
    • 發布確認機制

事務機制

  • AMQP協議提供的一種保證消息成功投遞的方式,通過信道開啟 transactional 模式
  • 利用信道 的三個方法來實現以事務方式 發送消息,若發送失敗,通過異常處理回滾事務,確保消息成功投遞
    • channel.txSelect(): 開啟事務
    • channel.txCommit() :提交事務
    • channel.txRollback() :回滾事務

生產者

public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("test_transaction", "topic");channel.txSelect();//開啟事務try {channel.basicPublish("test_transaction", "product.price", null, "商品1降價".getBytes());int i = 1/0;channel.basicPublish("test_transaction", "product.price", null, "商品2降價".getBytes());System.out.println("生產者發送了");channel.txCommit();//事務提交} catch (IOException e) {System.out.println("數據全部撤銷");channel.txRollback();e.printStackTrace();} finally {channel.close();connection.close();}} }

Confirm發布確認機制

  • RabbitMQ為了保證消息的成功投遞,采用通過AMQP協議層面為我們提供事務機制的方案,但是采用事務會大大降低消息的吞吐量
  • 開啟事務性能最大損失超過250倍
  • 事務效率為什么會這么低呢?試想一下:10條消息,前9條成功,如果第10條失敗,那么9條消息要全部撤銷回滾。太太太浪費
  • 而confirm模式則采用補發第10條的措施來完成10條消息的送達

spring中使用confirm

  • spring-rabbitmq-producer.xml
<!--1.配置連接,啟動生產者確認機制: publisher-confirms="true"--><rabbit:connection-factory id="connectionFactory"host="192.168.227.128"port="5672"username="ppx"password="123456"virtual-host="/lagou"publisher-confirms="true"/> <!--6.配置rabbitmq的模版,添加確認回調處理類:confirm- callback="msgSendConfirmCallback"--><!--配置rabbit模板接收發送消息--><rabbit:template id="rabbitTemplate"connection-factory="connectionFactory"exchange="spring_topic_exchange"message-converter="jsonMessageConverter"confirm-callback="messageConfirm"/> <!--確認機制的處理類--><bean id="messageConfirm" class="confirm.MessageConfirm"/>
  • 確認機制的處理類
public class MessageConfirm implements RabbitTemplate.ConfirmCallback{@Override/*** 參數一:消息相關的數據對象(封裝了消息的唯一id)* 參數二:消息確認是否成功* 參數三:異常信息(出錯咧才有的)*/public void confirm(CorrelationData correlationData, boolean b, String s) {if (b){System.out.println("消息確認發送成功");}else {System.out.println("消息確認發送失敗");System.out.println(s);//如果本條消息一定要發送到隊列中 例如下訂單 ,我們可以采用補發//1、采用遞歸(限制遞歸的次數)//2、redis+定時任務(jdk的timer,或者定時任務框架)}} }
  • 發送消息測試
public class Sender {public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-produce.xml");//從容器中獲取rabbit模板對象RabbitTemplate template = context.getBean(RabbitTemplate.class);//3、發消息HashMap<String, String> map = new HashMap<String, String>();map.put("name","皮皮瀟123");map.put("phone","123465"); // template.convertAndSend("msg.user",map);//指定發送給的路由 為了報錯 因為沒有此路由lalala template.convertAndSend("lalala","msg.user",map);System.out.println("消息已發出");context.close();} }
  • 結果
消息確認發送失敗

消費端限流

  • 我們 Rabbitmq 服務器積壓了成千上萬條未處理的消息,然后隨便打開一個消費者客戶端,就會出現這樣的情況: 巨量的消息瞬間全部噴涌推送過來,但是單個客戶端無法同時處理這么多數據,就會被壓垮崩潰
  • 所以,當數據量特別大的時候,我們對生產端限流肯定是不科學的,因為有時候并發量就是特別大,有時候并發量又特別少,這是用戶的行為,我們是無法約束的
  • 我們應該對消費端限流,用于保持消費端的穩定
  • RabbitMQ 提供了一種 Qos (Quality of Service,服務質量)服務質量保證功能
    • 即在非自動確認消息的前提下,如果一定數目的消息未被確認前,不再進行消費新的消息
  • 數據準備 生產者發送消息使堆積
for(int i = 1;i<=10;i++) { rabbitTemplate.convertAndSend("msg.user", map); System.out.println("消息已發出..."); }
  • 堆積的消息
  • 消費者進行限流處理
<!--配置監聽--><!-- prefetch="3" 一次性消費的消息數量。會告訴 RabbitMQ 不要同時給一個消費者推送多于 N 個消息,一旦有 N 個消息還沒有ack,則該 consumer 將阻塞,直到消息被ack--><!--acknowledge="manual"手動確認--><rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual"><rabbit:listener ref="conListener" queue-names="test_spring_queue_1"/></rabbit:listener-container> // AbstractAdaptableMessageListener用于在spring容器接收到消息后用于處理消息的抽象 基類 @Component public class ConListener extends AbstractAdaptableMessageListener {//jackson提供序列化和反序列化中使用最多的類 ,用來轉換jsonprivate static final ObjectMapper MAPPER = new ObjectMapper();@Override//處理消息類public void onMessage(Message message, Channel channel) throws Exception {//將message對象轉換成jsontry {JsonNode jsonNode = MAPPER.readTree(message.getBody());String name = jsonNode.get("name").asText();String phone = jsonNode.get("phone").asText();System.out.println("從隊列中獲取的name:"+name+" phone:"+phone);//手動確認消息/*** 參數一:RabbitMQ向該channel投遞的這條消息的唯一標識id,此id是單調遞增正整數* 參數二:為了減少網絡流量,手動確認可以被批量處理,當該參數為true時,可以批量處理,可以一次性確認小于等于msgId值的所有消息* 如果id為7,則可以一次性處理7條消息*/long msgId = message.getMessageProperties().getDeliveryTag();channel.basicAck(msgId,true);Thread.sleep(3000);System.out.println("休息三秒后再繼續接收消息");} catch (IOException e) {e.printStackTrace();}} }

過期時間

  • Time To Live:生存時間、還能活多久,單位毫秒
  • 在這個周期內,消息可以被消費者正常消費,超過這個時間,則自動刪除(其實是被稱為dead message并投入到死信隊列,無法消費該消息)
  • RabbitMQ可以對消息和隊列設置TTL
    • 通過隊列設置,隊列中所有消息都有相同的過期時間
    • 對消息單獨設置,每條消息的TTL可以不同(更顆粒化)

設置隊列TTL

  • spring-rabbitmq-producer.xml
<!--2.重新配置一個隊列,同時,對隊列中的消息設置過期時間--> <rabbit:queue name="test_spring_queue_ttl" auto-declare="true"> <rabbit:queue-arguments> <entry key="x-message-ttl" value-type="long" value="5000"></entry> </rabbit:queue-arguments> </rabbit:queue>

設置消息TTL

  • 設置某條消息的ttl,只需要在創建發送消息時指定即可
public class Sender2 {public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-produce.xml");RabbitTemplate template = context.getBean(RabbitTemplate.class); // 創建消息配置對象MessageProperties messageProperties = new MessageProperties(); // 設置消息過期時間messageProperties.setExpiration("6000"); // 創建消息Message message = new Message("6秒后自動刪除".getBytes(), messageProperties);template.convertAndSend("msg.user", message);System.out.println("消息已發出");context.close();} }
  • 注意:如果同時設置了queue和message的TTL值,則二者中較小的才會起作用

死信隊列

  • DLX(Dead Letter Exchanges)死信交換機/死信郵箱,當消息在隊列中由于某些原因沒有被及時消費而變成死信(dead message)后,這些消息就會被分發到DLX交換機中,而綁定DLX交換機的隊列,稱之為:“死信隊列”
  • 消息沒有被及時消費的原因
    • 消息被拒絕(basic.reject/ basic.nack)并且不再重新投遞 requeue=false
    • 消息超時未消費
    • 達到最大隊列長度
  • spring-rabbitmq-producer-dlx.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--配置連接--><rabbit:connection-factory id="connectionFactory"host="192.168.227.128"port="5672"username="ppx"password="123456"virtual-host="/lagou"/><!--配置rabbitAdmin:主要用于在Java代碼中對隊列的管理,用來創建,綁定,刪除隊列與交換機,發送消息等--><rabbit:admin connection-factory="connectionFactory"/><!--配置rabbit模板接收發送消息--><rabbit:template id="rabbitTemplate"connection-factory="connectionFactory" exchange="my_exchange"/><!--定義死信隊列--><rabbit:queue name="dlx_queue"/><!--定義定向死信交換機--><rabbit:direct-exchange name="dlx_exchange"><rabbit:bindings><rabbit:binding key="dlx_ttl" queue="dlx_queue"></rabbit:binding><rabbit:binding key="dlx_max" queue="dlx_queue"></rabbit:binding></rabbit:bindings></rabbit:direct-exchange><!--聲明定向的此時消息的交換機--><rabbit:direct-exchange name="my_exchange"><rabbit:bindings><rabbit:binding key="dlx_ttl" queue="test_ttl_queue"></rabbit:binding><rabbit:binding key="dlx_max" queue="test-max-queue"></rabbit:binding></rabbit:bindings></rabbit:direct-exchange><!--聲明 測試過期的消息隊列--><rabbit:queue name="test_ttl_queue"><rabbit:queue-arguments><!--設置隊列的ttl--><entry key="x-message-ttl" value-type="long" value="6000"/><!--如果消息超時 將消息投遞給私信交換機--><entry key="x-dead-letter-exchange" value="dlx_exchange"/></rabbit:queue-arguments></rabbit:queue><!--聲明 測試超出長度的消息隊列--><rabbit:queue name="test-max-queue"><rabbit:queue-arguments><!--設置隊列的ttl--><entry key="x-max-length" value-type="long" value="2"/><!--如果隊列超過隊列長度 將消息投遞給私信交換機--><entry key="x-dead-letter-exchange" value="dlx_exchange"/></rabbit:queue-arguments></rabbit:queue> </beans>
  • 發消息測試
public class SenderDLx {public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-dlx.xml");RabbitTemplate template = context.getBean(RabbitTemplate.class); // template.convertAndSend("dlx_ttl","測試超時".getBytes());/* template.convertAndSend("dlx_max","測試超長".getBytes());template.convertAndSend("dlx_max","測試超長".getBytes());template.convertAndSend("dlx_max","測試超長".getBytes());*/template.convertAndSend("dlx_ttl","超時,關閉訂單".getBytes());context.close();} }

延遲隊列

  • 延遲隊列:TTL + 死信隊列的合體
  • 死信隊列只是一種特殊的隊列,里面的消息仍然可以消費
  • 在電商開發部分中,都會涉及到延時關閉訂單,此時延遲隊列正好可以解決這個問題

生產者

  • 沿用上面死信隊列案例的超時測試,超時時間改為訂單關閉時間即可

消費者

<!-- 監聽死信隊列 --> <rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual"> <rabbit:listener ref="consumerListener" queue-names="dlx_queue" /> </rabbit:listener-container>

RabbitMQ集群

  • rabbitmq有3種模式,但集群模式是2種。詳細如下:
    • 單一模式:即單機情況不做集群,就單獨運行一個rabbitmq而已。之前我們一直在用
    • 普通模式:默認模式,以兩個節點(A、B)為例來進行說明
      • 當消息進入A節點的Queue后,consumer從B節點消費時,RabbitMQ會在A和B之間創建臨時通道進行消息傳輸,把A中的消息實體取出并經過通過交給B發送給consumer
      • 當A故障后,B就無法取到A節點中未消費的消息實體
        • 如果做了消息持久化,那么得等A節點恢復,然后才可被消費
        • 如果沒有持久化的話,就會產生消息丟失的現象
    • 鏡像模式:非常經典的 mirror 鏡像模式,保證 100% 數據不丟失
      • 高可靠性解決方案,主要就是實現數據的同步,一般來講是 2 - 3 個節點實現數據同步
      • 對于 100% 數據可靠性解決方案,一般是采用 3 個節點
      • 在實際工作中也是用得最多的,并且實現非常的簡單,一般互聯網大廠都會構建這種鏡像集群模式

集群搭建

  • 準備兩臺linux服務器,安裝好RabbitMQ
  • 集群步驟如下:
    • 修改/etc/hosts映射文件
    • 一號服務器
    127.0.0.1 A localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 A localhost localhost.localdomain localhost6 localhost6.localdomain6 192.168.204.141 A 192.168.204.142 B
    • 二號服務器
    127.0.0.1 B localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 B localhost localhost.localdomain localhost6 localhost6.localdomain6 192.168.204.141 A 192.168.204.142 B
    • 相互通信,cookie必須保持一致,同步 rabbitmq的cookie 文件:跨服務器拷貝 .erlang.cookie(隱藏文件,使用 ls -all 顯示)
    scp /var/lib/rabbitmq/.erlang.cookie 192.168.204.142:/var/lib/rabbitmq
    • 修改cookie文件,要重啟服務器,reboot
    • 停止防火墻,啟動rabbitmq服務
    systemctl stop firewalld systemctl start rabbitmq-server
    • 加入集群節點
    [root@B ~]# rabbitmqctl stop_app [root@B ~]# rabbitmqctl join_cluster rabbit@A [root@B ~]# rabbitmqctl start_app
    • 查看節點狀態
    [root@B ~]# rabbitmqctl cluster_status
    • 查看管理端
      • 搭建集群結構之后,之前創建的交換機、隊列、用戶都屬于單一結構,在新的集群環境中是不能用的
      • 所以在新的集群中重新手動添加用戶即可(任意節點添加,所有節點共享)
    [root@A ~]# rabbitmqctl add_user ppx 123123 [root@A ~]# rabbitmqctl set_user_tags ppx administrator [root@A ~]# rabbitmqctl set_permissions -p "/" ppx ".*" ".*" ".*"
    • 注意:當節點脫離集群還原成單一結構后,交換機,隊列和用戶等數據 都會重新回來
  • 此時,集群搭建完畢,但是默認采用的模式“普通模式”,可靠性不高

鏡像模式

  • 將所有隊列設置為鏡像隊列,即隊列會被復制到各個節點,各個節點狀態一致
    • 語法:set_policy {name} {pattern} {definition}
      • name:策略名,可自定義

      • pattern:隊列的匹配模式(正則表達式)

        • "^" 可以使用正則表達式,比如"^queue_"表示對隊列名稱以“queue_”開頭的所有隊列進行鏡像,而"^"表示匹配所有的隊列
      • definition :鏡像定義,包括三個部分ha-mode, ha-params, ha-sync-mode

        • ha-mode:(High Available,高可用)模式,指明鏡像隊列的模式,有效值all/exactly/nodes,當前策略模式為 all,即復制到所有節點,包含新增節點
        all:表示在集群中所有的節點上進行鏡像 exactly:表示在指定個數的節點上進行鏡像,節點的個數由ha-params指定 nodes:表示在指定的節點上進行鏡像,節點名稱通過ha-params指定
        • ha-params:ha-mode模式需要用到的參數
        • ha-sync-mode:進行隊列中消息的同步方式,有效值為automatic(自動)和manual(手動)
[root@A ~]# rabbitmqctl set_policy xall "^" '{"ha-mode":"all"}
  • 通過管理端設置鏡像策略

HAProxy實現鏡像隊列的負載均衡

  • 雖然我們在程序中訪問A服務器,可以實現消息的同步,雖然在同步,但都是A服務器在接收消息,A太累
  • 是否可以想Nginx一樣,做負載均衡,A和B輪流接收消息,再鏡像同步

HAProxy簡介

  • HA(High Available,高可用),Proxy(代理)
  • HAProxy是一款提供高可用性,負載均衡,并且基于TCP和HTTP應用的代理軟件
  • HAProxy完全免費
  • HAProxy可以支持數以萬計的并發連接
  • HAProxy可以簡單又安全的整合進架構中,同時還保護web服務器不被暴露到網絡上

HAProxy與Nginx

  • OSI:(Open System Interconnection:開放式系統互聯,是把網絡通信的工作分為7層,分別是物理層,數據鏈路層,網絡層,傳輸層,會話層,表示層和應用層)
  • Nginx的優點
    • 工作在OSI第7層,可以針對http應用做一些分流的策略
    • Nginx對網絡的依賴非常小,理論上能ping通就就能進行負載功能,屹立至今的絕對優勢
    • Nginx安裝和配置比較簡單,測試起來比較方便;
    • Nginx不僅僅是一款優秀的負載均衡器/反向代理軟件,它同時也是功能強大的Web應用服務器
  • HAProxy的優點:
    • 工作在網絡4層和7層,支持TCP與Http協議,
    • 僅僅就只是一款負載均衡軟件;單純從效率上來講HAProxy更會比Nginx有更出色的負載均衡速度,在并發處理上也是優于Nginx的
    • 支持8種負載均衡策略 ,支持心跳檢測
  • 性能上HA勝,功能性和便利性上Nginx勝
  • 對于Http協議,Haproxy處理效率比Nginx高。所以,沒有特殊要求的時候或者一般場景,建議使用Haproxy來做Http協議負載
  • 但如果是Web應用,那么建議使用Nginx!

安裝和配置

HAProxy下載:http://www.haproxy.org/download/1.8/src/haproxy-1.8.12.tar.gz

  • 解壓
[root@localhost opt]# tar -zxvf haproxy-1.8.12.tar.gz
  • make時需要使用 TARGET 指定內核及版本
[root@localhost opt]# uname -r 3.10.0-514.6.2.el7.x86_64

  • 進入目錄,編譯和安裝
[root@localhost opt]# cd haproxy-1.8.12 [root@localhost haproxy-1.8.12]# make TARGET=linux2628 PREFIX=/usr/local/haproxy [root@localhost haproxy-1.8.12]# make install PREFIX=/usr/local/haproxy
  • 安裝成功后,查看版本
[root@localhost haproxy-1.8.12]# /usr/local/haproxy/sbin/haproxy -v
  • 配置啟動文件,復制haproxy文件到/usr/sbin下 ,復制haproxy腳本,到/etc/init.d下
[root@localhost haproxy-1.8.12]# cp /usr/local/haproxy/sbin/haproxy/usr/sbin/ [root@localhost haproxy-1.8.12]# cp ./examples/haproxy.init /etc/init.d/haproxy [root@localhost haproxy-1.8.12]# chmod 755 /etc/init.d/haproxy
  • 創建系統賬號
[root@localhost haproxy-1.8.12]# useradd -r haproxy
  • haproxy.cfg配置文件需要自行創建
[root@localhost haproxy-1.8.12]# mkdir /etc/haproxy [root@localhost haproxy-1.8.12]# vim /etc/haproxy/haproxy.cfg
  • 添加配置信息到haproxy.cfg
# 全局配置 global# 設置日志log 127.0.0.1 local0 info# 當前工作目錄chroot /usr/local/haproxy# 用戶與用戶組user haproxygroup haproxy# 運行進程 IDuid 99gid 99# 守護進程啟動daemon# 最大連接數maxconn 4096# 默認配置 defaults# 應用全局的日志配置log global# 默認的模式 mode {tcp|http|health},TCP 是 4 層,HTTP 是 7 層,health 只返回 OKmode tcp# 日志類別 tcplogoption tcplog# 不記錄健康檢查日志信息option dontlognull# 3 次失敗則認為服務不可用retries 3# 每個進程可用的最大連接數maxconn 2000# 連接超時timeout connect 5s# 客戶端超時 30 秒,ha 就會發起重新連接timeout client 30s# 服務端超時 15 秒,ha 就會發起重新連接timeout server 15s# 綁定配置 listen rabbitmq_clusterbind 192.168.227.130:5672# 配置 TCP 模式mode tcp# 簡單的輪詢balance roundrobin# RabbitMQ 集群節點配置,每隔 5 秒對 mq 集群做檢查,2 次正確證明服務可用,3 次失敗證明服務不可用server A 192.168.227.128:5672 check inter 5000 rise 2 fall 3server B 192.168.227.129:5672 check inter 5000 rise 2 fall 3# haproxy 監控頁面地址 listen monitorbind 192.168.227.130:8100mode httpoption httplogstats enable# 監控頁面地址 http://192.168.227.130:8100/monitorstats uri /monitorstats refresh 5s
  • 啟動HAProxy
[root@localhost haproxy]# service haproxy start
  • 訪問監控中心:http://192.168.227.130:8100/monitor
  • 記得關閉防火墻: systemctl stop firewalld
  • 項目發消息,只需要將服務器地址修改為143即可(haproxy),其余不變
  • 所有的請求都會交給HAProxy,其負載均衡給每個rabbitmq服務器

KeepAlived搭建高可用的HAProxy集群

  • 現在的最后一個問題暴露出來了,如果HAProxy服務器宕機,rabbitmq服務器就不可用了。所以我們需要對HAProxy也要做高可用的集群

KeepAlived概述

  • Keepalived是Linux下一個輕量級別的高可用熱備解決方案
  • Keepalived的作用是檢測服務器的狀態,它根據TCP/IP參考模型的第三、第四層、第五層交換機制檢測每個服務節點的狀態,如果有一臺web服務器宕機,或工作出現故障,Keepalived將檢測到,并將有故障的服務器從系統中剔除,同時使用其他服務器代替該服務器的工作,當服務器工作正常后Keepalived自動將服務器加入到服務器群中,這些工作全部自動完成,不需要人工干涉,需要人工做的只是修復故障的服務器。
  • keepalived基于vrrp(Virtual Router Redundancy Protocol,虛擬路由冗余協議)協議,vrrp它是一種主備(主機和備用機)模式的協議,通過VRRP可以在網絡發生故障時透明的進行設備切換而不影響主機之間的數據通信
  • 兩臺主機之間生成一個虛擬的ip,我們稱漂移ip,漂移ip由主服務器承擔,一但主服務器宕機,備份服務器就會搶奪漂移ip,繼續工作,有效的解決了群集中的單點故障
  • 說白了,將多臺路由器設備虛擬成一個設備,對外提供統一ip

安裝KeepAlived

  • 修改hosts文件的地址映射
ip用途主機名
192.168.227.130KeepAlived HAProxyC
192.168.227.131KeepAlived HAProxyD
  • 安裝 keepalived
[root@C ~]# yum install -y keepalived
  • 修改配置文件(內容大改,不如刪掉,重新創建)
[root@C ~]# rm -rf /etc/keepalived/keepalived.conf [root@C ~]# vim /etc/keepalived/keepalived.conf ! Configuration File for keepalivedglobal_defs {# 非常重要,標識本機的 hostnamerouter_id C }vrrp_script chk_haproxy {# 執行的腳本位置script "/etc/keepalived/haproxy_check.sh"# 檢測時間間隔interval 2# 如果條件成立則權重減 20weight -20 }vrrp_instance VI_1 {# 非常重要,標識主機,備用機 131 改為 BACKUPstate MASTER# 非常重要,網卡名(ifconfig 查看)interface ens33# 非常重要,自定義,虛擬路由 ID 號(主備節點要相同)virtual_router_id 66# 優先級(0-254),一般主機的大于備機priority 100# 主備信息發送間隔,兩個節點必須一致,默認 1 秒advert_int 1# 認證匹配,設置認證類型和密碼,MASTER 和 BACKUP 必須使用相同的密碼才能正常通信authentication {auth_type PASSauth_pass 1111}track_script {# 檢查 haproxy 健康狀況的腳本chk_haproxy}# 簡稱 “VIP”virtual_ipaddress {# 非常重要,虛擬 ip,可以指定多個,以后連接 mq 就用這個虛擬ip192.168.227.66/24} } # 虛擬 ip 的詳細配置 virtual_server 192.168.227.66 5672 {# 健康檢查間隔,單位為秒delay_loop 6# lvs 調度算法 rr|wrr|lc|wlc|lblc|sh|dhlb_algo rr# 負載均衡轉發規則。一般包括 DR, NAT, TUN 3 種lb_kind NAT# 轉發協議,有 TCP 和 UDP 兩種,一般用 TCPprotocol TCP# 本機的真實 ipreal_server 192.168.227.130 5672 {# 默認為 1, 失效為 0weight 1} }
  • 創建執行腳本 /etc/keepalived/haproxy_check.sh
#!/bin/bash COUNT=`ps -C haproxy --no-header |wc -l` if [ $COUNT -eq 0 ];then/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfgsleep 2if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];thenkillall keepalivedfi fi
  • Keepalived 組之間的心跳檢查并不能察覺到 HAproxy 負載是否正常,所以需要使用此腳本。在 Keepalived 主機上,開啟此腳本檢測 HAproxy 是否正常工作,如正常工作,記錄日志。如進程不存在,則嘗試重啟 HAproxy ,2 秒后檢測,如果還沒有,則關掉主機的 Keepalived ,此時備 Keepalived 檢測到主 Keepalived 掛掉,接管 VIP,繼續服務。

  • 授權,否則不能執行

[root@C etc]# chmod +x /etc/keepalived/haproxy_check.sh
  • 啟動keepalived(兩臺都啟動)
[root@C etc]# systemctl stop firewalld [root@C etc]# service keepalived start | stop | status | restart
  • 查看狀態
[root@C etc]# ps -ef | grep haproxy [root@C etc]# ps -ef | grep keepalived
  • 查看ip情況 ip addr 或 ip a
[root@C etc]# ip a 啟動 keepalived 前的情況: [root@C keepalived]# ip a 1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00inet 127.0.0.1/8 scope host lovalid_lft forever preferred_lft foreverinet6 ::1/128 scope host valid_lft forever preferred_lft forever 2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP qlen 1000link/ether 00:0c:29:ac:93:50 brd ff:ff:ff:ff:ff:ffinet 192.168.227.130/24 brd 192.168.186.255 scope global ens33valid_lft forever preferred_lft foreverinet6 fe80::20c:29ff:feac:9350/64 scope link valid_lft forever preferred_lft forever 啟動后的情況 [root@C keepalived]# ip a 1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00inet 127.0.0.1/8 scope host lovalid_lft forever preferred_lft foreverinet6 ::1/128 scope host valid_lft forever preferred_lft forever 2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP qlen 1000link/ether 00:0c:29:ac:93:50 brd ff:ff:ff:ff:ff:ffinet 192.168.227.130/24 brd 192.168.227.255 scope global ens33valid_lft forever preferred_lft foreverinet 192.168.227.66/24 scope global secondary ens33valid_lft forever preferred_lft foreverinet6 fe80::20c:29ff:feac:9350/64 scope link valid_lft forever preferred_lft forever
  • 可以看到 ens33 網卡還多綁定了一個 IP 地址。
  • 此時,安裝完畢,按照上面的步驟就可以安裝第二臺了(服務器hostname和ip注意要修改)
  • 測試vip+端口是否提供服務(在128,A服務器上測試)
[root@A ~]# curl 192.168.227.66:5672 AMQP ## 正常提供AMQP服務,表示通過vip訪問mq服務正常
  • 測試項目發消息
消費者或生產者 -- 漂移 IP 66 --> KeepAlived 服務 --> [HAProxy 服務器C 130, HAProxy 服務器D 131]HAProxy 服務器C 130 -- 負載均衡 --> [MQ 服務器A 128, MQ 服務器B 129] HAProxy 服務器D 130 -- 負載均衡 --> [MQ 服務器A 128, MQ 服務器B 129]
  • 測試單個 RabbitMQ 服務器:將服務器地址修改為 192.168.227.128,其余不變。
  • 測試 HAProxy 實現多個 RabbitMQ 服務器負載均衡:將服務器地址修改為 192.168.227.130,其余不變。
  • 測試 KeepAlived 實現的高可用的 HAProxy 集群:將服務器地址修改為 KeepAlived 的虛擬 IP 192.168.227.66,其余不變。

測試 ip 漂移的規則

  • 查看虛擬ip ip addr 或 ip a
  • 目前,C節點是主機,所以虛擬ip在C節點
  • 停止C的keepalived,虛擬ip漂移到D節點
  • 重新啟動C節點keepalived,虛擬ip依舊在D節點,并不會由于C的回歸而回歸
  • 停止D的keepalived,虛擬ip再漂移回C節點

總結

以上是生活随笔為你收集整理的【分布式核心技术】RabbitMQ技术入门的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。