4 交换机-fanout(订阅发布模式)
目錄
- 訂閱發布模式
- 1、交換器(Exchange)
- 1.1、創建交換器
- 1.2 、推送消息到交換器
- 2、臨時隊列
- 3、綁定(bingdings)
- 5、代碼例子
- 5.1、生產者代碼示例
- 5.2、消費者代碼示例
- 1、交換器(Exchange)
訂閱發布模式
1、交換器(Exchange)
在Work Queue背后,其實是rabbitMQ把每條任務消息只發給一個消費者。本篇中我們將要研究如何把一條消息推送給多個消費者,這種模式被稱為publish/subscribe(發布/訂閱)
RabbitMQ的消息發送模型核心思想是生產者不直接把消息發送到消息隊列中。事實上,生產者不知道自己的消息將會被緩存到哪個隊列中。
其實生產者者可以把消息發送到exchange(消息交換機)上。exchange是一個很簡單的事情,它一邊接收生產者的消息,另一邊再把消息推送到消息隊列中。Exchange必須知道在它接收到一條消息應該怎么去處理。應該把這條消息推送到指定的消息隊列中?還是把消息推送到所有的隊列中?或是把消息丟掉?這些規則都可以用exchange類型來定義
1.1、創建交換器
有一些可用的exchange類型:direct, topic, headers和fanout。這里我們主要看最后一個:fanout,這里我們創建一個名字為logs、類型為fanout的exchange:
channel.exchangeDeclare("logs", "fanout");fanout類型的exchange是很簡單的。就是它把它能接收到的所有消息廣播到它知道的所有隊列中。
- 沒有名字的exchange
如上面的代碼我們沒有指定exchagne的名字,采用的是“”,空字符串的符號指的是默認的或沒有命名的exchange:消息會根據routingKey被路由到指定的消息隊列中
// 申明交換器,第一個參數:交換器的名字;第二個參數:交換器的類型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout");1.2 、推送消息到交換器
現在我們來把消息推送到已命名的exchange上,原來的做法是推送到默認的交換器上面的;
- 原來的做法
- 推送到交換器
2、臨時隊列
之前的例子中,應該會發現我們都是使用了一個指定名字的消息隊列。對應的生產者和消費者之間都要使用相同的消息隊列名稱。
但是在我們的log系統中卻不是這樣,我們希望能夠接收到所有的log消息,不只是其中的一部分。我們只要處理當前的log消息,不用管過去的歷史log。為了實現,我們需要做以下兩步:
- 無論什么時候我們和RabbitMQ建立連接時,我們都要刷新、清空Queue。為了達到這一的目的,我們可以用一個隨機的名字(隨機性可由自己來定義)來創建Queue,也可以讓服務器來自動建立一個隨見的Queue。
- 當消費者斷開連接時,Queue能自動被刪除。
使用java客戶端時,我們使用無參數的queueDeclare方法,就可以創建一個已經生成名字的、排他性、會自動刪除的Queue
String queueName = channel.queueDeclare().getQueue();這里面我們就可以拿到一個隨機名字的queue,如:amq.gen-JzTY20BRgKO-HjmUJj0wLg
3、綁定(bingdings)
現在已經創建好了一個fanout類型的exchange和一個隊列。那么接下來我們就需要讓exchange向我們的queue里發送消息,Exchange和queue之間的關系就是綁定(bindings)
channel.queueBind(queueName,exchangeName,"");5、代碼例子
現在的代碼和之前的區別不是很大;
主要的區別就是:
- 我們把消息推送到一個命名的exchange上,而不是之前未命名的默認exchange
- 在我們發送消息時需要提供一個routingKey,但對于fanout類型的exchange可以忽略
5.1、生產者代碼示例
/*** @author zhaodi* @description* @date 2018/9/28 16:50*/ public class Producer {private static final String EXCHANGE_NAME = "my-exchange-1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MqConnectionUtil.getConnection();Channel channel = connection.createChannel();// 申明交換器,// 第一個參數:交換器的名字;// 第二個參數:交換器的類型channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 第一個參數:交換器名稱;// 第二個參數:隊列名稱;// 第三個參數:消息屬性;// 第四個參數:消息體channel.basicPublish(EXCHANGE_NAME,"",null,"i am message".getBytes());channel.close();connection.close(); }正如你所見,在建立連接后我們聲明了exchange。這一步是必須的,因為禁止向一個不存在的exchange推送消息。
如果沒有對exchange負責的queue,那么消息將會被丟失,這是沒有問題的;如果沒有消費者監聽的話,我們會安全的丟掉這些消息。
5.2、消費者代碼示例
/*** @author zhaodi* @desc 發布訂閱模式*/ public class Consumer {private static final String EXCHANGE_NAME="my-exchange-1";public static void main(String[] args) throws IOException {Connection connection = MqConnectionUtil.getConnection();Channel channel = connection.createChannel();// 申明消息路由的名稱和類型channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 申明一個隨機的消息隊列名稱String queueName = channel.queueDeclare().getQueue();// 綁定消息路由和消息隊列channel.queueBind(queueName,EXCHANGE_NAME,"");// 創建消費者com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("c1--->:"+new String(body));// 手動應答// 第一個參數:消息標志channel.basicAck(envelope.getDeliveryTag(),false);}};// 監聽,關閉自動應答boolean autoAck = false;channel.basicConsume(queueName,autoAck,consumer);} }轉載于:https://www.cnblogs.com/zhaod/p/11391258.html
總結
以上是生活随笔為你收集整理的4 交换机-fanout(订阅发布模式)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 从零写一个编译器(十三):代码生成之遍历
- 下一篇: 5 交换机-direct (路由)