日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ(4) TopicExchange

發布時間:2024/10/12 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ(4) TopicExchange 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

topic 是RabbitMQ中最靈活的一種方式,可以根據routing_key自由的綁定不同的隊列

生產者工程

package com.example.demo.rabbitMq.exchange.topic;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class TopicRabbitConfig {public static final String TOPIC_MESSAGE = "topic.message";public static final String TOPIC_MESSAGE_S = "topic.messages";public static final String USER_MESSAGE = "user.message";/*** 武器庫*/public static final String ARM_QUEUE = "arm.queue";@Beanpublic Queue queueTopicMessage() {return new Queue(TopicRabbitConfig.TOPIC_MESSAGE);}@Beanpublic Queue queueTopicMessages() {return new Queue(TopicRabbitConfig.TOPIC_MESSAGE_S);}@Beanpublic Queue queueUserMessage() {return new Queue(TopicRabbitConfig.USER_MESSAGE);}@Beanpublic Queue queueArm() {return new Queue(TopicRabbitConfig.ARM_QUEUE);}@BeanTopicExchange exchange() {return new TopicExchange("topicExchange");}@BeanBinding bindingExchangeMessage(Queue queueTopicMessage, TopicExchange exchange) {//所有匹配routingKey=topic.message的消息,將放入Queue[name="topic.message"]return BindingBuilder.bind(queueTopicMessage).to(exchange).with("topic.message");}@BeanBinding bindingExchangeMessages(Queue queueTopicMessages, TopicExchange exchange) {//所有匹配routingKey=topic.# 的消息,將放入Queue[name="topic.messages"]return BindingBuilder.bind(queueTopicMessages).to(exchange).with("topic.#");}@BeanBinding bindingExchangeUserMessage(Queue queueUserMessage, TopicExchange exchange) {///所有匹配routingKey=user.# 的消息,將放入Queue[name="user.messages"]return BindingBuilder.bind(queueUserMessage).to(exchange).with("user.#");}@BeanBinding bindingExchangeArm(Queue queueArm, TopicExchange exchange) {return BindingBuilder.bind(queueArm).to(exchange).with("arm.#");} }

?

發送消息

package com.example.demo.rabbitMq.exchange.topic;import com.example.demo.dto.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class TopicSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send1() {User user = new User();user.setUserName("Sender1.....");user.setMobile("1111111111");rabbitTemplate.convertAndSend("topicExchange","topic.message",user);}public void send2() {User user = new User();user.setUserName("Sender2.....");user.setMobile("2222222");rabbitTemplate.convertAndSend("topicExchange","topic.messages",user);}public void send3() {User user = new User();user.setUserName("Sender3.....");user.setMobile("33333");rabbitTemplate.convertAndSend("topicExchange","user.message",user);} }

消費者工程

package com.example.demo.rabbitMq.exchange.topic;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class TopicRabbitConstant {public static final String TOPIC_MESSAGE = "topic.message";public static final String TOPIC_MESSAGE_S = "topic.messages";public static final String USER_MESSAGE = "user.message"; } package com.example.demo.rabbitMq.exchange.topic;import com.example.demo.dto.User; import com.example.demo.utils.Base64Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import java.io.IOException;@Component @RabbitListener(queues = TopicRabbitConstant.TOPIC_MESSAGE) public class TopicReceiver1 {private Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate AmqpTemplate rabbitTemplate;@RabbitHandlerpublic void process(User user) {System.out.println("Receiver1 : " + user);}public void rev1(){//手動去獲取消息logger.info("獲取Queue[topic.message]消息>>>");Message mesg = rabbitTemplate.receive("topic.message");System.out.println(mesg);if(null != mesg){byte[] body = mesg.getBody();try {User u = (User) Base64Utils.byteToObj(body);//獲取字符串數據 System.out.println(u);} catch (IOException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();}}} }

測試:

啟動消費者工程,生產者,執行如下方法

@Testpublic void send1() throws Exception {//會匹配到topic.#和topic.message 兩個Receiver都可以收到消息for (int i = 0, size = 10; i < size; i++) {topicSender.send1();}}

也可以不用監聽的方式,手動自主獲取隊列消息,如消費工程:

例如生產者工程TopicRabbitConfig.java添加武器隊列:

/*** 武器庫*/public static final String ARM_QUEUE = "arm.queue";@Beanpublic Queue queueArm() {return new Queue(TopicRabbitConfig.ARM_QUEUE);}@BeanBinding bindingExchangeArm(Queue queueArm, TopicExchange exchange) {return BindingBuilder.bind(queueArm).to(exchange).with("arm.#");}

生產武器:

public void send4() {//生產一批武器List<String> list = new ArrayList<String>();list.add("手槍");list.add("步槍");list.add("機槍");rabbitTemplate.convertAndSend("topicExchange","arm.gun",list);} @Testpublic void send4() throws Exception {topicSender.send4();}

消費者:

package com.example.demo.rabbitMq;import com.example.demo.dto.User; import com.example.demo.rabbitMq.exchange.topic.TopicReceiver1; import com.example.demo.utils.Base64Utils; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource; import java.io.IOException; import java.util.List;@SpringBootTest @RunWith(SpringRunner.class) public class RabbitMqRevTest {private Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate AmqpTemplate rabbitTemplate;@Testpublic void topicRev1(){rev1();}public void rev1(){//手動去獲取消息logger.info("獲取Queue[arm.gun]消息>>>");Message mesg = rabbitTemplate.receive("arm.queue");System.out.println(mesg);if(null != mesg){byte[] body = mesg.getBody();try {List u = (List) Base64Utils.byteToObj(body);//獲取字符串數據 System.out.println(u);} catch (IOException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();}}} }

測試:

?

?樣例代碼:

?https://github.com/xiaozhuanfeng?tab=repositories

?

轉載于:https://www.cnblogs.com/xiaozhuanfeng/p/10716236.html

總結

以上是生活随笔為你收集整理的RabbitMQ(4) TopicExchange的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。