spring boot rabbitmq 延时消费的简单实现
目錄
實現一:TTL
設置隊列過期時間實現延時消費
設置消息過期時間實現延時消費
實現二:插件實現
公司最近需要用到rabbitmq,考慮到業務需求,后期可能需要用到mq延時消費機制。工作一年,對很多技術都不了解,還是一名技術小白,決定主動學習研究一下。
在網上查閱瀏覽了許多帖子,關于延時消費主要分為兩種實現,一種是rabbitmq的TTL機制,一種是rabbitmq的插件實現。
感謝以下樓主的經驗分享:
https://www.cnblogs.com/boshen-hzb/p/6841982.html? ?springboot和rabbitmq整合
https://blog.csdn.net/u014308482/article/details/53036770? 延時消費的兩種實現
https://blog.csdn.net/u010046908/article/details/54773323? ? mac安裝rabbitmq
以下為自己根據網上資料學習后得出的個人總結,如有錯誤之處,歡迎指正。
實現一:TTL
?
TTL指過期時間,rabbitmq可以通過設置隊列的過期時間或者消息的過期時間實現延時消費。
?
準備工作:
安裝rabbitmq
添加相關maven依賴?
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>設置隊列過期時間實現延時消費
?
交換機及隊列配置,公司用的spring-boot框架,為簡化步驟,我直接配在啟動類。
代碼中有四個配置,第一個配置的exchange是用來接收已過期的隊列信息并進行重新分配隊列進行消費,第二個配置的repeatTradeQueue為exchange重新分配的隊列名,第三個是將repeatTradeQueue隊列與exchange交換機綁定,并指定對應的routing key,第四個配置的就是我們要設置過期時間的隊列deadLetterQueue,配置中有三個參數,x-message-ttl為過期時間,該隊列所有消息的過期時間都為我們配置的這個值,單位為毫秒,這里我設置過期時間為3秒,x-dead-letter-exchange是指過期消息重新轉發到指定交換機,也就是exchange,x-dead-letter-routing-key是該交換機上綁定的routing-key,將通過配置的routing-key分配對應的隊列,也就是前面配置的repeatTradeQueue。
import java.util.HashMap; import java.util.Map;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean;@SpringBootApplication public class Application {//交換機用于重新分配隊列@BeanDirectExchange exchange() {return new DirectExchange("exchange");}//用于延時消費的隊列@Beanpublic Queue repeatTradeQueue() {Queue queue = new Queue("repeatTradeQueue",true,false,false);return queue; }//綁定交換機并指定routing key@Beanpublic Binding repeatTradeBinding() {return BindingBuilder.bind(repeatTradeQueue()).to(exchange()).with("repeatTradeQueue");}//配置死信隊列@Beanpublic Queue deadLetterQueue() {Map<String,Object> args = new HashMap<>();args.put("x-message-ttl", 3000);args.put("x-dead-letter-exchange", "exchange");args.put("x-dead-letter-routing-key", "repeatTradeQueue");return new Queue("deadLetterQueue", true, false, false, args);}public static void main(String[] args) throws Exception {SpringApplication.run(Application.class, args);} }配置生產者,這里生產者需要指定前面配置了過期時間的隊列deadLetterQueue
import java.time.LocalDateTime;import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class DeadLetterSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {System.out.println("DeadLetterSender 發送時間:"+LocalDateTime.now().toString()+" msg內容:"+msg);rabbitTemplate.convertAndSend("deadLetterQueue", msg);}}配置消費者,消費者監聽指定用于延時消費的隊列repeatTradeQueue
import java.time.LocalDateTime;@Component @RabbitListener(queues = "repeatTradeQueue") public class RepeatTradeReceiver {@RabbitHandlerpublic void process(String msg) {System.out.println("repeatTradeQueue 接收時間:"+LocalDateTime.now().toString()+" 接收內容:"+msg);}}寫一個簡單的接口調用測試延時消費是否成功
import org.springframework.beans.factory.annotation.Autowired;@RestController @RequestMapping("/rabbit") public class RabbitTest {@Autowiredprivate DeadLetterSender deadLetterSender;@GetMapping("/deadTest")public void deadTest() {deadLetterSender.send("隊列設置過期時間測試");}}啟動項目開始測試
發送端和接收端時間間隔3秒(毫秒差就不要較真了⊙﹏⊙|||)
?
設置消息過期時間實現延時消費
?
還是先貼上配置的代碼,基本配置都一樣,唯一的區別是deadLetterQueue的過期時間這里不做配置,需要注意的是,因為我這里用的是同一個隊列名,所以即使將隊列過期時間配置刪除,mq中該隊列過期時間仍然還是存在的,所以需要刪除該隊列,啟動項目時才能重新配置該隊列屬性,可能可以通過配置的方式重新覆蓋屬性配置,小白沒研究出來(?_?),當然也可以保留隊列過期時間的配置,當兩個過期時間都存在時,消息取更小的過期時間。
import java.util.HashMap;@SpringBootApplication public class Application {//用于死信隊列轉發的交換機@BeanDirectExchange exchange() {return new DirectExchange("exchange");}//用于延時消費的隊列@Beanpublic Queue repeatTradeQueue() {Queue queue = new Queue("repeatTradeQueue",true,false,false);return queue; }//綁定交換機并指定routing key@Beanpublic Binding repeatTradeBinding() {return BindingBuilder.bind(repeatTradeQueue()).to(exchange()).with("repeatTradeQueue");}//配置死信隊列@Beanpublic Queue deadLetterQueue() {Map<String,Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "exchange");args.put("x-dead-letter-routing-key", "repeatTradeQueue");return new Queue("deadLetterQueue", true, false, false, args);}public static void main(String[] args) throws Exception {SpringApplication.run(Application.class, args);} }?
配置生產者,message的expiration就是過期時間的設置,單位也是毫秒
import java.time.LocalDateTime;import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class DeadLetterSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg, long times) {System.out.println("DeadLetterSender 發送時間:" + LocalDateTime.now().toString() + " msg內容:" + msg);MessagePostProcessor processor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration(times + "");return message;}};rabbitTemplate.convertAndSend("deadLetterQueue", (Object)msg, processor);} }消費者不變,用之前的類即可
稍微修改一下接口,設置時間為5秒
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import me.miaobo.mq.sender.DeadLetterSender;@RestController @RequestMapping("/rabbit") public class RabbitTest {@Autowiredprivate DeadLetterSender deadLetterSender;@GetMapping("/deadTest")public void deadTest() {deadLetterSender.send("消息設置過期時間測試",5000);} }補充一下隊列的刪除,在控制臺選擇queues菜單,找到我們配置的隊列,點擊名稱進詳情,操作介紹有點傻,不清楚mq的可以看看之前的鏈接貼。
Mac 安裝 rabbitmq
?
?
進入詳情界面可以看到之前的配置,過期時間3秒,自己通過項目重啟發現過期時間并不會刪除,只好在管理界面手動刪除。
?
?
下拉詳情頁面,找到刪除按鈕,刪除該隊列
?
?
啟動服務,測試接口
時間相隔5秒,和接口設置的時間保持一致。
?
為了驗證隊列和消息過期時間同時配置時取最小值,我又刪除了隊列一遍,把隊列3秒過期時間加上。
繼續測試 (●゚ω゚●),還是用之前的接口測試。
不再是接口設置的5秒過期時間,而是隊列設置的3秒過期時間生效。
實現二:插件實現
?
rabbitmq 安裝rabbitmq_delayed_message_exchange插件(版本要求網上說3.5.7以上,未驗證低版本問題,安裝的3.7.7版本)
插件下載地址
插件包放在rabbit安裝目錄/plugins/菜單下
運行命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
安裝Erlang(版本要求網上說18.0以上,未驗證低版本問題,homebrew自動安裝的20.3.8.2版本)
mac通過brew命令安裝
brew install erlang
windows的沒研究,可以自己百度下(~ ̄▽ ̄)~
?
插件實現延時消費時踩了個坑,就是插件包版本有問題,自己在網上下載別人分享的包,結果測試一直不通過,折騰了挺久沒找到原因,懷疑是插件包的問題,就去官網下載了對應3.7.7版本的插件包。
?
安裝完成后去管理頁面驗證插件是否有效
?
插件若安裝成功,type類型會有多一個x-delayed-message,rabbitmq默認是沒有的,add一個exchange,選擇該類型,添加成功說明插件啟用成功。我是沒測試add,結果直接去寫demo測試了,結果一直啟動報錯還不知道原因,后來發現是插件包的問題/(ㄒoㄒ)/~~。
?
接著就是延時隊列的配置
注意delayedExchange是用的自定義類型,類型為x-delayed-message,x-delayed-type要求是rabbitmq的類型,取direct,topic,fanout,headers類型中的一個,x-delayed-type說實話不知道具體干嘛用的,就這么照著配置了 ?( ̄3 ̄)。
import java.util.HashMap;@SpringBootApplication public class Application {/*** 自定義的交換機類型* @return*/@BeanCustomExchange delayedExchange() {Map<String,Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("delayedExchange","x-delayed-message",true,false,args);}/*** 創建一個隊列* @return*/@Beanpublic Queue delayedQueue() {return new Queue("delayedQueue",true);}/** * 綁定隊列到自定義交換機 * @return */ @Bean public Binding bindingNotify() { return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayedQueue").noargs(); } public static void main(String[] args) throws Exception {SpringApplication.run(Application.class, args);} }配置生產者,這里對message的header信息進行配置,配置的x-delay參數就是延時時間,單位也是毫秒,指定綁定的交換機名稱以及routing key
import java.time.LocalDateTime;import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class DelayedSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg, long time) {System.out.println("DelayedSender 發送時間: " + LocalDateTime.now() + " msg內容:" + msg);MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay", time);return message;}};this.rabbitTemplate.convertAndSend("delayedExchange", "delayedQueue", msg, messagePostProcessor);}}配置消費者,監聽對應的隊列名稱delayedQueue
import java.time.LocalDateTime;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "delayedQueue") public class DelayedReceiver {@RabbitHandlerpublic void process(String msg) {System.out.println("DelayedReceiver 接受時間: " + LocalDateTime.now() + " msg內容:" + msg);} }?
寫接口測試
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import me.miaobo.mq.sender.DelayedSender;@RestController @RequestMapping("/rabbit") public class RabbitTest {@Autowiredprivate DelayedSender delayedSender;@GetMapping("/delayedTest")public void delayedTest() {delayedSender.send("插件實現延時隊列",6000);}}?
啟動項目測試,延遲時間為接口設置的6秒。
?
以上為自己對mq延時消費機制的簡單實現,分享給初學mq的程序猿們,學習記錄,僅供參考,歡迎評論區補充指點!
總結
以上是生活随笔為你收集整理的spring boot rabbitmq 延时消费的简单实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【源码】Viewer3D工具包
- 下一篇: Aplication简单使用示例