日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

发布订阅之topics

發(fā)布時間:2024/4/13 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 发布订阅之topics 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

訂閱模型-Topic

Topic類型的Exchange與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符!

Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert

通配符規(guī)則:

`#`:匹配一個或多個詞`*`:匹配不多不少恰好1個詞

舉例:

`audit.#`:能夠匹配`audit.irs.corporate` 或者 `audit.irs``audit.*`:只能匹配`audit.irs`

在這個例子中,我們將發(fā)送所有描述動物的消息。消息將使用由三個字(兩個點)組成的routing key發(fā)送。路由關(guān)鍵字中的第一個單詞將描述速度,第二個顏色和第三個種類:“<speed>.<color>.<species>”。

我們創(chuàng)建了三個綁定:Q1綁定了綁定鍵“* .orange.”,Q2綁定了“.*.rabbit”和“l(fā)azy.#”。

Q1匹配所有的橙色動物。

Q2匹配關(guān)于兔子以及懶惰動物的消息。

?

練習(xí),生產(chǎn)者發(fā)送如下消息,會進入那個隊列:

quick.orange.rabbit Q1 Q2

lazy.orange.elephant

quick.orange.fox

lazy.pink.rabbit

quick.brown.fox

quick.orange.male.rabbit

orange

?

生產(chǎn)者

使用topic類型的Exchange,發(fā)送消息的routing key有3種: item.isnert、item.update、item.delete:

public class Send {private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明exchange,指定類型為topicchannel.exchangeDeclare(EXCHANGE_NAME, "topic");// 消息內(nèi)容String message = "新增商品 : id = 1001";// 發(fā)送消息,并且指定routing key 為:insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());System.out.println(" [商品服務(wù):] Sent '" + message + "'");channel.close();connection.close();} }

消費者1

我們此處假設(shè)消費者1只接收兩種類型的消息:更新商品和刪除商品

public class Recv {private final static String QUEUE_NAME = "topic_exchange_queue_1";private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機,同時指定需要訂閱的routing key。需要 update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監(jiān)聽,如果有消息的時候,會被自動調(diào)用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費者1] received : " + msg + "!");}};// 監(jiān)聽隊列,自動ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} }

消費者2

我們此處假設(shè)消費者2接收所有類型的消息:新增商品,更新商品和刪除商品。

/*** 消費者2*/ public class Recv2 {private final static String QUEUE_NAME = "topic_exchange_queue_2";private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監(jiān)聽,如果有消息的時候,會被自動調(diào)用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費者2] received : " + msg + "!");}};// 監(jiān)聽隊列,自動ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} }

?

總結(jié)

以上是生活随笔為你收集整理的发布订阅之topics的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。