日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Springboot RabbitMQ

發(fā)布時間:2023/12/16 javascript 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Springboot RabbitMQ 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

轉(zhuǎn)載:Springboot RabbitMQ

Springboot RabbitMQ 開發(fā),Idea 的文件目錄:

安裝過程我就不寫了,服務(wù)的安裝請參考前往:RabbitMQ Centos7 安裝以及使用

https://blog.csdn.net/yexiaomodemo/article/details/80473411

同樣,RabbitMQ里面的運行機(jī)制等如:虛擬地址、交換機(jī)、路由鍵、隊列、Direct、Topic、Fanout 等幾種模式請自行學(xué)習(xí),這里只做Springboot RabbitMQ 的實現(xiàn),開始貼代碼。

我這服務(wù)器大家可以用,不過別攻擊哈,性能不是很好。

?

Pom.xml

  • <?xml version="1.0" encoding="UTF-8"?>
  • <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  • xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  • <modelVersion>4.0.0</modelVersion>
  • <parent>
  • <groupId>org.springframework.boot</groupId>
  • <artifactId>spring-boot-starter-parent</artifactId>
  • <version>2.1.5.RELEASE</version>
  • <relativePath/> <!-- lookup parent from repository -->
  • </parent>
  • <groupId>com.xing.rabbitmq</groupId>
  • <artifactId>springboot-rabbitmq</artifactId>
  • <version>0.0.1-SNAPSHOT</version>
  • <name>springboot-rabbitmq</name>
  • <description>springbootrabbitmq project for Spring Boot</description>
  • <properties>
  • <java.version>1.8</java.version>
  • </properties>
  • <dependencies>
  • <dependency>
  • <groupId>org.springframework.boot</groupId>
  • <artifactId>spring-boot-starter-amqp</artifactId>
  • </dependency>
  • <dependency>
  • <groupId>org.springframework.boot</groupId>
  • <artifactId>spring-boot-starter-web</artifactId>
  • </dependency>
  • <dependency>
  • <groupId>org.springframework.boot</groupId>
  • <artifactId>spring-boot-starter-test</artifactId>
  • <scope>test</scope>
  • </dependency>
  • </dependencies>
  • <build>
  • <plugins>
  • <plugin>
  • <groupId>org.springframework.boot</groupId>
  • <artifactId>spring-boot-maven-plugin</artifactId>
  • </plugin>
  • </plugins>
  • </build>
  • </project>
  • application.yml

  • # server continer threads ,connections
  • server:
  • port: 9081
  • uri-encoding: UTF-8
  • max-threads: 100
  • max-connections: 5000
  • spring:
  • application:
  • name: spirng-boot-rabbitmq
  • rabbitmq:
  • host: 47.106.203.79
  • port: 5672
  • username: liuxing
  • password: liuxing
  • publisher-confirms: true #
  • publisher-returns: true
  • virtual-host: /liuxing
  • connection-timeout: 1500
  • devtools:
  • restart:
  • enabled: false
  • ?

  • package com.xing.rabbitmq.config;
  • import org.springframework.amqp.core.DirectExchange;
  • import org.springframework.amqp.core.FanoutExchange;
  • import org.springframework.amqp.core.TopicExchange;
  • import org.springframework.context.annotation.Bean;
  • import org.springframework.context.annotation.Configuration;
  • /**
  • * 消息交換機(jī)配置 可以配置多個 - 每個類型要單獨配置
  • * @Class ExchangeConfig
  • * @Author 作者姓名:LiuXing
  • * @Version 1.0
  • * @Date 創(chuàng)建時間:2019-05-20 17:02
  • * @Direction 類說明
  • */
  • @Configuration
  • public class ExchangeConfig {
  • /** 消息交換機(jī)的名字*/
  • public static final String DIRECT_EXCHANGE = "direct_exchange"; //直連交換機(jī)
  • public static final String FANOUT_EXCHANGE = "fanout_exchange"; //廣播交換機(jī)
  • public static final String TOPIC_EXCHANGE = "topic_exchange"; //匹配交換機(jī)
  • /**
  • * TODO
  • * 1.定義direct exchange,綁定直連隊列【RabbitMqConfig.DIRECT_EXCHANGE】到路由器
  • * 2.durable="true" rabbitmq重啟的時候不需要創(chuàng)建新的交換機(jī)
  • * 3.direct交換器相對來說比較簡單,匹配規(guī)則為:如果路由鍵匹配,消息就被投送到相關(guān)的隊列
  • * fanout交換器中沒有路由鍵的概念,他會把消息發(fā)送到所有綁定在此交換器上面的隊列中。
  • * topic交換器你采用模糊匹配路由鍵的原則進(jìn)行轉(zhuǎn)發(fā)消息到隊列中
  • * key: queue在該direct-exchange中的key值,當(dāng)消息發(fā)送給direct-exchange中指定key為設(shè)置值時,
  • * 消息將會轉(zhuǎn)發(fā)給queue參數(shù)指定的消息隊列
  • */
  • @Bean
  • public DirectExchange directExchange(){
  • DirectExchange directExchange = new DirectExchange( ExchangeConfig.DIRECT_EXCHANGE,true,false);
  • return directExchange;
  • }
  • /**
  • * TODO
  • * 1.定義fanout exchange,綁定廣播隊列【RabbitMqConfig.FANOUT_EXCHANGE】到路由器
  • * 2.durable="true" rabbitmq重啟的時候不需要創(chuàng)建新的交換機(jī)
  • * 3.direct交換器相對來說比較簡單,匹配規(guī)則為:如果路由鍵匹配,消息就被投送到相關(guān)的隊列
  • * fanout交換器中沒有路由鍵的概念,他會把消息發(fā)送到所有綁定在此交換器上面的隊列中。
  • * topic交換器你采用模糊匹配路由鍵的原則進(jìn)行轉(zhuǎn)發(fā)消息到隊列中
  • * key: queue在該fanout-exchange中的key值,當(dāng)消息發(fā)送給fanout-exchange中指定key為設(shè)置值時,
  • * 消息將會轉(zhuǎn)發(fā)給注冊在【RabbitMqConfig.FANOUT_EXCHANGE】這個交換機(jī)上面的所有的queue參數(shù)指定的消息隊列
  • */
  • @Bean
  • public FanoutExchange fanoutExchange(){
  • FanoutExchange fanoutExchange = new FanoutExchange( ExchangeConfig.FANOUT_EXCHANGE,true,false);
  • return fanoutExchange;
  • }
  • /**
  • * TODO
  • * 1.定義topic exchange,綁定廣播隊列【RabbitMqConfig.TOPIC_EXCHANGE】到路由器
  • * 2.durable="true" rabbitmq重啟的時候不需要創(chuàng)建新的交換機(jī)
  • * 3.direct交換器相對來說比較簡單,匹配規(guī)則為:如果路由鍵匹配,消息就被投送到相關(guān)的隊列
  • * fanout交換器中沒有路由鍵的概念,他會把消息發(fā)送到所有綁定在此交換器上面的隊列中。
  • * topic交換器你采用模糊匹配路由鍵的原則進(jìn)行轉(zhuǎn)發(fā)消息到隊列中
  • * key: queue在該topic-exchange中的key值,當(dāng)消息發(fā)送給topic-exchange中指定key為設(shè)置值時,
  • * 消息將會轉(zhuǎn)發(fā)給注冊在【RabbitMqConfig.TOPIC_EXCHANGE】規(guī)則匹配的queue參數(shù)指定的消息隊列
  • */
  • @Bean
  • public TopicExchange topicExchange(){
  • TopicExchange topicExchange = new TopicExchange( ExchangeConfig.TOPIC_EXCHANGE,true,false);
  • return topicExchange;
  • }
  • }
  • package com.xing.rabbitmq.config;
  • import org.springframework.amqp.core.Queue;
  • import org.springframework.context.annotation.Bean;
  • import org.springframework.context.annotation.Configuration;
  • /**
  • * 隊列配置類
  • * @Class QueueConfig
  • * @Author 作者姓名:LiuXing
  • * @Version 1.0
  • * @Date 創(chuàng)建時間:2019-05-20 17:03
  • * @Direction 類說明
  • */
  • @Configuration
  • public class QueueConfig {
  • //直連隊列
  • public static final String QUEUE_DIRECT_NAME = "direct_queue";
  • //廣播隊列1,2:因為廣播指的是廣播到所有綁定到此路由的對列
  • public static final String QUEUE_FANOUT_NAME1 = "fanout_queue1";
  • public static final String QUEUE_FANOUT_NAME2 = "fanout_queue2";
  • //匹配隊列
  • public static final String COM_TOPIC_QUEUE_LIU = "com.topic.queue.liu";
  • public static final String COM_TOPIC_QUEUE_XING = "com.topic.queue.xing";
  • /***
  • * 創(chuàng)建消息隊列,這隊列是會被注冊到【交換機(jī)--》路由鍵】里面
  • * @return
  • */
  • @Bean
  • public Queue DirectQueue() {
  • /**
  • durable="true" 持久化 rabbitmq重啟的時候不需要創(chuàng)建新的隊列
  • exclusive 表示該消息隊列是否只在當(dāng)前connection生效,默認(rèn)是false
  • auto-delete 表示消息隊列沒有在使用時將被自動刪除 默認(rèn)是false
  • */
  • return new Queue( QUEUE_DIRECT_NAME ,true,false,false);
  • }
  • /***
  • * 廣播隊列 1
  • * @return
  • */
  • @Bean
  • public Queue fanoutQueue1() {
  • /**
  • durable="true" 持久化 rabbitmq重啟的時候不需要創(chuàng)建新的隊列
  • exclusive 表示該消息隊列是否只在當(dāng)前connection生效,默認(rèn)是false
  • auto-delete 表示消息隊列沒有在使用時將被自動刪除 默認(rèn)是false
  • */
  • return new Queue(QUEUE_FANOUT_NAME1 ,true,false,false);
  • }
  • /***
  • * 廣播隊列 2
  • * @return
  • */
  • @Bean
  • public Queue fanoutQueue2() {
  • /**
  • durable="true" 持久化 rabbitmq重啟的時候不需要創(chuàng)建新的隊列
  • exclusive 表示該消息隊列是否只在當(dāng)前connection生效,默認(rèn)是false
  • auto-delete 表示消息隊列沒有在使用時將被自動刪除 默認(rèn)是false
  • */
  • return new Queue(QUEUE_FANOUT_NAME2 ,true,false,false);
  • }
  • /***
  • * 匹配隊列 - liu
  • * @return
  • */
  • @Bean
  • public Queue topicQueueLiu() {
  • /**
  • durable="true" 持久化 rabbitmq重啟的時候不需要創(chuàng)建新的隊列
  • exclusive 表示該消息隊列是否只在當(dāng)前connection生效,默認(rèn)是false
  • auto-delete 表示消息隊列沒有在使用時將被自動刪除 默認(rèn)是false
  • */
  • return new Queue( COM_TOPIC_QUEUE_LIU ,true,false,false);
  • }
  • /***
  • * 匹配隊列 - xing
  • * @return
  • */
  • @Bean
  • public Queue topicQueueXing() {
  • /**
  • durable="true" 持久化 rabbitmq重啟的時候不需要創(chuàng)建新的隊列
  • exclusive 表示該消息隊列是否只在當(dāng)前connection生效,默認(rèn)是false
  • auto-delete 表示消息隊列沒有在使用時將被自動刪除 默認(rèn)是false
  • */
  • return new Queue( COM_TOPIC_QUEUE_XING ,true,false,false);
  • }
  • }
  • package com.xing.rabbitmq.config;
  • import com.xing.rabbitmq.mqcallback.MsgSendConfirmCallBack;
  • import com.xing.rabbitmq.mqcallback.MsgSendReturnCallback;
  • import org.springframework.amqp.core.AcknowledgeMode;
  • import org.springframework.amqp.core.Binding;
  • import org.springframework.amqp.core.BindingBuilder;
  • import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  • import org.springframework.amqp.rabbit.core.RabbitTemplate;
  • import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  • import org.springframework.beans.factory.annotation.Autowired;
  • import org.springframework.context.annotation.Bean;
  • import org.springframework.context.annotation.Configuration;
  • /**
  • * RabbitMq配置
  • * @Class RabbitMqConfig
  • * @Author 作者姓名:LiuXing
  • * @Version 1.0
  • * @Date 創(chuàng)建時間:2019-05-20 17:05
  • * @Direction 類說明
  • */
  • @Configuration
  • public class RabbitMqConfig {
  • /**
  • * 連接工廠
  • */
  • @Autowired
  • private ConnectionFactory connectionFactory;
  • /*
  • * key: queue在該direct-exchange中的key值,當(dāng)消息發(fā)送給direct-exchange中指定key為設(shè)置值時,
  • * 消息將會轉(zhuǎn)發(fā)給queue參數(shù)指定的消息隊列
  • */
  • /** 直連 綁定的 路由鍵 */
  • public static final String ROUTIN_DIRECT_KEY = "queue_direct_key";
  • /** 廣播 設(shè)定的 路由鍵 */
  • public static final String ROUTIN_FANOUT_KEY = "queue_fanout_key";
  • /** 匹配 設(shè)定的 路由鍵 */
  • public static final String ROUTIN_TOPIC_KEY = "com.topic.queue.*";
  • @Autowired
  • private QueueConfig queueConfig; //隊列配置信息
  • @Autowired
  • private ExchangeConfig exchangeConfig; //交換機(jī)配置信息
  • /**
  • * 匹配交換機(jī)【DIRECT_EXCHANGE】進(jìn)行綁定,隊列【QUEUE_DIRECT_NAME】,路由鍵【ROUTIN_DIRECT_KEY】
  • */
  • @Bean
  • public Binding binding_direct() {
  • return BindingBuilder.bind(queueConfig.DirectQueue()).to(exchangeConfig.directExchange()).with( RabbitMqConfig.ROUTIN_DIRECT_KEY );
  • }
  • /**
  • * 廣播交換機(jī)【TOPIC_EXCHANGE】進(jìn)行綁定,隊列【QUEUE_FANOUT_NAME1】,廣播機(jī)制不需要路由鍵
  • */
  • @Bean
  • public Binding binding_fanout1() {
  • return BindingBuilder.bind(queueConfig.fanoutQueue1()).to(exchangeConfig.fanoutExchange()) ; //.with( RabbitMqConfig.ROUTIN_FANOUT_KEY );
  • }
  • /**
  • * 廣播交換機(jī)【TOPIC_EXCHANGE】進(jìn)行綁定,隊列【QUEUE_FANOUT_NAME2】,廣播機(jī)制不需要路由鍵
  • */
  • @Bean
  • public Binding binding_fanout2() {
  • return BindingBuilder.bind(queueConfig.fanoutQueue2()).to(exchangeConfig.fanoutExchange()) ; //.with( RabbitMqConfig.ROUTIN_FANOUT_KEY );
  • }
  • /**
  • * 匹配交換機(jī)【FANOUT_EXCHANGE】進(jìn)行綁定,隊列【COM_TOPIC_QUEUE_LIU】,路由鍵【ROUTIN_TOPIC_KEY】
  • */
  • @Bean
  • public Binding binding_topic_liu() {
  • return BindingBuilder.bind(queueConfig.topicQueueLiu()).to(exchangeConfig.topicExchange()).with( RabbitMqConfig.ROUTIN_TOPIC_KEY );
  • }
  • /**
  • * 匹配交換機(jī)【FANOUT_EXCHANGE】進(jìn)行綁定,隊列【COM_TOPIC_QUEUE_LIU】,路由鍵【ROUTIN_TOPIC_KEY】
  • */
  • @Bean
  • public Binding binding_topic_xing() {
  • return BindingBuilder.bind(queueConfig.topicQueueXing()).to(exchangeConfig.topicExchange()).with( RabbitMqConfig.ROUTIN_TOPIC_KEY );
  • }
  • /**
  • * queue listener 觀察 監(jiān)聽模式
  • * 當(dāng)有消息到達(dá)時會通知監(jiān)聽在對應(yīng)的隊列上的監(jiān)聽對象
  • * @return
  • */
  • /*@Bean
  • public SimpleMessageListenerContainer simpleMessageListenerContainer_one(){
  • SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
  • simpleMessageListenerContainer.addQueues(queueConfig.firstQueue());
  • simpleMessageListenerContainer.setExposeListenerChannel(true);
  • simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
  • simpleMessageListenerContainer.setConcurrentConsumers(1);
  • simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設(shè)置確認(rèn)模式手工確認(rèn)
  • return simpleMessageListenerContainer;
  • }*/
  • /**
  • * 自定義rabbit template用于數(shù)據(jù)的接收和發(fā)送
  • * 可以設(shè)置消息確認(rèn)機(jī)制和回調(diào)
  • * @return
  • */
  • @Bean
  • public RabbitTemplate rabbitTemplate() {
  • RabbitTemplate template = new RabbitTemplate(connectionFactory);
  • // template.setMessageConverter(); 可以自定義消息轉(zhuǎn)換器 默認(rèn)使用的JDK的,所以消息對象需要實現(xiàn)Serializable
  • /**若使用confirm-callback或return-callback,
  • * 必須要配置publisherConfirms或publisherReturns為true
  • * 每個rabbitTemplate只能有一個confirm-callback和return-callback
  • */
  • template.setConfirmCallback(msgSendConfirmCallBack());
  • /**
  • * 使用return-callback時必須設(shè)置mandatory為true,或者在配置中設(shè)置mandatory-expression的值為true,
  • * 可針對每次請求的消息去確定’mandatory’的boolean值,
  • * 只能在提供’return -callback’時使用,與mandatory互斥
  • */
  • template.setReturnCallback(msgSendReturnCallback());
  • template.setMandatory(true);
  • return template;
  • }
  • /* 關(guān)于 msgSendConfirmCallBack 和 msgSendReturnCallback 的回調(diào)說明:
  • 1.如果消息沒有到exchange,則confirm回調(diào),ack=false
  • 2.如果消息到達(dá)exchange,則confirm回調(diào),ack=true
  • 3.exchange到queue成功,則不回調(diào)return
  • 4.exchange到queue失敗,則回調(diào)return(需設(shè)置mandatory=true,否則不回回調(diào),消息就丟了)
  • */
  • /**
  • * 消息確認(rèn)機(jī)制
  • * Confirms給客戶端一種輕量級的方式,能夠跟蹤哪些消息被broker處理,
  • * 哪些可能因為broker宕掉或者網(wǎng)絡(luò)失敗的情況而重新發(fā)布。
  • * 確認(rèn)并且保證消息被送達(dá),提供了兩種方式:發(fā)布確認(rèn)和事務(wù)。(兩者不可同時使用)
  • * 在channel為事務(wù)時,不可引入確認(rèn)模式;同樣channel為確認(rèn)模式下,不可使用事務(wù)。
  • * @return
  • */
  • @Bean
  • public MsgSendConfirmCallBack msgSendConfirmCallBack(){
  • return new MsgSendConfirmCallBack();
  • }
  • @Bean
  • public MsgSendReturnCallback msgSendReturnCallback(){
  • return new MsgSendReturnCallback();
  • }
  • }
  • package com.xing.rabbitmq.controller;
  • import com.xing.rabbitmq.sender.direct.FirstDirectSender;
  • import com.xing.rabbitmq.sender.fanout.FirstFanoutSender;
  • import com.xing.rabbitmq.sender.topic.TopicSender;
  • import org.springframework.beans.factory.annotation.Autowired;
  • import org.springframework.web.bind.annotation.GetMapping;
  • import org.springframework.web.bind.annotation.RestController;
  • import java.util.UUID;
  • /**
  • * @Class SendController
  • * @Author 作者姓名:LiuXing
  • * @Version 1.0
  • * @Date 創(chuàng)建時間:2019-05-20 17:12
  • * @Direction 類說明
  • */
  • @RestController
  • public class SendController {
  • //直連方式 - direct 隊列
  • @Autowired
  • private FirstDirectSender firstDirectSender;
  • //廣播方式 - fanout 隊列
  • @Autowired
  • private FirstFanoutSender firstFanoutSender;
  • //匹配方式 - topic 隊列
  • @Autowired
  • private TopicSender topicSender ;
  • /***
  • * TODO 測試direct 直連的隊列處理
  • *
  • * http://192.168.2.232:9081/directSend?message=call%20phone
  • *
  • * @param message
  • * @return
  • */
  • @GetMapping("/directSend")
  • public String directSend(String message){
  • String uuid = UUID.randomUUID().toString();
  • firstDirectSender.send(uuid,message);
  • return uuid;
  • }
  • /***
  • * TODO 測試fanout 廣播的隊列處理
  • *
  • * http://192.168.2.232:9081/fanoutSend?message=fanout%20go%20home
  • *
  • * @param message
  • * @return
  • */
  • @GetMapping("/fanoutSend")
  • public String fanoutSend(String message){
  • String uuid = UUID.randomUUID().toString();
  • firstFanoutSender.send( uuid,message );
  • return uuid;
  • }
  • /***
  • * TODO 測試topic 匹配的隊列處理
  • *
  • * http://192.168.2.232:9081/topicSend?message=%E5%8C%B9%E9%85%8D%E6%A8%A1%E5%BC%8F%E9%80%9A%E7%9F%A5
  • *
  • * @param message
  • * @return
  • */
  • @GetMapping("/topicSend")
  • public String topicSend(String message){
  • String uuid = UUID.randomUUID().toString();
  • topicSender.send( uuid,message );
  • return uuid;
  • }
  • }
  • package com.xing.rabbitmq.mqcallback;
  • import org.springframework.amqp.rabbit.connection.CorrelationData;
  • import org.springframework.amqp.rabbit.core.RabbitTemplate;
  • /**
  • * 消息發(fā)送到交換機(jī)確認(rèn)機(jī)制
  • * @Class MsgSendConfirmCallBack
  • * @Author 作者姓名:LiuXing
  • * @Version 1.0
  • * @Date 創(chuàng)建時間:2019-05-20 17:07
  • * @Direction 類說明
  • */
  • public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
  • public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  • System.out.println("MsgSendConfirmCallBack , 回調(diào)id:" + correlationData);
  • if (ack) {
  • System.out.println("消息消費成功");
  • } else {
  • System.out.println("消息消費失敗:" + cause+"\n重新發(fā)送");
  • }
  • }
  • }
  • package com.xing.rabbitmq.mqcallback;
  • import org.springframework.amqp.core.Message;
  • import org.springframework.amqp.rabbit.core.RabbitTemplate;
  • /**
  • * 這里有BUG,成功后沒過來
  • * @Class MsgSendReturnCallback
  • * @Author 作者姓名:LiuXing
  • * @Version 1.0
  • * @Date 創(chuàng)建時間:2019-05-22 11:53
  • * @Direction 類說明
  • */
  • public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
  • @Override
  • public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  • System.out.println("回饋消息:"+message);
  • }
  • }
  • package com.xing.rabbitmq.receiver.direct;
  • import com.xing.rabbitmq.config.QueueConfig;
  • import org.slf4j.Logger;
  • import org.slf4j.LoggerFactory;
  • import org.springframework.amqp.rabbit.annotation.RabbitListener;
  • import org.springframework.stereotype.Component;
  • /**
  • * 消息消費者 直連路由鍵
  • * @Class DirectConsumer
  • * @Author 作者姓名:LiuXing
  • * @Version 1.0
  • * @Date 創(chuàng)建時間:2019-05-20 17:10
  • * @Direction 類說明
  • */
  • @Component
  • public class DirectConsumer {
  • public static final Logger logger = LoggerFactory.getLogger( DirectConsumer.class ) ;
  • /****
  • * 監(jiān)聽直連的隊列,一旦此隊列出現(xiàn)數(shù)據(jù),則自動消費此隊列的數(shù)據(jù)
  • * @param message
  • * @throws Exception
  • */
  • //@RabbitListener(queues = { QueueConfig.QUEUE_DIRECT_NAME , QueueConfig.QUEUE_DIRECT_NAME1 }, containerFactory = "rabbitListenerContainerFactory")
  • @RabbitListener(queues = { QueueConfig.QUEUE_DIRECT_NAME }, containerFactory = "rabbitListenerContainerFactory")
  • public void handleMessage(String message) throws Exception {
  • // 處理消息
  • logger.info("直連模式隊列:{}, 消費者: 接收到消息如下:{} " , QueueConfig.QUEUE_DIRECT_NAME , message);
  • }
  • }
  • package com.xing.rabbitmq.receiver.fanout;
  • import com.xing.rabbitmq.config.QueueConfig;
  • import org.slf4j.Logger;
  • import org.slf4j.LoggerFactory;
  • import org.springframework.amqp.rabbit.annotation.RabbitListener;
  • import org.springframework.stereotype.Component;
  • /**
  • * 消息消費者 廣播機(jī)制接收
  • * @Class FanoutConsumer1
  • * @Author 作者姓名:LiuXing
  • * @Version 1.0
  • * @Date 創(chuàng)建時間:2019-05-21 17:27
  • * @Direction 類說明
  • */
  • @Component
  • public class FanoutConsumer1 {
  • public static final Logger logger = LoggerFactory.getLogger( FanoutConsumer1.class ) ;
  • /****
  • * 監(jiān)聽隊列,一旦此隊列出現(xiàn)數(shù)據(jù),則自動消費此隊列的數(shù)據(jù) ,廣播機(jī)制的隊列名稱【QueueConfig.QUEUE_FANOUT_NAME】
  • * @param message
  • * @throws Exception
  • */
  • @RabbitListener(queues = { QueueConfig.QUEUE_FANOUT_NAME1 }, containerFactory = "rabbitListenerContainerFactory")
  • public void handleMessage(String message) throws Exception {
  • // 處理消息
  • logger.info("廣播機(jī)制隊列:{}, 第一個消費者: 接收到消息如下:{} " , QueueConfig.QUEUE_FANOUT_NAME1 , message);
  • }
  • }
  • package com.xing.rabbitmq.receiver.fanout;
  • import com.xing.rabbitmq.config.QueueConfig;
  • import org.slf4j.Logger;
  • import org.slf4j.LoggerFactory;
  • import org.springframework.amqp.rabbit.annotation.RabbitListener;
  • import org.springframework.stereotype.Component;
  • /**
  • * 消息消費者 廣播機(jī)制接收 二
  • * @Class FanoutConsumer2
  • * @Author 作者姓名:LiuXing
  • * @Version 1.0
  • * @Date 創(chuàng)建時間:2019-05-21 17:27
  • * @Direction 類說明
  • */
  • @Component
  • public class FanoutConsumer2 {
  • public static final Logger logger = LoggerFactory.getLogger( FanoutConsumer2.class ) ;
  • /****
  • * 監(jiān)聽隊列,一旦此隊列出現(xiàn)數(shù)據(jù),則自動消費此隊列的數(shù)據(jù) ,廣播機(jī)制的隊列名稱【QueueConfig.QUEUE_FANOUT_NAME】
  • * @param message
  • * @throws Exception
  • */
  • @RabbitListener(queues = { QueueConfig.QUEUE_FANOUT_NAME2 }, containerFactory = "rabbitListenerContainerFactory")
  • public void handleMessage(String message) throws Exception {
  • // 處理消息
  • logger.info("廣播機(jī)制隊列:{}, 第二個消費者: 接收到消息如下:{} " , QueueConfig.QUEUE_FANOUT_NAME2 , message);
  • }
  • }
  • package com.xing.rabbitmq.receiver.topic;
  • import com.xing.rabbitmq.config.QueueConfig;
  • import org.slf4j.Logger;
  • import org.slf4j.LoggerFactory;
  • import org.springframework.amqp.rabbit.annotation.RabbitListener;
  • import org.springframework.stereotype.Component;
  • /**
  • * 消息消費者 匹配路由鍵
  • * @Class TopicLiuConsumer
  • * @Author 作者姓名:LiuXing
  • * @Version 1.0
  • * @Date 創(chuàng)建時間:2019-05-20 17:10
  • * @Direction 類說明
  • */
  • @Component
  • public class TopicLiuConsumer {
  • public static final Logger logger = LoggerFactory.getLogger( TopicLiuConsumer.class ) ;
  • /****
  • * 監(jiān)聽兩個隊列,一旦此隊列出現(xiàn)數(shù)據(jù),則自動消費此隊列的數(shù)據(jù)
  • * @param message
  • * @throws Exception
  • */
  • //@RabbitListener(queues = { QueueConfig.QUEUE_DIRECT_NAME , QueueConfig.QUEUE_DIRECT_NAME1 }, containerFactory = "rabbitListenerContainerFactory")
  • @RabbitListener(queues = { QueueConfig.COM_TOPIC_QUEUE_LIU }, containerFactory = "rabbitListenerContainerFactory")
  • public void handleMessage(String message) throws Exception {
  • // 處理消息
  • logger.info("TOPIC隊列:{} 匹配模式消費者: 接收到消息如下:{} " , QueueConfig.COM_TOPIC_QUEUE_LIU , message);
  • }
  • }
  • package com.xing.rabbitmq.receiver.topic;
  • import com.xing.rabbitmq.config.QueueConfig;
  • import org.slf4j.Logger;
  • import org.slf4j.LoggerFactory;
  • import org.springframework.amqp.rabbit.annotation.RabbitListener;
  • import org.springframework.stereotype.Component;
  • /**
  • * 消息消費者 匹配路由鍵:
  • * @Class TopicXingConsumer
  • * @Author 作者姓名:LiuXing
  • * @Version 1.0
  • * @Date 創(chuàng)建時間:2019-05-22 10:12
  • * @Direction 類說明
  • */
  • @Component
  • public class TopicXingConsumer {
  • public static final Logger logger = LoggerFactory.getLogger( TopicXingConsumer.class ) ;
  • /****
  • * 監(jiān)聽兩個隊列,一旦此隊列出現(xiàn)數(shù)據(jù),則自動消費此隊列的數(shù)據(jù)
  • * @param message
  • * @throws Exception
  • */
  • //@RabbitListener(queues = { QueueConfig.QUEUE_DIRECT_NAME , QueueConfig.QUEUE_DIRECT_NAME1 }, containerFactory = "rabbitListenerContainerFactory")
  • @RabbitListener(queues = { QueueConfig.COM_TOPIC_QUEUE_XING }, containerFactory = "rabbitListenerContainerFactory")
  • public void handleMessage(String message) throws Exception {
  • // 處理消息
  • logger.info("TOPIC隊列:{} 匹配模式消費者: 接收到消息如下:{} " , QueueConfig.COM_TOPIC_QUEUE_XING , message);
  • }
  • }
  • package com.xing.rabbitmq.sender.direct;
  • import com.xing.rabbitmq.config.ExchangeConfig;
  • import com.xing.rabbitmq.config.QueueConfig;
  • import com.xing.rabbitmq.config.RabbitMqConfig;
  • import org.springframework.amqp.rabbit.connection.CorrelationData;
  • import org.springframework.amqp.rabbit.core.RabbitTemplate;
  • import org.springframework.beans.factory.annotation.Autowired;
  • import org.springframework.stereotype.Component;
  • import java.util.UUID;
  • /**
  • * 消息發(fā)布者 直連路由鍵
  • * @Class FirstDirectSender
  • * @Author 作者姓名:LiuXing
  • * @Version 1.0
  • * @Date 創(chuàng)建時間:2019-05-20 17:09
  • * @Direction 類說明
  • */
  • @Component
  • public class FirstDirectSender {
  • @Autowired
  • private RabbitTemplate rabbitTemplate;
  • /**
  • * 發(fā)送消息
  • * @param uuid
  • * @param message 消息
  • */
  • public void send(String uuid,Object message) {
  • CorrelationData correlationId = new CorrelationData(uuid);
  • /**
  • * ExchangeConfig.DIRECT_EXCHANGE 指定消息交換機(jī)
  • * RabbitMqConfig.ROUTIN_DIRECT_KEY 指定路由鍵
  • */
  • //發(fā)送消息至:direct類型的交互機(jī)【RabbitMqConfig.DIRECT_EXCHANGE】上,路由鍵是【RabbitMqConfig.ROUTIN_DIRECT_KEY】
  • rabbitTemplate.convertAndSend( ExchangeConfig.DIRECT_EXCHANGE , RabbitMqConfig.ROUTIN_DIRECT_KEY ,
  • message, correlationId);
  • }
  • }
  • package com.xing.rabbitmq.sender.fanout;
  • import com.xing.rabbitmq.config.ExchangeConfig;
  • import com.xing.rabbitmq.config.RabbitMqConfig;
  • import org.springframework.amqp.rabbit.connection.CorrelationData;
  • import org.springframework.amqp.rabbit.core.RabbitTemplate;
  • import org.springframework.beans.factory.annotation.Autowired;
  • import org.springframework.stereotype.Component;
  • /**
  • * 消息發(fā)布者 廣播機(jī)制 重要的提示:廣播指的是廣播到所有的隊列,不是廣播到所有的消費者
  • * @Class FirstDirectSender
  • * @Author 作者姓名:LiuXing
  • * @Version 1.0
  • * @Date 創(chuàng)建時間:2019-05-20 17:09
  • * @Direction 類說明
  • */
  • @Component
  • public class FirstFanoutSender {
  • @Autowired
  • private RabbitTemplate rabbitTemplate;
  • /**
  • * 發(fā)送消息
  • * @param uuid
  • * @param message 消息
  • */
  • public void send(String uuid,Object message) {
  • CorrelationData correlationId = new CorrelationData(uuid);
  • /**
  • * ExchangeConfig.FANOUT_EXCHANGE 指定消息交換機(jī)
  • * RabbitMqConfig.ROUTIN_FANOUT_KEY 指定一個路由鍵,不過其實fanout不需要路由鍵
  • */
  • //發(fā)送消息至:direct類型的交互機(jī)【ExchangeConfig.FANOUT_EXCHANGE】上,路由鍵是【RabbitMqConfig.ROUTIN_FANOUT_KEY】
  • rabbitTemplate.convertAndSend( ExchangeConfig.FANOUT_EXCHANGE , RabbitMqConfig.ROUTIN_FANOUT_KEY ,
  • message, correlationId );
  • //TODO 綁定在此交換機(jī)上的隊列都會收到數(shù)據(jù),監(jiān)聽此隊列的消費者都會收到信息
  • }
  • }
  • package com.xing.rabbitmq.sender.topic;
  • import com.xing.rabbitmq.config.ExchangeConfig;
  • import com.xing.rabbitmq.config.RabbitMqConfig;
  • import org.springframework.amqp.rabbit.connection.CorrelationData;
  • import org.springframework.amqp.rabbit.core.RabbitTemplate;
  • import org.springframework.beans.factory.annotation.Autowired;
  • import org.springframework.stereotype.Component;
  • /**
  • * 消息發(fā)布者 匹配機(jī)制 重要的提示:匹配指的是根據(jù)規(guī)則匹配到所有的隊列,不是匹配到所有的消費者
  • * @Class TopicSender
  • * @Author 作者姓名:LiuXing
  • * @Version 1.0
  • * @Date 創(chuàng)建時間:2019-05-21 11:23
  • * @Direction 類說明
  • */
  • @Component
  • public class TopicSender {
  • @Autowired
  • private RabbitTemplate rabbitTemplate;
  • /**
  • * 發(fā)送消息
  • * @param uuid
  • * @param message 消息
  • */
  • public void send(String uuid,Object message) {
  • CorrelationData correlationId = new CorrelationData(uuid);
  • /**
  • * ExchangeConfig.TOPIC_EXCHANGE 指定消息交換機(jī)
  • * RabbitMqConfig.ROUTIN_TOPIC_KEY 指定一個路由鍵,不過其實fanout不需要路由鍵
  • */
  • //發(fā)送消息至:direct類型的交互機(jī)【ExchangeConfig.TOPIC_EXCHANGE】上,路由鍵是【RabbitMqConfig.ROUTIN_TOPIC_KEY】
  • rabbitTemplate.convertAndSend( ExchangeConfig.TOPIC_EXCHANGE , RabbitMqConfig.ROUTIN_TOPIC_KEY ,
  • message, correlationId );
  • //TODO 綁定在此交換機(jī)上的隊列都會收到數(shù)據(jù),監(jiān)聽此隊列的消費者都會收到信息
  • }
  • }
  • package com.xing.rabbitmq;
  • import org.springframework.boot.SpringApplication;
  • import org.springframework.boot.autoconfigure.SpringBootApplication;
  • @SpringBootApplication
  • public class SpringbootRabbitmqApplication {
  • public static void main(String[] args) {
  • SpringApplication.run(SpringbootRabbitmqApplication.class, args);
  • }
  • }
  • 基礎(chǔ)運行時序圖:

    使用方式:

    啟動:SpringbootRabbitmqApplication訪問:SendController.class里面的三個鏈接,有對應(yīng)的日志打印出來,具體的拓?fù)鋱D請查看文件:Springboot RabbitMQ運行時序圖.vsd

    部分代碼參考的網(wǎng)友,補全了另外的幾種方式的應(yīng)用,也補齊了注釋

    總結(jié)

    以上是生活随笔為你收集整理的Springboot RabbitMQ的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。