Java笔记-使用RabbitMQ的Java接口实现topic(主题模式)
目錄
?
?
基本概念
代碼與實(shí)例
?
基本概念
實(shí)現(xiàn)的就是官方給出的這個模型:
Topic exchange:將路由和某模式匹配
其中
#:匹配一個或多個
*:匹配一個
比如下面要舉得這個例子
交換機(jī)設(shè)置為topic模式,生產(chǎn)者生成的消息的路由鍵值為goods.XXXX
其中XXXX,可能為add、delete、update、modify等
隊列一綁定的是goods.add
隊列二綁定的是goods.#
這樣話,如果生產(chǎn)者生產(chǎn)一個路由鍵值為goods.add的消息,辣么2個隊列都將會收到。
如果生產(chǎn)者生成一個路由鍵值為goods.delete的消息,辣么只有1個隊列將會收到。
?
代碼與實(shí)例
當(dāng)生產(chǎn)者和消費(fèi)者跑起來后,對應(yīng)的RabbitMQ交換機(jī)如下:
可見有2個隊列,一個綁定的路由鍵為goods.add
一個綁定的路由鍵為goods.#
當(dāng)生產(chǎn)者發(fā)送的鍵值為goods.add時:
兩個消費(fèi)者都可以收到:
當(dāng)生產(chǎn)者發(fā)送的鍵值為goods.delete時:
只有消費(fèi)者二可以收到,消費(fèi)者一和以前一樣
源碼如下:
Recv1.java
package topic;import com.rabbitmq.client.*; import util.ConnectionUtils;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Recv1 {private static final String EXCHANGE_NAME = "test_exchange_topic";private static final String QUEUE_NAME = "test_queue_topic_1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("[1] Recv msg : " + msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[1] done");channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);} }Recv2.java
package topic;import com.rabbitmq.client.*; import util.ConnectionUtils;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Recv2 {private static final String EXCHANGE_NAME = "test_exchange_topic";private static final String QUEUE_NAME = "test_queue_topic_2";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("[2] Recv msg : " + msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[2] done");channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);} }Send.java
package topic;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import util.ConnectionUtils;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Send {private static final String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");String msgString = "goods ... ... ...";channel.basicPublish(EXCHANGE_NAME, "goods.delete", null, msgString.getBytes());System.out.println("send msg :" + msgString);channel.close();connection.close();} }ConnectionUtils.java
package util;import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;public class ConnectionUtils {public static Connection getConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/vhost_cff");factory.setUsername("cff");factory.setPassword("123");return factory.newConnection();} }源碼打包下載地址:
https://github.com/fengfanchen/Java/tree/master/TopicModel
總結(jié)
以上是生活随笔為你收集整理的Java笔记-使用RabbitMQ的Java接口实现topic(主题模式)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java笔记-编码方式创建kaptcha
- 下一篇: java美元兑换,(Java实现) 美元