日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

rabbitmq+topic+java_译:5.RabbitMQ Java Client 之 Topics (主题)

發布時間:2025/3/12 java 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq+topic+java_译:5.RabbitMQ Java Client 之 Topics (主题) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

我們使用的是direct(直接交換),而不是使用只能進行虛擬廣播的 fanout(扇出交換),并且有可能選擇性地接收日志。

雖然使用direct(直接交換)改進了我們的系統,但它仍然有局限性 - 它不能基于多個標準進行路由。

在我們的日志系統中,我們可能不僅要根據嚴重性訂閱日志,還要根據發出日志的源來訂閱日志。您可能從syslogunix工具中了解這個概念,該工具根據嚴重性(info / warn / crit ...)和facility(auth / cron / kern ...)來路由日志。

這會給我們帶來很大的靈活性 - 我們可能想聽聽來自'cron'的關鍵錯誤以及來自'kern'的所有日志。

要在我們的日志系統中實現這一點,我們需要了解更復雜的topic (主題交換)。

Topic exchange 主題交換

發送到主題交換的消息不能具有任意routing_key- 它必須是由點分隔的單詞列表。單詞可以是任何內容,但通常它們指定與消息相關的一些功能。一些有效的路由密鑰示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由密鑰中可以包含任意數量的單詞,最多可達255個字節。

綁定密鑰也必須采用相同的形式。主題交換背后的邏輯類似于直接交換- 使用特定路由密鑰發送的消息將被傳遞到與匹配綁定密鑰綁定的所有隊列。但是,綁定鍵有兩個重要的特殊情況:

*(星號)可以替代一個單詞。

#(hash)可以替換零個或多個單詞。

在一個例子中解釋這個是最容易的:

在這個例子中,我們將發送所有描述動物的消息。消息將與包含三個單詞(兩個點)的路由鍵一起發送。

路由鍵中的第一個單詞將描述速度,第二個是顏色,第三個是物種:“。。”。

我們創建了三個綁定:Q1綁定了綁定鍵“* .orange。*”,Q2綁定了“*。*。rabbit”和“lazy。#”。

這些綁定可以概括為:

Q1對所有橙色動物感興趣。

Q2希望聽到關于兔子的一切,以及關于懶惰動物的一切。

路由密鑰設置為“quick.orange.rabbit”的消息將傳遞到兩個隊列。消息“lazy.orange.elephant”也將同時發送給他們。另一方面,“quick.orange.fox”只會進入第一個隊列,而“lazy.brown.fox”只會進入第二個隊列。“lazy.pink.rabbit”將僅傳遞到第二個隊列一次,即使它匹配兩個綁定。“quick.brown.fox”與任何綁定都不匹配,因此它將被丟棄。

如果我們違反合同并發送帶有一個或四個單詞的消息,例如“orange”或“quick.orange.male.rabbit”,會發生什么?好吧,這些消息將不匹配任何綁定,將丟失。

另一方面,“lazy.orange.male.rabbit”,即使它有四個單詞,也會匹配最后一個綁定,并將被傳遞到第二個隊列。

主題交流

主題交換功能強大,可以像其他交易所一樣。

當隊列與“#”(哈希)綁定密鑰綁定時 - 它將接收所有消息,而不管路由密鑰 - 如扇出交換。

當特殊字符“*”(星號)和“#”(哈希)未在綁定中使用時,主題交換的行為就像直接交換一樣

把它們放在一起

我們將在日志記錄系統中使用主題交換。我們將首先假設日志的路由鍵有兩個詞:“。”。

EmitLogTopic.java

importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;public classEmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";public static voidmain(String[] argv) {

Connection connection= null;

Channel channel= null;try{

ConnectionFactory factory= newConnectionFactory();

factory.setHost("localhost");

connection=factory.newConnection();

channel=connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

String routingKey=getRouting(argv);

String message=getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, routingKey,null, message.getBytes("UTF-8"));

System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

}catch(Exception e) {

e.printStackTrace();

}finally{if (connection != null) {try{

connection.close();

}catch(Exception ignore) {

}

}

}

}private staticString getRouting(String[] strings) {if (strings.length < 1)return "anonymous.info";return strings[0];

}private staticString getMessage(String[] strings) {if (strings.length < 2)return "Hello World!";return joinStrings(strings, " ", 1);

}private static String joinStrings(String[] strings, String delimiter, intstartIndex) {int length =strings.length;if (length == 0)return "";if (length

StringBuilder words= newStringBuilder(strings[startIndex]);for (int i = startIndex + 1; i < length; i++) {

words.append(delimiter).append(strings[i]);

}returnwords.toString();

}

}

ReceiveLogsTopic.java

importjava.io.IOException;importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.Consumer;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;public classReceiveLogsTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throwsException {

ConnectionFactory factory= newConnectionFactory();

factory.setHost("localhost");

Connection connection=factory.newConnection();

Channel channel=connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

String queueName=channel.queueDeclare().getQueue();if (argv.length < 1) {

System.err.println("Usage: ReceiveLogsTopic [binding_key]...");

System.exit(1);

}for(String bindingKey : argv) {

channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

}

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

Consumer consumer= newDefaultConsumer(channel) {

@Overridepublic voidhandleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties,byte[] body) throwsIOException {

String message= new String(body, "UTF-8");

System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");

}

};

channel.basicConsume(queueName,true, consumer);

}

}

總結

以上是生活随笔為你收集整理的rabbitmq+topic+java_译:5.RabbitMQ Java Client 之 Topics (主题)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。