RabbitMQ如何实现延迟队列?
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-how-to-make-delay-queue/
什么是延遲隊列
延遲隊列存儲的對象肯定是對應的延遲消息,所謂”延遲消息”是指當消息被發送以后,并不想讓消費者立即拿到消息,而是等待指定時間后,消費者才拿到這個消息進行消費。
場景一:在訂單系統中,一個用戶下單之后通常有30分鐘的時間進行支付,如果30分鐘之內沒有支付成功,那么這個訂單將進行一場處理。這是就可以使用延遲隊列將訂單信息發送到延遲隊列。
場景二:用戶希望通過手機遠程遙控家里的智能設備在指定的時間進行工作。這時候就可以將用戶指令發送到延遲隊列,當指令設定的時間到了再將指令推送到只能設備。
RabbitMQ怎么實現延遲隊列
AMQP協議,以及RabbitMQ本身沒有直接支持延遲隊列的功能,但是可以通過TTL和DLX模擬出延遲隊列的功能。
TTL(Time To Live)
RabbitMQ可以針對Queue和Message設置 x-message-tt,來控制消息的生存時間,如果超時,則消息變為dead letter
RabbitMQ針對隊列中的消息過期時間有兩種方法可以設置。
- A: 通過隊列屬性設置,隊列中所有消息都有相同的過期時間。
- B: 對消息進行單獨設置,每條消息TTL可以不同。
如果同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為準。消息在隊列的生存時間一旦超過設置的TTL值,就成為dead letter
詳細可以參考:RabbitMQ之TTL(Time-To-Live 過期時間)
DLX (Dead-Letter-Exchange)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數,如果隊列內出現了dead letter,則按照這兩個參數重新路由。
- x-dead-letter-exchange:出現dead letter之后將dead letter重新發送到指定exchange
- x-dead-letter-routing-key:指定routing-key發送
隊列出現dead letter的情況有:
- 消息或者隊列的TTL過期
- 隊列達到最大長度
- 消息被消費端拒絕(basic.reject or basic.nack)并且requeue=false
利用DLX,當消息在一個隊列中變成死信后,它能被重新publish到另一個Exchange。這時候消息就可以重新被消費。
詳細可以參考: RabbitMQ之死信隊列
代碼示例
首先建立2個exchange和2個queue:
- exchange_delay_begin:這個是producer端發送時調用的exchange, 將消息發送至queue_dealy_begin中。
- queue_delay_begin: 通過routingKey="delay"綁定exchang_delay_begin, 同時配置DLX=exchange_delay_done, 當消息變成死信時,發往exchange_delay_done中。
- exchange_delay_done: 死信的exchange, 如果不配置x-dead-letter-routing-key則采用原有默認的routingKey,即queue_delay_begin綁定exchang_delay_beghin采用的“delay”。
- queue_delay_done:消息在TTL到期之后,最終通過exchang_delay_done發送值此queue,消費端通過消費此queue的消息,即可以達到延遲的效果。
1. 建立exchange和queue的代碼(當然這里可以通過RabbitMQ的管理界面來實現,無需code相關代碼):
channel.exchangeDeclare("exchange_delay_begin", "direct", true); channel.exchangeDeclare("exchange_delay_done", "direct", true);Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange", "exchange_delay_done"); channel.queueDeclare("queue_delay_begin", true, false, false, args); channel.queueDeclare("queue_delay_done", true, false, false, null);channel.queueBind("queue_delay_begin", "exchange_delay_begin", "delay"); channel.queueBind("queue_delay_done", "exchange_delay_done", "delay");2. consumer端代碼:
QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("queue_delay_done", false, consumer);while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("receive msg time:" + new Date() + ", msg body:" + msg);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }3. producer端代碼:設置消息的延遲時間為1min。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("60000");//設置消息TTL builder.deliveryMode(2);//設置消息持久化 AMQP.BasicProperties properties = builder.build();String message = String.valueOf(new Date()); channel.basicPublish("exchange_delay_begin","delay",properties,message.getBytes());在創建完exchange和queue之后,首先執行consumer端的代碼,之后執行producer端的代碼,待producer發送完畢之后,查看consumer端的輸出:
receive msg time:Tue Feb 14 21:06:19 CST 2017, msg body:Tue Feb 14 21:05:19 CST 2017可以看到延遲1min消費了相關消息。大功告成~
欲了解更多消息中間件的內容,可以關注:消息中間件收錄集
參考資料
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-how-to-make-delay-queue/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生
總結
以上是生活随笔為你收集整理的RabbitMQ如何实现延迟队列?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ之队列优先级
- 下一篇: RabbitMQ基础概念详解