日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ交换器Exchange介绍与实践

發布時間:2025/3/11 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ交换器Exchange介绍与实践 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

導讀

有了Rabbit的基礎知識之后(基礎知識詳見:深入解讀RabbitMQ工作原理及簡單使用),本章我們重點學習一下Rabbit里面的exchange(交換器)的知識。

交換器分類

RabbitMQ的Exchange(交換器)分為四類:

  • direct(默認)
  • headers
  • fanout
  • topic

其中headers交換器允許你匹配AMQP消息的header而非路由鍵,除此之外headers交換器和direct交換器完全一致,但性能卻很差,幾乎用不到,所以我們本文也不做講解。

**注意:**fanout、topic交換器是沒有歷史數據的,也就是說對于中途創建的隊列,獲取不到之前的消息。

1、direct交換器

direct為默認的交換器類型,也非常的簡單,如果路由鍵匹配的話,消息就投遞到相應的隊列,如圖:

使用代碼:channel.basicPublish(“”, QueueName, null, message)推送direct交換器消息到對于的隊列,空字符為默認的direct交換器,用隊列名稱當做路由鍵。

direct交換器代碼示例

發送端:

Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); // 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨占模式;參數四:消費者斷開連接時是否刪除隊列;參數五:消息其他參數】 channel.queueDeclare(config.QueueName, false, false, false, null); String message = String.format("當前時間:%s", new Date().getTime()); // 推送內容【參數說明:參數一:交換機名稱;參數二:隊列名稱,參數三:消息的其他屬性-路由的headers信息;參數四:消息主體】 channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));

接收端,持續接收消息:

Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); // 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨占模式;參數四:消費者斷開連接時是否刪除隊列;參數五:消息其他參數】 channel.queueDeclare(config.QueueName, false, false, false, null); Consumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "utf-8"); // 消息正文System.out.println(workName + "收到消息 => " + message);channel.basicAck(envelope.getDeliveryTag(), false); // 手動確認消息【參數說明:參數一:該消息的index;參數二:是否批量應答,true批量確認小于當前id的消息】} }; channel.basicConsume(config.QueueName, false, "", defaultConsumer);

接收端,獲取單條消息

Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), "UTF-8"); channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // 消息確認

持續消息獲取使用:basic.consume;單個消息獲取使用:basic.get。

注意:不能使用for循環單個消息消費來替代持續消息消費,因為這樣性能很低;

消息的發后既忘特性

發后既往只的是接受者不知道消息的來源是誰發送的,如果想要指定消息的發送者,需要包含在發送內容里面,這點就像我們在信件里面注明自己的姓名一樣,只有這樣才能知道發送者是誰。

消息確認

看了上面的代碼我們可以知道,消息接收到之后必須使用channel.basicAck()方法手動確認(非自動確認刪除模式下),那么問題來了。

消息收到未確認會怎么樣?

如果應用程序接收了消息,因為bug忘記確認接收的話,消息在隊列的狀態會從“Ready”變為“Unacked”,如圖:

如果消息收到卻未確認,Rabbit將不會再給這個應用程序發送更多的消息了,這是因為Rabbit認為你沒有準備好接收下一條消息。

此條消息會一直保持Unacked的狀態,直到你確認了消息,或者斷開與Rabbit的連接,Rabbit會自動把消息改完Ready狀態,分發給其他訂閱者。

當然你可以利用這一點,讓你的程序延遲確認該消息,直到你的程序處理完相應的業務邏輯,這樣可以有效的防治Rabbit給你過多的消息,導致程序崩潰。

消息確認Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), "UTF-8"); channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);

channel.basicAck(long deliveryTag, boolean multiple)為消息確認,參數1:消息的id;參數2:是否批量應答,true批量確認小于次id的消息。

總結:消費者消費的每條消息都必須確認。

消息拒絕

消息在確認之前,可以有兩個選擇:

選擇1:斷開與Rabbit的連接,這樣Rabbit會重新把消息分派給另一個消費者;

選擇2:拒絕Rabbit發送的消息使用channel.basicReject(long deliveryTag, boolean requeue),參數1:消息的id;參數2:處理消息的方式,如果是true,Rabbib會重新分配這個消息給其他訂閱者,如果設置成false的話,Rabbit會把消息發送到一個特殊的“死信”隊列,用來存放被拒絕而不重新放入隊列的消息。

消息拒絕Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), "UTF-8"); channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); //消息拒絕

2、fanout交換器——發布/訂閱模式

fanout有別于direct交換器,fanout是一種發布/訂閱模式的交換器,當你發送一條消息的時候,交換器會把消息廣播到所有附加到這個交換器的隊列上。

比如用戶上傳了自己的頭像,這個時候圖片需要清除緩存,同時用戶應該得到積分獎勵,你可以把這兩個隊列綁定到圖片上傳的交換器上,這樣當有第三個、第四個上傳完圖片需要處理的需求的時候,原來的代碼可以不變,只需要添加一個訂閱消息即可,這樣發送方和消費者的代碼完全解耦,并可以輕而易舉的添加新功能了。

和direct交換器不同,我們在發送消息的時候新增channel.exchangeDeclare(ExchangeName, “fanout”),這行代碼聲明fanout交換器。

發送端:

final String ExchangeName = "fanoutec"; // 交換器名稱 Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(ExchangeName, "fanout"); // 聲明fanout交換器 String message = "時間:" + new Date().getTime(); channel.basicPublish(ExchangeName, "", null, message.getBytes("UTF-8"));

接受消息不同于direct,我們需要聲明fanout路由器,并使用默認的隊列綁定到fanout交換器上。

接收端:

Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(ExchangeName, "fanout"); // 聲明fanout交換器 String queueName = channel.queueDeclare().getQueue(); // 聲明隊列 channel.queueBind(queueName, ExchangeName, ""); Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");} }; channel.basicConsume(queueName, true, consumer);

fanout和direct的區別最多的在接收端,fanout需要綁定隊列到對應的交換器用于訂閱消息。

其中channel.queueDeclare().getQueue()為隨機隊列,Rabbit會隨機生成隊列名稱,一旦消費者斷開連接,該隊列會自動刪除。

注意:對于fanout交換器來說routingKey(路由鍵)是無效的,這個參數是被忽略的。

3、topic交換器——匹配訂閱模式

最后介紹的是topic交換器,topic交換器運行和fanout類似,但是可以更靈活的匹配自己想要訂閱的信息,這個時候routingKey路由鍵就排上用場了,使用路由鍵進行消息(規則)匹配。

假設我們現在有一個日志系統,會把所有日志級別的日志發送到交換器,warning、log、error、fatal,但我們只想處理error以上的日志,要怎么處理?這就需要使用topic路由器了。

topic路由器的關鍵在于定義路由鍵,定義routingKey名稱不能超過255字節,使用“.”作為分隔符,例如:com.mq.rabbit.error。

消費消息的時候routingKey可以使用下面字符匹配消息:

  • “*”可以匹配所有內容;
  • “#”匹配0和多個字符;

例如發布了一個“com.mq.rabbit.error”的消息:

能匹配上的路由鍵:

  • cn.mq.rabbit.*

  • cn.mq.rabbit.#

  • .error

  • cn.mq.#

  • #

不能匹配上的路由鍵:

  • cn.mq.*
  • *.error
  • *

所以如果想要訂閱所有消息,可以使用“#”匹配。

**注意:**fanout、topic交換器是沒有歷史數據的,也就是說對于中途創建的隊列,獲取不到之前的消息。

發布端:

String routingKey = "com.mq.rabbit.error"; Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(ExchangeName, "topic"); // 聲明topic交換器 String message = "時間:" + new Date().getTime(); channel.basicPublish(ExchangeName, routingKey, null, message.getBytes("UTF-8"));

接收端:

Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(ExchangeName, "topic"); // 聲明topic交換器 String queueName = channel.queueDeclare().getQueue(); // 聲明隊列 String routingKey = "#.error"; channel.queueBind(queueName, ExchangeName, routingKey); Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(routingKey + "|接收消息 => " + message);} }; channel.basicConsume(queueName, true, consumer);

擴展部分—自定義線程池

如果需要更大的控制連接,用戶可自己設置線程池,代碼如下:

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es);

其實看過源碼的同學可能知道,factory.newConnection本身默認也有線程池的機制,ConnectionFactory.class部分源碼如下:

private ExecutorService sharedExecutor; public Connection newConnection() throws IOException, TimeoutException {return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort()))); } public void setSharedExecutor(ExecutorService executor) {this.sharedExecutor = executor; }

其中this.sharedExecutor就是默認的線程池,可以通過setSharedExecutor()方法設置ConnectionFactory的線程池,如果不設置則為null。

用戶如果自己設置了線程池,像本小節第一段代碼寫的那樣,那么當連接關閉的時候,不會自動關閉用戶自定義的線程池,所以用戶必須自己手動關閉,通過調用shutdown()方法,否則可能會阻止JVM的終止。

官方的建議是只有在程序出現嚴重性能瓶頸的時候,才應該考慮使用此功能。

項目地址

GitHub:https://github.com/vipstone/rabbitmq-java.git

總結

以上是生活随笔為你收集整理的RabbitMQ交换器Exchange介绍与实践的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。