日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ教程大全看这一篇就够了-java版本

發布時間:2023/12/29 编程问答 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ教程大全看这一篇就够了-java版本 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

什么是RabbitMQ?

RabbitMQ 核心概念

Docker 安裝 RabbitMQ?

RabbitMQ 控制臺頁面介紹

RabbitMQ 交換機?Exchange 介紹

Direct Exchange 定向、直連交換機

Fanout Exchange 發布/訂閱、廣播、扇形交換機

Topic Exchange 主題、通配符交換機

Headers Exchanges(少用)

RabbitMQ 代碼 Java版

1:簡單隊列

2:工作隊列?Work Queues?

3:Fanout 發布/訂閱 交換機模式

4:Routing 路由 交換機模式

5:Topic?主題 交換機模式

SpringBoot 整合 RabbitMQ

RabbitMQ 消息可靠性投遞

生產者到交換機 開啟ACK確認可靠消息投遞

交換機到隊列 可靠消息投遞

RabbitMQ 消息確認消費ACK

消息的可靠消費

SpringBoot 配置 RabbitMQ 廣播 發布/訂閱

RabbitMQ TTL死信隊列

什么是 TTL?

什么是死信隊列?

什么是死信交換機?

消息什么情況下會成為死信消息?

如何設置消息的TTL存活時間?

RabbitMQ 控制臺 操作 死信隊列綁定死信交換機

RabbitMQ 延遲隊列

什么是延遲隊列?

定時消息的使用場景

RabbitMQ 實現延遲消息

SpringBoot 實現延遲隊列

RabbitMQ 的集群環境

RabbitMQ 普通集群模式的介紹

RabbitMQ 搭建普通集群環境

1:準備節點環境

2:節點配置成集群

3:SpringBoot 整合 RabbitMQ 普通集群

RabbitMQ 鏡像集群模式的介紹(推薦)

策略policy介紹


官網:https://www.rabbitmq.com/

什么是RabbitMQ?

RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、C、用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不錯,與SpringAMQP完美的整合、API豐富易用

RabbitMQ 核心概念

  • Broker

    RabbitMQ的服務端程序,可以認為一個mq節點就是一個broker。

  • Producer 生產者

    創建消息Message,然后發布到RabbitMQ隊列中

  • Consumer 消費者

    消費隊列中的消息

  • Message 消息

    生產消費的內容,有消息頭和消息體,也包括多個屬性配置,比如routingKey路由鍵

  • Queue 隊列

    是RabbitMQ的內部對象,用于存儲消息,消息都只能存儲在隊列中

  • Channel 信道

    一條支持多路復用的通道,獨立的雙向數據流通道,可以發布、訂閱、接收消息。信道是建立在真實的TCP連接內的虛擬連接,復用TCP連接的通道

  • Connection 連接

    是RabbitMQ的socket鏈接,它封裝了socket協議相關部分邏輯,一個連接上可以有多個channel進行通信

  • Exchange 交換器

    生產者將消息發送到Exchange,交換器將消息路由到一個或者多個隊列中,隊列和交換機是多對多的關系。

  • RoutingKey 路由鍵

    生產者將消息發給交換器的時候,一般會指定一個RoutingKey,用來指定這個消息的路由規則

    最大長度255 字節

  • Binding 綁定

    通過綁定將交換器與隊列關聯起來,在綁定的時候一般會指定一個綁定鍵 (BindingKey),這樣 RabbitMQ 就知道如何正確地將消息路由到隊列了

  • Virtual host 虛擬主機

    用于不同業務模塊的邏輯隔離,一個Virtual Host里面可以有若干個Exchange和Queue,同一個VirtualHost里面不能有相同名稱的Exchange或Queue

    默認是 / ,可以使用 /dev /test /pro


Docker 安裝 RabbitMQ?

使用源碼安裝需要的依賴多、且版本和維護相對復雜,需要erlang環境、版本也有要求。

linux 上安裝 docker

https://github.com/docker-library/docs/tree/master/rabbitmq

docker pull rabbitmq:management? ? ? ? ? ? ?

// 拉取遠程鏡像,management 帶 后臺管理頁面的版本

docker images? ? ? ? ? ? ? ? ? ? ? ? ????????????????

// 查看本機存在的鏡像

docker run -d -h rabbitmq_1 --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management?

// 運行 docker 鏡像

參數說明:

  • run -d :? run 運行鏡像 -d 后臺運行
  • -h :自定義容器的主機名,它會被寫到容器內的 /etc/hostname 和 /etc/hosts,作為容器主機IP的別名,并且將顯示在容器的bash中
  • --name:自定義容器名稱
  • -p 15672:15672 :management 界面管理訪問端口
  • -p 5672:5672? ? ?:amqp 訪問端口
  • -e rabbitma參數

rabbitmq 訪問地址:http://ip:15672? ? ? ? ? ? ? ? // 如果訪問不了,請查看防火墻端口是否開放

rabbitmq 默認登錄賬號和密碼:guest/guest

開機自動啟動 rabbitmq

docker update 容器ID?--restart=always

rabbitma 的主要端口

4369 ????????erlang 發現口
5672 ????????client 端通信口
15672? ? ? ?管理界面 ui 端口
25672 ??????server 間內部通信口


RabbitMQ 控制臺頁面介紹

RabbitMQ控制面板介紹 - 簡書


RabbitMQ 交換機?Exchange 介紹

  • 生產者將消息發送到 Exchange,交換器將消息路由到一個或者多個隊列中,交換機有多個類型,隊列和交換機是多對多的關系。
  • 交換機只負責轉發消息,不具備存儲消息的能力,如果沒有隊列和exchange綁定,或者沒有符合的路由規則,則消息會被丟失。
  • RabbitMQ有四種交換機類型,分別是Direct exchange、Fanout exchange、Topic exchange、Headers exchange,最后的不常用。
  • Direct Exchange 定向、直連交換機

  • 將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配
  • 例子:如果一個隊列綁定到該交換機上要求路由鍵 “aabb”,則只有被標記為“aabb”的消息才被轉發,不會轉發aabb.cc,也不會轉發gg.aabb,只會轉發aabb
  • 處理路由健
  • Fanout Exchange 發布/訂閱、廣播、扇形交換機

  • 只需要簡單的將隊列綁定到交換機上,一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息
  • Fanout交換機轉發消息是最快的,用于發布訂閱,廣播形式,中文是扇形
  • 不處理路由健
  • Topic Exchange 主題、通配符交換機

  • 主題交換機是一種發布/訂閱的模式,結合了直連交換機與扇形交換機的特點
  • 將路由鍵和某模式進行匹配。此時隊列需要綁定在一個模式上
  • 符號“#”匹配一個或多個詞,符號“*”匹配只匹配一個詞
  • 例子:因此“abc.#”能夠匹配到“abc.def.ghi”,但是“abc.*” 只會匹配到“abc.def”
  • Headers Exchanges(少用)

  • 根據發送的消息內容中的headers屬性進行匹配, 在綁定Queue與Exchange時指定一組鍵值對
  • 當消息發送到RabbitMQ時會取到該消息的headers與Exchange綁定時指定的鍵值對進行匹配
  • 如果完全匹配則消息會路由到該隊列,否則不會路由到該隊列
  • 不處理路由鍵

RabbitMQ 代碼 Java版

maven項目中依賴rabbitmq的包

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version> </dependency>

1:簡單隊列

官網教程:RabbitMQ tutorial - "Hello World!" — RabbitMQ

  • 發送消息
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;// 發送消息 public class Send {// 隊列名稱private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 創建連接工廠對象ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75"); // rabbit server 所在IP地址factory.setPort(5672); // rabbit server amqp端口號factory.setUsername("guest"); // rabbit server 登錄賬號factory.setPassword("guest"); // rabbit server 登錄密碼factory.setVirtualHost("/dev"); // 指定連接到哪個虛擬主機try (// 創建連接Connection connection = factory.newConnection();// 創建信道Channel channel = connection.createChannel()) {/** queueDeclare:隊列不存在時自動創建隊列,如果存在使用存在的* 參數1:隊列名稱* 參數2:是否持久化* 參數3:是否獨占* 參數4:沒有消費者的時候是否自動刪除隊列* 參數5:其他*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息String message = "Hello World!";/** 發布消息* 參數1:交換機* 參數2:隊列* 參數3:其他額外的參數* 參數4:要發送的消息,byte[]類型*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.err.println(" [x] Sent '" + message + "'");}} }

執行代碼,可以在 rabbitmq 控制臺上看到隊列已經被創建了,并且有一條未被消費的消息

  • 消費消息
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;// 消費消息 public class Recv {// 隊列名稱private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.err.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] Received '" + message + "'");};/** basicConsume:監聽隊列* 參數1:監聽的隊列名稱* 參數2:autoAck:是否在收到消息后自動確認(消費端拿到消息后,自動告訴 rabbitmq server 我已經收到消息了)* 參數3:回調,處理消息*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});// 第2個監聽隊列的方法 // Consumer consumer = new DefaultConsumer(channel) { // @Override // public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // String message = new String(body, StandardCharsets.UTF_8); // System.out.println(" [x] Received '" + message + "'"); // } // }; // channel.basicConsume(QUEUE_NAME, true, consumer);} }

?執行代碼,可以在 rabbitmq 控制臺上看到隊列的消息已經被消費了,并且可以看到 連接信息


2:工作隊列?Work Queues?

官網教程:RabbitMQ tutorial - Work Queues — RabbitMQ

例如:生產者一秒可以生產 一萬個消息,消費者一秒可以消費 一千個消息,這種情況如果只有一個消費者,消息就會堆積在隊列中。這時就需要部署多個消費者節點。

多個消費者負載均衡策略是 輪詢。

  • 兩個消費者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;// 第一個消費節點 public class Recv1 {// 隊列名稱private final static String QUEUE_NAME = "work_mq";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.err.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者1: '" + message + "'");/** 處理完消息后,手動確認 Ack* 參數1:消息標簽* 參數2:是否批量 Ack*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 參數2:關閉自動 ack 確認channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});} } import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;// 第二個消費節點 public class Recv2 {// 隊列名稱private final static String QUEUE_NAME = "work_mq";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.err.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者2: '" + message + "'");/** 處理完消息后,手動確認 Ack* 參數1:消息標簽* 參數2:是否批量 Ack*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 參數2:關閉自動 ack 確認channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});} }
  • 一個生產者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;// 發送消息 public class Send {// 隊列名稱private final static String QUEUE_NAME = "work_mq";public static void main(String[] argv) throws Exception {// 創建連接工廠對象ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");try (// 創建連接Connection connection = factory.newConnection();// 創建信道Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 批量發送消息for (int i = 0; i < 10; i++) {// 消息String message = "Hello Work! ___ " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.err.println(" [x] 生產者: '" + message + "'");}}} }

先啟動2個消費者監聽隊列,再啟動生產者生產消息。可以看到消息被輪詢消費

設置 多節點消費者負載均衡策略為:公平策略 (能者多勞)

Channel channel = connection.createChannel(); // 消費者設置 qos為 1, 一個消費完后繼續消費 channel.basicQos(1);


3:Fanout 發布/訂閱 交換機模式

官網教程:https://www.rabbitmq.com/tutorials/tutorial-three-python.html

作用:生產者發布消息后,所有監聽廣播類型指定交換機的的消費者都可以消費此消息。

  • 2個或多個消費者
import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;// 第一個消費節點 public class Recv1 {// 交換機名稱private final static String EXCHANGE_NAME = "exchange_fanout";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 綁定 廣播類型 的交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);// 獲取隊列String queueName = channel.queueDeclare().getQueue();// 隊列和交換機進行綁定,fanout交換機不需要routingKeychannel.queueBind(queueName, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消費消息回調String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者1: '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 監聽消息隊列channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});} } import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;// 第二個消費節點 public class Recv2 {// 交換機名稱private final static String EXCHANGE_NAME = "exchange_fanout";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 綁定 廣播類型 的交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);// 獲取隊列String queueName = channel.queueDeclare().getQueue();// 隊列和交換機進行綁定,fanout交換機不需要routingKeychannel.queueBind(queueName, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消費消息回調String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者2: '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 監聽消息隊列channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});} }
  • 一個生產者
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;// 發送消息 public class Send {// 交換機名稱private final static String EXCHANGE_NAME = "exchange_fanout";public static void main(String[] argv) throws Exception {// 創建連接工廠對象ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 綁定 廣播類型 的交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);// 消息String message = "廣播消息。。。";// 發送消息channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));}} }

4:Routing 路由 交換機模式

官網教程:https://www.rabbitmq.com/tutorials/tutorial-four-java.html

  • 隊列和交換機(Direct)綁定,需要指定一個routingKey(也叫Bingding Key)
  • 消息生產者發送消息給交換機,需要指定routingKey
  • 交換機根據消息的routingKey,轉發給對應的隊列

示例:日志收集系統

  • 一個隊列收集 error 日志
  • 一個隊列收集 全部 日志
  • 2個消費者,一個消費 error 消息,一個消費 全部 消息
import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;// 第一個隊列,消費所有消息 public class Recv1 {// 交換機名稱private final static String EXCHANGE_NAME = "exchange_direct";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.71");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 綁定 直連類型 的交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 獲取隊列String queueName = channel.queueDeclare().getQueue();// 隊列和交換機進行綁定,direct交換機 需要指定 routingKeychannel.queueBind(queueName, EXCHANGE_NAME, "errorRoutingKey");channel.queueBind(queueName, EXCHANGE_NAME, "infoRoutingKey");channel.queueBind(queueName, EXCHANGE_NAME, "debugRoutingKey");DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消費消息回調String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者1: '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 監聽消息隊列channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});} } import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;// 第二個隊列,消費error消息 public class Recv2 {// 交換機名稱private final static String EXCHANGE_NAME = "exchange_direct";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.71");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 綁定 直連類型 的交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 獲取隊列String queueName = channel.queueDeclare().getQueue();// 隊列和交換機進行綁定,direct交換機 需要指定 routingKeychannel.queueBind(queueName, EXCHANGE_NAME, "errorRoutingKey");DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消費消息回調String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者1: '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 監聽消息隊列channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});} }
  • ?一個生產者
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;// 發送消息 public class Send {// 交換機名稱private final static String EXCHANGE_NAME = "exchange_direct";public static void main(String[] argv) throws Exception {// 創建連接工廠對象ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.71");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 綁定 廣播類型 的交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 消息String errorMsg = "error消息";String infoMsg = "info消息";String debugMsg = "debug消息";// 發送消息channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, errorMsg.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, infoMsg.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debugMsg.getBytes(StandardCharsets.UTF_8));}} }


5:Topic?主題 交換機模式

官網教程:RabbitMQ tutorial - Topics — RabbitMQ

  • Topic?可以實現發布訂閱模式Fanout 和 路由模式Direct 的功能,更加靈活,支持模式匹配,通配符等。
  • 交換機通過通配符進行轉發到對應的隊列,* 代表一個詞,#代表1個或多個詞,一般用#作為通配符居多,比如 #.order, 會匹配 info.order 、sys.error.order, 而 *.order ,只會匹配 info.order, 使用.進行分割多個詞。
  • 注意:
    • 交換機和隊列綁定時用的binding使用通配符的路由健
    • 生產者發送消息時需要使用具體的路由健

示例:日志收集系統

  • 一個隊列收集 error 日志
  • 一個隊列收集 全部 日志
  • 2個消費者,一個消費 error 消息,一個消費 全部 消息
import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;// 第一個隊列,消費所有消息 public class Recv1 {// 交換機名稱private final static String EXCHANGE_NAME = "exchange_topic";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.71");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 綁定 主題類型 的交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 獲取隊列String queueName = channel.queueDeclare().getQueue();// 隊列和交換機進行綁定,topic交換機 需要指定 routingKeychannel.queueBind(queueName, EXCHANGE_NAME, "*.log.*");DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消費消息回調String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者1: '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 監聽消息隊列channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});} } import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;// 第二個隊列,消費error消息 public class Recv2 {// 交換機名稱private final static String EXCHANGE_NAME = "exchange_topic";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.71");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 綁定 主題類型 的交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 獲取隊列String queueName = channel.queueDeclare().getQueue();// 隊列和交換機進行綁定,topic交換機 需要指定 routingKeychannel.queueBind(queueName, EXCHANGE_NAME, "*.log.error");DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消費消息回調String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者1: '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 監聽消息隊列channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});} }
  • ?一個生產者
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;// 發送消息 public class Send {// 交換機名稱private final static String EXCHANGE_NAME = "exchange_topic";public static void main(String[] argv) throws Exception {// 創建連接工廠對象ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.71");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 綁定 主題類型 的交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 消息String errorMsg = "訂單服務 error消息";String infoMsg = "訂單服務 info消息";String debugMsg = "用戶服務 debug消息";// 發送消息channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, errorMsg.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, infoMsg.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "user.log.debug", null, debugMsg.getBytes(StandardCharsets.UTF_8));}} }


SpringBoot 整合 RabbitMQ

pom 文件中添加依賴

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.5</version> </dependency>

application.yml 文件配置 rabbitmq

注意:1:guest 賬號只能連本機的mq服務,實際開發的時候請創建一個新的賬號。2:rabbitmq集成在maven聚合組件中,然后這個組件被其他服務依賴以此達到整合mq的方式的時候,application 文件的后綴名要是 .properties (rabbitmq 讀取不到 yml 后綴的配置)

spring:rabbitmq:host: 192.168.31.71port: 5672username: guestpassword: guestvirtual-host: /dev

配置 交換機和隊列綁定的 Bean

import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component;@Component public class RabbitmqConfig {// 自定義交換機名稱public static final String EXCHANGE_NAME = "order_exchange";// 自定義隊列名稱public static final String QUEUE_NAME = "order_queue";/*** 創建 topic 交換機*/@Bean(EXCHANGE_NAME) // 多個交換機時要指定交換機的Bean名稱public Exchange orderExchange() {return ExchangeBuilder// 指定 主題類型的交換機 名稱.topicExchange(EXCHANGE_NAME)// 是否持久化.durable(true).build();}/*** 創建持久化隊列*/@Bean(QUEUE_NAME) // 多個隊列時要指定隊里的Bean名稱public Queue orderQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}/*** 隊列和交換機綁定*/@Beanpublic Binding orderBinding(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {return BindingBuilder// 綁定的隊列.bind(queue)// 隊列綁定到 指定的交換機.to(exchange)// 綁定的 routingKey.with("order.#")// 沒有其他參數.noargs();} }

發送消息

import com.lxx.rabbitmq.config.RabbitmqConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest class RabbitmqTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {/** 發送消息* 參數1:要發送的交換機* 參數2:指定匹配的 routingKey* 參數3:要發送的消息*/rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "order.error", "訂單 error消息");} }

消費者監聽隊列

import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "order_queue") // 消費者監聽這個隊列 public class OrderMqListener {/*** 監聽到隊列有消息后,RabbitHandler 處理指定類型的消息** @param body 消息*/@RabbitHandlerpublic void messageHandler(String body, Message message) {System.err.println(message.getMessageProperties().getMessageId());System.err.println(message.getMessageProperties().getDeliveryTag());System.err.println(message.toString());System.err.println(" X 字符串消費者:" + body);}/*** 監聽到隊列有消息后,RabbitHandler 處理指定類型的消息** @param body 消息*/@RabbitHandlerpublic void messageHandler(Integer body, Message message) {System.err.println(" X 數字消費者:" + body);} }

RabbitMQ 消息可靠性投遞

什么是消息的可靠投遞?

保證消息百分百發送到消息隊列中

如果確保消息的可靠投遞

消息生產者 需要接受到mq服務端 接受到消息的確認應答
完善的消息補償機制,發送失敗的消息可以再感知并二次處理

RabbitMQ消息投遞路徑:生產者->交換機->隊列->消費者

通過兩個的點控制消息的可靠性投遞

  • 生產者到交換機
    • 通過confirmCallback
  • 交換機到隊列
    • 通過returnCallback
  • 生產者到交換機 開啟ACK確認可靠消息投遞

appliction.yml 配置

spring:rabbitmq:# 開啟消息 confirm 二次確認publisher-confirm-type: correlated

消息監聽代碼沒變化

發送消息,代碼如下:

import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest class RabbitmqDemoApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.err.println("ConfirmCallback ================");System.err.println(" ================ correlationData = " + correlationData);System.err.println(" ================ ack = " + ack);System.err.println(" ================ cause = " + cause);if (ack) {System.out.println("發送成功");// 更新數據庫消息狀態為 成功} else {System.err.println("發送失敗");// 更新數據庫消息狀態為 失敗}}});// 發送消息之前 ,數據庫新增一條消息記錄狀態,狀態是 發送 TODO// 發送消息rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "order.error", "訂單 ConfirmCallback 消息");// 模擬投遞失敗 // rabbitTemplate.convertAndSend("不存在的交換機", "order.error", "訂單 ConfirmCallback消息");} }
  • 交換機到隊列 可靠消息投遞

appliction.yml 配置

spring:rabbitmq:# 開啟 交換機到 隊列publisher-returns: truetemplate:# 為true,則交換機處理消息到路由失敗后,則會返回給生產者。 或者代碼 rabbitTemplate.setMandatory(true) 是一樣的效果mandatory: true

消息監聽代碼沒變化

發送消息,代碼如下:

package com.lxx.rabbitmq;import com.lxx.rabbitmq.config.RabbitmqConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest class RabbitmqDemoApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {int replyCode = returned.getReplyCode();System.err.println("ReturnsCallback ================");System.err.println(" ================ code = " + replyCode);System.err.println(" ================ returned = " + returned.toString());}});// 發送消息rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "order.error", "訂單 ReturnsCallback 消息");// 模擬投遞失敗 // rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "不存在的routingKey", "訂單 ReturnsCallback 消息");}}

RabbitMQ 消息確認消費ACK

消費者從broker中監聽到消息,要確保消息被正常處理。

RabbitMQ 消費者ACK介紹

  • 消費者從RabbitMQ收到消息并處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋后才將此消息從隊列中刪除
  • 消費者在處理消息出現了網絡不穩定、服務器異常等現象,那么就不會有ACK反饋,RabbitMQ會認為這個消息沒有正常消費,會將消息重新放入隊列中Ready
  • 只有當消費者正確發送ACK反饋,RabbitMQ確認收到后,消息才會從RabbitMQ服務器的數據中刪除。
  • 消息的ACK確認機制默認是開啟狀態自動ACK,消息如未被進行ACK的消息確認機制,這條消息被鎖定Unacked

  • 消息的可靠消費

appliction.yml 配置

spring:rabbitmq:listener:simple:#開啟手動確認消息,如果消息重新入隊,進行重試acknowledge-mode: manualretry:enabled: true #是否開啟消費者重試max-attempts: 5 #最大重試次數initial-interval: 5000ms #重試間隔時間(單位毫秒)max-interval: 1200000ms #重試最大時間間隔(單位毫秒)multiplier: 2 #間隔時間乘子,間隔時間*乘子=下一次的間隔時間,最大不能超過設置的最大間隔時間

發送消息代碼沒變化

消息監聽,代碼如下:

import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;@Component @RabbitListener(queues = "order_queue") public class OrderMqListener {@RabbitHandlerpublic void messageHandler(String body, Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.err.println(deliveryTag);System.err.println(message.toString());System.err.println(" X 字符串消費者:" + body);// 告訴 broker 消息被正常消費 確認ACKchannel.basicAck(deliveryTag, false);/** 告訴 broker,消息被消費后 拒絕確認ACK* 參數一:deliveryTag,消息被投遞的次數* 參數二:是否批量 拒絕ACK,false 一條一條的拒絕ack* 參數上:是否重新投遞到隊列中*///channel.basicNack(deliveryTag, false, true); // 一次可以拒絕接收0個或多個//channel.basicReject(deliveryTag, true); // 一次只能拒絕接收一個消息} }

SpringBoot 配置 RabbitMQ 廣播 發布/訂閱

一個消息生產者,多個消費者節點,共同消費同一條消息

配置 廣播 交換機和隊列綁定的 Bean

import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitmqConfig {// 自定義交換機名稱public static final String EXCHANGE_NAME = "order_exchange";// 自定義隊列名稱public static final String QUEUE_NAME = "order_queue";/*** 創建 廣播 交換機*/@Bean(EXCHANGE_NAME)public FanoutExchange orderExchange() {return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build();}/*** 創建持久化隊列*/@Bean(QUEUE_NAME)public Queue orderQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}/*** 隊列和交換機綁定*/@Beanpublic Binding orderBinding() {return BindingBuilder.bind(orderQueue()).to(orderExchange());} }

發送消息

import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest class RabbitmqTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {/** 發送消息* 參數1:要發送的交換機* 參數2:廣播不要指定路由key* 參數3:要發送的消息*/rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "", "廣播消息");} }

消費者監聽,消費者可以多節點/集群部署,多節點可以消費同一條消息

import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component;@Component public class OrderMqListener {/*** 監聽到隊列有消息后,RabbitHandler 處理指定類型的消息** @param body 消息*/@RabbitHandler@RabbitListener(bindings = @QueueBinding(value = @Queue(), // 注意:此處不能指定隊列名稱。 如果指定隊列只能被一個消費者節點消費exchange = @Exchange(value = RabbitmqConfig.EXCHANGE_NAME, type = ExchangeTypes.FANOUT)))public void messageHandler(String body) {System.err.println(" X 消息 :" + body);} }


RabbitMQ TTL死信隊列

什么是 TTL?

  • time to live 消息存活時間的意思。
  • 如果消息在存活時間內未被消費,則會被清除。
  • RabbitMQ支持兩種ttl設置
    • 整個隊列進行配置ttl(居多)
    • 單獨消息進行配置ttl

什么是死信隊列?

用來存放 在存活時間內未被消費消息 的隊列? ? ? ? // 過期消息不清楚,存放在此隊列?

什么是死信交換機?

Dead Letter Exchange(死信交換機,縮寫:DLX)當消息成為死信后,會被重新發送到另一個交換機,這個交換機就是DLX死信交換機。

注意:死信隊列和死信交換機 與 普通隊列普通交換機沒區別。

消息什么情況下會成為死信消息?

  • 消費者拒收消息(basic.reject/basic.nack),并且沒有重新入隊 requeue=false
  • 消息在隊列中未被消費,且超過隊列或者消息本身的過期時間TTL(time-to-live)
  • 隊列的消息長度達到極限

成為死信的結果:如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列

如何設置消息的TTL存活時間?

方式一:隊列過期,對整個隊列消息設置統一過期時間

x-message-ttl? ? ? 單位:ms毫秒

方式二:消息過期,對單個消息進行設置

expiration??? ? ? ? ?單位:ms毫秒

注意:兩者都配置的話,時間短的先觸發。


RabbitMQ 控制臺 操作 死信隊列綁定死信交換機

-- 代碼操作和普通操作沒有不同,這里學習控制面板的操作

創建死信交換機

創建死信隊列

死信隊列和死信交換機綁定

新建普通隊列,設置隊列的過期時間。指定普通隊列對應的死信交換機

向普通隊列 里發送消息,過期后,消息路由到 死信隊列


RabbitMQ 延遲隊列

什么是延遲隊列?

一種帶有延遲功能的消息隊列,Producer 將消息發送到消息隊列 服務端,但并不期望這條消息立馬投遞,而是推遲到在當前時間點之后的某一個時間投遞到 Consumer 進行消費,該消息即定時消息。

定時消息的使用場景

  • 通過消息觸發一些定時任務,比如在某一固定時間點向用戶發送提醒消息
  • 用戶登錄之后5分鐘給用戶做分類推送、用戶多少天未登錄給用戶做召回推送;
  • 消息生產和消費有時間窗口要求:比如在天貓電商交易中超時未支付關閉訂單的場景,在訂單創建時會發送一條 延時消息。這條消息將會在 30 分鐘以后投遞給消費者,消費者收到此消息后需要判斷對應的訂單是否已完成支付。 如支付未完成,則關閉訂單。如已完成支付則忽略

RabbitMQ 實現延遲消息

RabbitMQ本身是不支持延遲隊列的。需要結合死信隊列的特性,達到延遲消息的目的。

  • 消息生產者
    • 消息投遞到普通的交換機
    • 消息過期,進入死信隊列
  • 消費消費者
    • 消費者監聽死信交換機的隊列

SpringBoot 實現延遲隊列

配置 死信交換機和死信隊列,配置普通交換機和普通隊列,配置普通隊列綁定到死信交換機

import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component;@Component public class OrderTimeoutCloseConfig {// ==================================================死信隊列 start========================================================/*** 死信交換機,訂單超時關閉*/public static final String ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE = "order_timeout_close_dead_exchange";/*** 死信隊列,訂單超時關閉*/public static final String ORDER_TIMEOUT_CLOSE_DEAD_QUEUE = "order_timeout_close_dead_queue";/*** 進入死信隊列的路由key,訂單超時關閉*/public static final String ORDER_TIMEOUT_CLOSE_ROUTING_KEY = "order_timeout_close_routing_key";/*** 創建 死信 交換機*/@Bean(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE)public TopicExchange orderTimeoutCloseDeadExchange() {return ExchangeBuilder.topicExchange(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE).durable(true).build();}/*** 創建 死信 隊列*/@Bean(ORDER_TIMEOUT_CLOSE_DEAD_QUEUE)public Queue orderTimeoutCloseDeadQueue() {return QueueBuilder.durable(ORDER_TIMEOUT_CLOSE_DEAD_QUEUE).build();}/*** 死信隊列和死信交換機綁定*/@Beanpublic Binding deadOrderTimeoutBinding(@Qualifier(ORDER_TIMEOUT_CLOSE_DEAD_QUEUE) Queue queue, @Qualifier(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ORDER_TIMEOUT_CLOSE_ROUTING_KEY).noargs();}/** 死信隊列和死信交換機綁定,方式二*/ // @Bean // public Binding deadOrderTimeoutBinding() { // return new Binding( // ORDER_TIMEOUT_CLOSE_DEAD_QUEUE, // Binding.DestinationType.QUEUE, // ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE, // ORDER_TIMEOUT_CLOSE_ROUTING_KEY, // null // ); // }// ==================================================死信隊列 end========================================================// ==================================================普通隊列 start========================================================/*** 普通交換機,訂單超時,用于進入死信隊列*/public static final String ORDER_TIMEOUT_INTO_DEAD_EXCHANGE = "order_timeout_into_dead_exchange";/*** 普通隊列,訂單超時,用于進入死信隊列*/public static final String ORDER_TIMEOUT_INTO_DEAD_QUEUE = "order_timeout_into_dead_queue";/*** 進入普通隊列的路由key,訂單超時關閉*/public static final String ORDER_TIMEOUT_INTO_ROUTING_KEY = "order_timeout_into_routing_key";/*** 創建 普通 交換機*/@Bean(ORDER_TIMEOUT_INTO_DEAD_EXCHANGE)public TopicExchange orderTimeoutIntoDeadExchange() {return ExchangeBuilder.topicExchange(ORDER_TIMEOUT_INTO_DEAD_EXCHANGE).durable(true).build();}/*** 創建 普通 隊列,普通隊列和死信隊列進行綁定*/@Bean(ORDER_TIMEOUT_INTO_DEAD_QUEUE)public Queue orderTimeoutIntoDeadQueue() {/* // 方式一Map<String, Object> args = new HashMap<>(3);args.put("x-dead-letter-exchange", ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE); // 要綁定的死信交換機args.put("x-dead-letter-routing-key", ORDER_TIMEOUT_CLOSE_ROUTING_KEY); // 要綁定的死信 binding keyargs.put("x-message-ttl", 10000); // 普通隊列的消息過期時間,過期后 消息進入死信隊列,單位:ms毫秒return QueueBuilder.durable(ORDER_TIMEOUT_INTO_DEAD_QUEUE).withArguments(args).build();*/// 方式二return QueueBuilder.durable(ORDER_TIMEOUT_INTO_DEAD_QUEUE)// 要綁定的死信交換機.deadLetterExchange(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE)// 要綁定的死信 binding key.deadLetterRoutingKey(ORDER_TIMEOUT_CLOSE_ROUTING_KEY)// 普通隊列的消息過期時間,過期后 消息進入死信隊列,單位:ms毫秒.ttl(10000) // 這里測試指定10秒,正式情況可以指定30分鐘.build();}/** 普通隊列和普通交換機綁定*/@Beanpublic Binding orderTimeoutBinding() {return new Binding(ORDER_TIMEOUT_INTO_DEAD_QUEUE,Binding.DestinationType.QUEUE,ORDER_TIMEOUT_INTO_DEAD_EXCHANGE,ORDER_TIMEOUT_INTO_ROUTING_KEY,null);}// ==================================================普通隊列 end======================================================== }

消費者,監聽死信隊列

import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;@Component public class OrderTimeoutCloseMQListener {@RabbitHandler@RabbitListener(queues = OrderTimeoutCloseConfig.ORDER_TIMEOUT_CLOSE_DEAD_QUEUE) // 監聽死信隊列public void messageHandler(String body, Message message, Channel channel) throws IOException {// 1:監聽到 訂單消息,拿到訂單idSystem.err.println(" X 監聽死信隊列收到消息 body = " + body);// 2:用訂單id,查詢數據庫訂單信息,如果訂單狀態是 已支付,這里不做操作// 3:如果訂單狀態是 未支付,把訂單狀態設置成 超時未支付channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} }

生產者,向普通隊列發送消息

import net.minidev.json.JSONObject; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;import java.util.HashMap; import java.util.Map;@SpringBootTest class RabbitmqDemoApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 模擬下單成功*/@Testpublic void testBuy() {// 1:用戶下單把訂單信息存入數據庫,返回訂單id// 2:發送訂單id到 普通消息隊列Map<String, String> map = new HashMap<>();map.put("orderId", "123456789");rabbitTemplate.convertAndSend(OrderTimeoutCloseConfig.ORDER_TIMEOUT_INTO_DEAD_EXCHANGE, OrderTimeoutCloseConfig.ORDER_TIMEOUT_INTO_ROUTING_KEY, JSONObject.toJSONString(map));} }

RabbitMQ 的集群環境

RabbitMQ 普通集群模式的介紹

????????集群有 3 個節點,node1、node2、node3,三個節點有相同的元數據(交換機、隊列結構),一個消息只存在一個節點上,不在其他節點同時存在。

例如:

????????A消息,存在node1節點上。A消息在node2、node3節點上不存在。

消費者監聽node1節點可以直接消費到 A消息。假如消費者監聽的是node2節點,那么rabbitmq 會把A消息被消費的時候才從 node1 節點取出放入到node2節點,然后node2節點再把消息轉發給消費者。

問題:

? ? ? ? 1:假如node1節點故障,那么node2無法獲取node1節點上未被消費的消息。

? ? ? ? 2:如果node1持久化后發生故障,消息需要等到node1恢復正常后才可以正常消費。

? ? ? ? 3:如果node1未做持久化發生故障,那么node1節點上的消息將會丟失。

應用場景:

? ? ? ? 該模式適用于消息無需持久化的場景,例如日志傳輸隊列。

注意:集群需要保證每個節點有相同的token令牌。

消息持久化


RabbitMQ 搭建普通集群環境

1:準備節點環境

3個節點的訪問 web控制臺訪問端口分別是:15671、15672、15673

準備3個目錄,用于放 3個節點

/usr/local/rabbitmq/1 /usr/local/rabbitmq/2 /usr/local/rabbitmq/3

創建 節點1

sudo docker run -d \ --name rabbitmq_1 \ -h rabbitmq_host1 \ -p 4361:4369 \ -p 5671:5672 \ -p 15671:15672 \ -p 25671:25672 \ -e RABBITMQ_NODENAME='rabbit' \ -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_lxx' \ --privileged=true \ -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq/ \ rabbitmq:management

創建 節點2

sudo docker run -d \ --name rabbitmq_2 \ -h rabbitmq_host2 \ -p 4362:4369 \ -p 5672:5672 \ -p 15672:15672 \ -p 25672:25672 \ -e RABBITMQ_NODENAME='rabbit' \ -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_lxx' \ --link rabbitmq_1:rabbitmq_host1 \ --privileged=true \ -v /usr/local/rabbitmq/2/lib:/var/lib/rabbitmq/ \ rabbitmq:management

創建 節點3

sudo docker run -d \ --name rabbitmq_3 \ -h rabbitmq_host3 \ -p 4363:4369 \ -p 5673:5672 \ -p 15673:15672 \ -p 25673:25672 \ -e RABBITMQ_NODENAME='rabbit' \ -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_lxx' \ --link rabbitmq_1:rabbitmq_host1 \ --link rabbitmq_2:rabbitmq_host2 \ --privileged=true \ -v /usr/local/rabbitmq/3/lib:/var/lib/rabbitmq/ \ rabbitmq:management

參數說明:

  • -e RABBITMQ_ERLANG_COOKIE:指定集群節點的cookie,節點的cookie要配置相同。?
  • --link:容器互聯,讓容器之前可以相互ping通
  • --privileged:讓容器內部的用戶有root權限,不然用戶對容器內部的文件沒有操作權限permission denied
  • -v :讓物理機路徑與容器里的路徑映射,容器里的路徑的數據會存儲在物理機上

節點完成后,可以訪問 http://ip:端口,查看2個節點是否創建成功。

如果容器啟動失敗,可以使用 docker logs 容器id 命令查看啟動日志。


2:節點配置成集群

配置之前查看節點狀態,進入容器內,使用命令:rabbitmqctl cluster_status

配置節點1

docker exec -it 節點1的容器名稱 /bin/bash? // 進入啟動的docker容器內 rabbitmqctl stop_app? ? ? ? ? ? ? ? ? ? ? // 停止 rabbitmq 服務,rabbitmqctl是rabbitmq的操作命令? rabbitmqctl reset // 重置 rabbitmq rabbitmqctl start_app // 啟動 rabbitmq 服務 exit // 退出 docker 容器

配置節點2,加入集群

docker exec -it 節點2的容器名稱 /bin/bash? ? ? ? // 進入啟動的docker容器內 rabbitmqctl stop_app? ? ? ? ? ? ? ? ? ? ? // 停止 rabbitmq 服務,rabbitmqctl是rabbitmq的操作命令? rabbitmqctl reset // 重置 rabbitmq rabbitmqctl join_cluster --ram 節點1的hostname // 加入集群 --ram 參數是以內存的方式加入,不帶此參數默認是磁盤的方式,節點1的hostname是:rabbit@rabbitmq_host1 rabbitmqctl start_app // 啟動 rabbitmq 服務 exit // 退出 docker 容器

配置節點3,加入集群

docker exec -it 節點3的容器名稱 /bin/bash? ? ? ? // 進入啟動的docker容器內 rabbitmqctl stop_app? ? ? ? ? ? ? ? ? ? ? // 停止 rabbitmq 服務,rabbitmqctl是rabbitmq的操作命令? rabbitmqctl reset // 重置 rabbitmq rabbitmqctl join_cluster --ram 節點1的hostname // 加入集群 --ram 參數是以內存的方式加入,不帶此參數默認是磁盤的方式,節點1的hostname是:rabbit@rabbitmq_host1 rabbitmqctl start_app // 啟動 rabbitmq 服務 exit // 退出 docker 容器

配置完成之后,可以在容器內使用命令:rabbitmqctl cluster_status,查看集群狀態,可以看到集群現在有3個節點在運行。1個磁盤節點,2個內存節點

訪問網頁,可以看到有3個節點

消息隊列和交換機在所有節點上存在。消息只在自己的節點上存在,當一個節點宕機后,宕機節點上的消息無法被消費(消息不可用)


3:SpringBoot 整合 RabbitMQ 普通集群

application.yml 文件配置 rabbitmq 集群地址,其他配置不變

spring:rabbitmq:listener:simple:acknowledge-mode: manualpublisher-returns: truetemplate:mandatory: truepublisher-confirm-type: correlated # host: 192.168.189.75 # port: 5672virtual-host: /devpassword: guestusername: guest### 配置節點地址addresses: 192.168.189.75:5671,192.168.189.75:5672,192.168.189.75:5673

代碼操作,和上面的單節點的一樣正常的生產監聽消息就行了,這里就不重復貼代碼了


RabbitMQ 鏡像集群模式的介紹(推薦)

????????隊列做成鏡像隊列,鏡像隊列中的消息在各個節點之間同步(A消息在各個節點中都存在)。

好處:
?? ?實現了高可用,部分節點宕機后,不影響消息的正常消費。
?? ?鏡像集群模式可以保證消息100%不丟失。適用于高可用要求高的需求,例如訂單服務。

缺點:
?? ?消息數量過多,大量的消息同步會加大網絡寬帶的消耗。節點越多服務器性能受影響越大
?? ?
注意:集群需要保證每個節點有相同的token令牌。

策略policy介紹

policy是用來控制和修改集群的vhost隊列和Exchange復制行為。哪些Exchange或者queue的數據需要復制、同步,以及如何復制同步。

  • 創建一個policy策略

路徑:進入rabbitmq控制臺 -> Admin -> Policies -> Add / update a policy

參數介紹:

Name:自定義策略名稱,建議不要使用空格

Pattern:用于匹配隊列/交換機的正則表達式,^ 符號,表示匹配所有

Apply to:應用到交換機和隊列

Priority:優先級。一個隊列/交換機只會有一個生效的 Policy,如果匹配多個 Policy,則優先級數值最大的 Policy 生效。

Definition:JSON格式的一組鍵值對,表示設置的屬性,會被注入匹配隊列/交換機

  • ha-mode:
    • all:表示在集群中所有的節點上進行鏡像同步(一般都用這個參數)
    • exactly:表示在指定個數的節點上進行鏡像同步,節點的個數由ha-params指定
    • nodes:表示在指定的節點上進行鏡像同步,節點名稱通過ha-params指定
  • ha-sync-mode:鏡像消息同步方式
    • automatic: 自動(默認)
    • manually:手動

policy策略創建完成后,鏡像隊列就配置成功了。可以看到隊列發生了如下變化

鏡像集群注意點:

  • 集群啟動順序:先啟動磁盤節點 => 再啟動內存節點
  • 集群關閉順序:先關閉內存節點 => 再關閉磁盤節點
  • 最后關閉必須是磁盤節點,否則容易造成集群啟動失敗、數據丟失等異常情況

可以看到節點宕掉一個后,消息還是存在的

總結

以上是生活随笔為你收集整理的RabbitMQ教程大全看这一篇就够了-java版本的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。