日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

发布订阅之fanout

發(fā)布時(shí)間:2024/4/13 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 发布订阅之fanout 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

訂閱模型分類

在之前的模式中,我們創(chuàng)建了一個(gè)工作隊(duì)列。 工作隊(duì)列背后的假設(shè)是:每個(gè)任務(wù)只被傳遞給一個(gè)工作人員。 在這一部分,我們將做一些完全不同的事情 - 我們將會(huì)傳遞一個(gè)信息給多個(gè)消費(fèi)者。 這種模式被稱為“發(fā)布/訂閱”。

訂閱模型示意圖:

解讀:

1、1個(gè)生產(chǎn)者,多個(gè)消費(fèi)者

2、每一個(gè)消費(fèi)者都有自己的一個(gè)隊(duì)列

3、生產(chǎn)者沒有將消息直接發(fā)送到隊(duì)列,而是發(fā)送到了交換機(jī)

4、每個(gè)隊(duì)列都要綁定到交換機(jī)

5、生產(chǎn)者發(fā)送的消息,經(jīng)過交換機(jī)到達(dá)隊(duì)列,實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者獲取的目的

X(Exchanges):交換機(jī)一方面:接收生產(chǎn)者發(fā)送的消息。另一方面:知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。

Exchange類型有以下幾種:

Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列Direct:定向,把消息交給符合指定routing key 的隊(duì)列 Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列

我們這里先學(xué)習(xí)

Fanout:即廣播模式

Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒有任何隊(duì)列與Exchange綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!

訂閱模型-Fanout

Fanout,也稱為廣播。

流程圖:

在廣播模式下,消息發(fā)送流程是這樣的:

  • 1) 可以有多個(gè)消費(fèi)者

  • 2) 每個(gè)消費(fèi)者有自己的queue(隊(duì)列)

  • 3) 每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))

  • 4) 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來決定要發(fā)給哪個(gè)隊(duì)列,生產(chǎn)者無法決定。

  • 5) 交換機(jī)把消息發(fā)送給綁定過的所有隊(duì)列

  • 6) 隊(duì)列的消費(fèi)者都能拿到消息。實(shí)現(xiàn)一條消息被多個(gè)消費(fèi)者消費(fèi)

生產(chǎn)者

兩個(gè)變化:

  • 1) 聲明Exchange,不再聲明Queue

  • 2) 發(fā)送消息到Exchange,不再發(fā)送到Queue

public class Send {private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明exchange,指定類型為fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息內(nèi)容String message = "Hello everyone";// 發(fā)布消息到Exchangechannel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [生產(chǎn)者] Sent '" + message + "'");channel.close();connection.close();} }

消費(fèi)者1

public class Recv {private final static String QUEUE_NAME = "fanout_exchange_queue_1";private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊(duì)列到交換機(jī)channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定義隊(duì)列的消費(fèi)者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費(fèi)者1] received : " + msg + "!");}};// 監(jiān)聽隊(duì)列,自動(dòng)返回完成channel.basicConsume(QUEUE_NAME, true, consumer);} }

要注意代碼中:隊(duì)列需要和交換機(jī)綁定

消費(fèi)者2

public class Recv2 {private final static String QUEUE_NAME = "fanout_exchange_queue_2";private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊(duì)列到交換機(jī)channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定義隊(duì)列的消費(fèi)者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費(fèi)者2] received : " + msg + "!");}};// 監(jiān)聽隊(duì)列,手動(dòng)返回完成channel.basicConsume(QUEUE_NAME, true, consumer);} }

測試

我們運(yùn)行兩個(gè)消費(fèi)者,然后發(fā)送1條消息:

?

總結(jié)

以上是生活随笔為你收集整理的发布订阅之fanout的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。