springboot-rabbitmq-reply 消息直接回复模式
文章目錄
- 一、使用場景
- 二、Reply實戰
- (1)依賴與YML配置
- (2)RabbitMq bean配置
- (3)消息生產端
- (1)生產消息
- (2)接受Reply響應
- (4)消息消費端
- (1)方法一:sendTo注解+方法返回值
- (2)方法二:讀取生產端的消息使用模板發送
- (3)方法三:方法返回值
- (4)測試
一、使用場景
MQ的作用包括了解耦、異步等。
通常生產者只負責生產消息,而不關心消息誰去獲取,或者消費結果如何;消費者只負責接收指定的消息進行業務處理而不關心消息從哪里來一級回復業務處理情況。但我們項目中有特殊的業務存在,我們作為消息生產者在生產消息后需要接收消費者的響應結果(說白了就是類似同步調用 請求響應的MQ使用),經過研究,MQ的Reply模式(直接回復模式)就是為此種業務模式而產生。
二、Reply實戰
(1)依賴與YML配置
依賴
我這里只列出最核心的rabbitMq所需依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>配置
無其余特殊配置,因為reply就是rabbitmq的一種交互方式而已
spring:rabbitmq:host: 10.50.40.116port: 5673username: adminpassword: admin(2)RabbitMq bean配置
package com.leilei.demo;import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** @author lei* @create 2022-09-19 21:44* @desc mq配置**/ @Configuration public class RabbitMqConfig {@Beanpublic Queue bizQueue() {return new Queue("bizQueue");}@Beanpublic Queue replyQueue() {return new Queue("replyQueue");}@BeanFanoutExchange bizExchange() {return new FanoutExchange("bizExchange");}@Beanpublic Binding bizBind(Queue bizQueue, FanoutExchange bizExchange) {return BindingBuilder.bind(bizQueue).to(bizExchange);} }業務類
@Data @NoArgsConstructor @AllArgsConstructor public class Vehicle implements Serializable {private Integer id;private String name; }(3)消息生產端
消息生產端需要做的事情:有生產消息、接受消息消費響應
(1)生產消息
1、生產消息,看業務場景選擇是否生成全局唯一自定義的消息ID
2、指定消息消費后響應的隊列(Reply)
/*** 生產消息** @param* @return void* @author lei* @date 2022-09-19 21:59:18*/public void replySend() {MessageProperties messageProperties = new MessageProperties();messageProperties.setReplyTo("replyQueue");//todo 根據業務,做一個嚴謹的全局唯一ID,我這里暫時用UUIDString correlationId = UUID.randomUUID().toString();// 我這里指定了唯一消息ID,看業務場景,消費者消費響應后,生產者端可根據消息ID做業務處理messageProperties.setCorrelationId(correlationId);Vehicle vehicle = new Vehicle(1, "川A0001");Message message = new Message(JSON.toJSONString(vehicle).getBytes(), messageProperties);rabbitTemplate.convertAndSend("bizExchange","",message);System.out.println("生產者發送消息,自定義消息ID為:" + correlationId);}(2)接受Reply響應
消費者消費消息后會將處理結果進行發送到一個隊列,我們讀取這里隊列就可以拿到對應消息的響應結果進行業務處理了
/*** 接收消息響應** @param message* @return void* @author lei* @date 2022-09-19 21:59:27*/@RabbitListener(queues = "replyQueue")public void replyResponse(Message message) {String s = new String(message.getBody());String correlationId = message.getMessageProperties().getCorrelationId();System.out.println("收到客戶端響應消息ID:" + correlationId);//todo 根據消息ID可判斷這是哪一個消息的響應,我們就可做業務操作System.out.println("收到客戶端響應消息:" + s);}(4)消息消費端
消息消費端需要做的事有:接受消息然后進行業務處理、響應消息
(1)方法一:sendTo注解+方法返回值
一般來說,我們mq消費者監聽方法不需要返回值,我們這里使用sendTo注解,則需要將要響應的消息定義為返回值,sendTo注解中指定要響應到哪個隊列
重點:
1、sendTo注解指定要相應的隊列(注意和生產端保持一致)
2、方法定義的返回值內容就是要響應的消息,最終會發送到sendTo注解指定要相應的隊列
3、這種方法的缺點是消費端的主關性很高,因為sendTo指定的目標隊列可以自己瞎寫,導致生產者端無法正確收到消息響應,但我相信一般項目中也不會這么干
/*** 方式1 SendTo指定響應隊列** @param message* @return String* @author lei* @date 2022-09-19 16:17:52*/@RabbitListener(queues ="bizQueue")@SendTo("replyQueue")public String handleEmailMessage(Message message) {try {String msg=new String(message.getBody(), StandardCharsets.UTF_8);log.info("---consumer接收到消息----{}",msg);return "客戶端響應消息:"+msg+"處理完成!";} catch (Exception e) {log.error("處理業務消息失敗",e);}return null;}(2)方法二:讀取生產端的消息使用模板發送
與普通的消費者方法一樣,只需要RabbitListener注解監聽業務隊列;但還需要根據消息獲取出ReplyTo地址,然后自己消費者方法內部手動發送消息
1、優點,更強烈的感受到消息請求 響應的交互性,流程看起來更清晰
2、缺點,代碼不雅
/*** 方式2 message消息獲取內部reply rabbitmq手動發送** @param message* @return String* @author lei* @date 2022-09-19 16:17:52*/@RabbitListener(queues = "bizQueue")public void handleEmailMessage2(Message message) {try {String msg = new String(message.getBody(), StandardCharsets.UTF_8);log.info("---consumer接收到消息----{}", msg);String replyTo = message.getMessageProperties().getReplyTo();System.out.println("接收到的reply:" + replyTo);rabbitTemplate.convertAndSend(replyTo, "客戶端響應消息:" + msg + "處理完成!", x -> {x.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId());return x;});} catch (Exception e) {log.error("處理業務消息失敗",e);}}(3)方法三:方法返回值
這種方式與1其實是一致的,但我經過測試,因為生產者消息指定了ReplyTo的地址,消費者端無需自己再次手動指定,即生產消息到哪里,是否響應以及響應消息發送到哪里全由生產端自己空,消費者只需要處理自身業務以及返回結果
/*** 方式三 方法有返回值,返回要響應的數據 (reply 由生產者發送消息時指定,消費者不做任何處理)** @param message* @return String* @author lei* @date 2022-09-19 23:17:47*/@RabbitListener(queues ="bizQueue")public String handleEmailMessage3(Message message) {try {String msg=new String(message.getBody(), StandardCharsets.UTF_8);log.info("---consumer接收到消息----{}",msg);return "客戶端響應消息:"+msg+"處理完成!";}catch (Exception e) {log.error("處理業務消息失敗",e);}return null;}(4)測試
生產消息:
消費消息與響應:
收到的響應:
鏈路:
如此,MQ版本的請求響應模式就完成了,其實很多大佬使用MQ來實現RPC就是用的ReplyTo啦!
項目源碼:springboot-lean
總結
以上是生活随笔為你收集整理的springboot-rabbitmq-reply 消息直接回复模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: docker安装ElasticSearc
- 下一篇: RocketMQ源码分析之request