日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spring boot rabbitmq 延时消费的简单实现

發布時間:2024/3/26 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spring boot rabbitmq 延时消费的简单实现 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

實現一:TTL

設置隊列過期時間實現延時消費

設置消息過期時間實現延時消費

實現二:插件實現


公司最近需要用到rabbitmq,考慮到業務需求,后期可能需要用到mq延時消費機制。工作一年,對很多技術都不了解,還是一名技術小白,決定主動學習研究一下。

在網上查閱瀏覽了許多帖子,關于延時消費主要分為兩種實現,一種是rabbitmq的TTL機制,一種是rabbitmq的插件實現。

感謝以下樓主的經驗分享:

https://www.cnblogs.com/boshen-hzb/p/6841982.html? ?springboot和rabbitmq整合

https://blog.csdn.net/u014308482/article/details/53036770? 延時消費的兩種實現

https://blog.csdn.net/u010046908/article/details/54773323? ? mac安裝rabbitmq

以下為自己根據網上資料學習后得出的個人總結,如有錯誤之處,歡迎指正。


實現一:TTL

?

TTL指過期時間,rabbitmq可以通過設置隊列的過期時間或者消息的過期時間實現延時消費。

?

準備工作:

安裝rabbitmq

添加相關maven依賴?

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>

設置隊列過期時間實現延時消費
?

交換機及隊列配置,公司用的spring-boot框架,為簡化步驟,我直接配在啟動類。

代碼中有四個配置,第一個配置的exchange是用來接收已過期的隊列信息并進行重新分配隊列進行消費,第二個配置的repeatTradeQueue為exchange重新分配的隊列名,第三個是將repeatTradeQueue隊列與exchange交換機綁定,并指定對應的routing key,第四個配置的就是我們要設置過期時間的隊列deadLetterQueue,配置中有三個參數,x-message-ttl為過期時間,該隊列所有消息的過期時間都為我們配置的這個值,單位為毫秒,這里我設置過期時間為3秒,x-dead-letter-exchange是指過期消息重新轉發到指定交換機,也就是exchange,x-dead-letter-routing-key是該交換機上綁定的routing-key,將通過配置的routing-key分配對應的隊列,也就是前面配置的repeatTradeQueue。

import java.util.HashMap; import java.util.Map;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean;@SpringBootApplication public class Application {//交換機用于重新分配隊列@BeanDirectExchange exchange() {return new DirectExchange("exchange");}//用于延時消費的隊列@Beanpublic Queue repeatTradeQueue() {Queue queue = new Queue("repeatTradeQueue",true,false,false);return queue; }//綁定交換機并指定routing key@Beanpublic Binding repeatTradeBinding() {return BindingBuilder.bind(repeatTradeQueue()).to(exchange()).with("repeatTradeQueue");}//配置死信隊列@Beanpublic Queue deadLetterQueue() {Map<String,Object> args = new HashMap<>();args.put("x-message-ttl", 3000);args.put("x-dead-letter-exchange", "exchange");args.put("x-dead-letter-routing-key", "repeatTradeQueue");return new Queue("deadLetterQueue", true, false, false, args);}public static void main(String[] args) throws Exception {SpringApplication.run(Application.class, args);} }

配置生產者,這里生產者需要指定前面配置了過期時間的隊列deadLetterQueue

import java.time.LocalDateTime;import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class DeadLetterSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {System.out.println("DeadLetterSender 發送時間:"+LocalDateTime.now().toString()+" msg內容:"+msg);rabbitTemplate.convertAndSend("deadLetterQueue", msg);}}

配置消費者,消費者監聽指定用于延時消費的隊列repeatTradeQueue

import java.time.LocalDateTime;@Component @RabbitListener(queues = "repeatTradeQueue") public class RepeatTradeReceiver {@RabbitHandlerpublic void process(String msg) {System.out.println("repeatTradeQueue 接收時間:"+LocalDateTime.now().toString()+" 接收內容:"+msg);}}

寫一個簡單的接口調用測試延時消費是否成功

import org.springframework.beans.factory.annotation.Autowired;@RestController @RequestMapping("/rabbit") public class RabbitTest {@Autowiredprivate DeadLetterSender deadLetterSender;@GetMapping("/deadTest")public void deadTest() {deadLetterSender.send("隊列設置過期時間測試");}}

啟動項目開始測試

發送端和接收端時間間隔3秒(毫秒差就不要較真了⊙﹏⊙|||)

?

設置消息過期時間實現延時消費

?

還是先貼上配置的代碼,基本配置都一樣,唯一的區別是deadLetterQueue的過期時間這里不做配置,需要注意的是,因為我這里用的是同一個隊列名,所以即使將隊列過期時間配置刪除,mq中該隊列過期時間仍然還是存在的,所以需要刪除該隊列,啟動項目時才能重新配置該隊列屬性,可能可以通過配置的方式重新覆蓋屬性配置,小白沒研究出來(?_?),當然也可以保留隊列過期時間的配置,當兩個過期時間都存在時,消息取更小的過期時間。

import java.util.HashMap;@SpringBootApplication public class Application {//用于死信隊列轉發的交換機@BeanDirectExchange exchange() {return new DirectExchange("exchange");}//用于延時消費的隊列@Beanpublic Queue repeatTradeQueue() {Queue queue = new Queue("repeatTradeQueue",true,false,false);return queue; }//綁定交換機并指定routing key@Beanpublic Binding repeatTradeBinding() {return BindingBuilder.bind(repeatTradeQueue()).to(exchange()).with("repeatTradeQueue");}//配置死信隊列@Beanpublic Queue deadLetterQueue() {Map<String,Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "exchange");args.put("x-dead-letter-routing-key", "repeatTradeQueue");return new Queue("deadLetterQueue", true, false, false, args);}public static void main(String[] args) throws Exception {SpringApplication.run(Application.class, args);} }

?

配置生產者,message的expiration就是過期時間的設置,單位也是毫秒

import java.time.LocalDateTime;import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class DeadLetterSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg, long times) {System.out.println("DeadLetterSender 發送時間:" + LocalDateTime.now().toString() + " msg內容:" + msg);MessagePostProcessor processor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration(times + "");return message;}};rabbitTemplate.convertAndSend("deadLetterQueue", (Object)msg, processor);} }

消費者不變,用之前的類即可

稍微修改一下接口,設置時間為5秒

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import me.miaobo.mq.sender.DeadLetterSender;@RestController @RequestMapping("/rabbit") public class RabbitTest {@Autowiredprivate DeadLetterSender deadLetterSender;@GetMapping("/deadTest")public void deadTest() {deadLetterSender.send("消息設置過期時間測試",5000);} }

補充一下隊列的刪除,在控制臺選擇queues菜單,找到我們配置的隊列,點擊名稱進詳情,操作介紹有點傻,不清楚mq的可以看看之前的鏈接貼。
Mac 安裝 rabbitmq

?

?

進入詳情界面可以看到之前的配置,過期時間3秒,自己通過項目重啟發現過期時間并不會刪除,只好在管理界面手動刪除。

?

?

下拉詳情頁面,找到刪除按鈕,刪除該隊列

?

?

啟動服務,測試接口

時間相隔5秒,和接口設置的時間保持一致。

?

為了驗證隊列和消息過期時間同時配置時取最小值,我又刪除了隊列一遍,把隊列3秒過期時間加上。

繼續測試 (●゚ω゚●),還是用之前的接口測試。

不再是接口設置的5秒過期時間,而是隊列設置的3秒過期時間生效。


實現二:插件實現

?

rabbitmq 安裝rabbitmq_delayed_message_exchange插件(版本要求網上說3.5.7以上,未驗證低版本問題,安裝的3.7.7版本)

插件下載地址

插件包放在rabbit安裝目錄/plugins/菜單下

運行命令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

安裝Erlang(版本要求網上說18.0以上,未驗證低版本問題,homebrew自動安裝的20.3.8.2版本)

mac通過brew命令安裝

brew install erlang

windows的沒研究,可以自己百度下(~ ̄▽ ̄)~

?

插件實現延時消費時踩了個坑,就是插件包版本有問題,自己在網上下載別人分享的包,結果測試一直不通過,折騰了挺久沒找到原因,懷疑是插件包的問題,就去官網下載了對應3.7.7版本的插件包。

?

安裝完成后去管理頁面驗證插件是否有效

?

插件若安裝成功,type類型會有多一個x-delayed-message,rabbitmq默認是沒有的,add一個exchange,選擇該類型,添加成功說明插件啟用成功。我是沒測試add,結果直接去寫demo測試了,結果一直啟動報錯還不知道原因,后來發現是插件包的問題/(ㄒoㄒ)/~~。

?

接著就是延時隊列的配置

注意delayedExchange是用的自定義類型,類型為x-delayed-message,x-delayed-type要求是rabbitmq的類型,取direct,topic,fanout,headers類型中的一個,x-delayed-type說實話不知道具體干嘛用的,就這么照著配置了 ?( ̄3 ̄)。

import java.util.HashMap;@SpringBootApplication public class Application {/*** 自定義的交換機類型* @return*/@BeanCustomExchange delayedExchange() {Map<String,Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("delayedExchange","x-delayed-message",true,false,args);}/*** 創建一個隊列* @return*/@Beanpublic Queue delayedQueue() {return new Queue("delayedQueue",true);}/** * 綁定隊列到自定義交換機 * @return */ @Bean public Binding bindingNotify() { return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayedQueue").noargs(); } public static void main(String[] args) throws Exception {SpringApplication.run(Application.class, args);} }

配置生產者,這里對message的header信息進行配置,配置的x-delay參數就是延時時間,單位也是毫秒,指定綁定的交換機名稱以及routing key

import java.time.LocalDateTime;import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class DelayedSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg, long time) {System.out.println("DelayedSender 發送時間: " + LocalDateTime.now() + " msg內容:" + msg);MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay", time);return message;}};this.rabbitTemplate.convertAndSend("delayedExchange", "delayedQueue", msg, messagePostProcessor);}}

配置消費者,監聽對應的隊列名稱delayedQueue

import java.time.LocalDateTime;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "delayedQueue") public class DelayedReceiver {@RabbitHandlerpublic void process(String msg) {System.out.println("DelayedReceiver 接受時間: " + LocalDateTime.now() + " msg內容:" + msg);} }

?

寫接口測試

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import me.miaobo.mq.sender.DelayedSender;@RestController @RequestMapping("/rabbit") public class RabbitTest {@Autowiredprivate DelayedSender delayedSender;@GetMapping("/delayedTest")public void delayedTest() {delayedSender.send("插件實現延時隊列",6000);}}

?

啟動項目測試,延遲時間為接口設置的6秒。

?

以上為自己對mq延時消費機制的簡單實現,分享給初學mq的程序猿們,學習記錄,僅供參考,歡迎評論區補充指點!

總結

以上是生活随笔為你收集整理的spring boot rabbitmq 延时消费的简单实现的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。