日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ如何实现延迟队列?

發布時間:2024/4/11 编程问答 17 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ如何实现延迟队列? 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。


歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-how-to-make-delay-queue/

什么是延遲隊列

延遲隊列存儲的對象肯定是對應的延遲消息,所謂”延遲消息”是指當消息被發送以后,并不想讓消費者立即拿到消息,而是等待指定時間后,消費者才拿到這個消息進行消費。

場景一:在訂單系統中,一個用戶下單之后通常有30分鐘的時間進行支付,如果30分鐘之內沒有支付成功,那么這個訂單將進行一場處理。這是就可以使用延遲隊列將訂單信息發送到延遲隊列。

場景二:用戶希望通過手機遠程遙控家里的智能設備在指定的時間進行工作。這時候就可以將用戶指令發送到延遲隊列,當指令設定的時間到了再將指令推送到只能設備。


RabbitMQ怎么實現延遲隊列

AMQP協議,以及RabbitMQ本身沒有直接支持延遲隊列的功能,但是可以通過TTL和DLX模擬出延遲隊列的功能。

TTL(Time To Live)
RabbitMQ可以針對Queue和Message設置 x-message-tt,來控制消息的生存時間,如果超時,則消息變為dead letter
RabbitMQ針對隊列中的消息過期時間有兩種方法可以設置。

  • A: 通過隊列屬性設置,隊列中所有消息都有相同的過期時間。
  • B: 對消息進行單獨設置,每條消息TTL可以不同。

如果同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為準。消息在隊列的生存時間一旦超過設置的TTL值,就成為dead letter

詳細可以參考:RabbitMQ之TTL(Time-To-Live 過期時間)

DLX (Dead-Letter-Exchange)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數,如果隊列內出現了dead letter,則按照這兩個參數重新路由。

  • x-dead-letter-exchange:出現dead letter之后將dead letter重新發送到指定exchange
  • x-dead-letter-routing-key:指定routing-key發送

隊列出現dead letter的情況有:

  • 消息或者隊列的TTL過期
  • 隊列達到最大長度
  • 消息被消費端拒絕(basic.reject or basic.nack)并且requeue=false

利用DLX,當消息在一個隊列中變成死信后,它能被重新publish到另一個Exchange。這時候消息就可以重新被消費。

詳細可以參考: RabbitMQ之死信隊列


代碼示例

首先建立2個exchange和2個queue:

  • exchange_delay_begin:這個是producer端發送時調用的exchange, 將消息發送至queue_dealy_begin中。
  • queue_delay_begin: 通過routingKey="delay"綁定exchang_delay_begin, 同時配置DLX=exchange_delay_done, 當消息變成死信時,發往exchange_delay_done中。
  • exchange_delay_done: 死信的exchange, 如果不配置x-dead-letter-routing-key則采用原有默認的routingKey,即queue_delay_begin綁定exchang_delay_beghin采用的“delay”。
  • queue_delay_done:消息在TTL到期之后,最終通過exchang_delay_done發送值此queue,消費端通過消費此queue的消息,即可以達到延遲的效果。

1. 建立exchange和queue的代碼(當然這里可以通過RabbitMQ的管理界面來實現,無需code相關代碼):

channel.exchangeDeclare("exchange_delay_begin", "direct", true); channel.exchangeDeclare("exchange_delay_done", "direct", true);Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange", "exchange_delay_done"); channel.queueDeclare("queue_delay_begin", true, false, false, args); channel.queueDeclare("queue_delay_done", true, false, false, null);channel.queueBind("queue_delay_begin", "exchange_delay_begin", "delay"); channel.queueBind("queue_delay_done", "exchange_delay_done", "delay");

2. consumer端代碼:

QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("queue_delay_done", false, consumer);while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("receive msg time:" + new Date() + ", msg body:" + msg);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }

3. producer端代碼:設置消息的延遲時間為1min。

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("60000");//設置消息TTL builder.deliveryMode(2);//設置消息持久化 AMQP.BasicProperties properties = builder.build();String message = String.valueOf(new Date()); channel.basicPublish("exchange_delay_begin","delay",properties,message.getBytes());

在創建完exchange和queue之后,首先執行consumer端的代碼,之后執行producer端的代碼,待producer發送完畢之后,查看consumer端的輸出:

receive msg time:Tue Feb 14 21:06:19 CST 2017, msg body:Tue Feb 14 21:05:19 CST 2017

可以看到延遲1min消費了相關消息。大功告成~

欲了解更多消息中間件的內容,可以關注:消息中間件收錄集


參考資料

  • rabbitmq 實現延遲隊列的兩種方式
  • RabbitMQ之TTL(Time-To-Live 過期時間)
  • RabbitMQ之死信隊列
  • 歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-how-to-make-delay-queue/


    歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。


    超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生

    總結

    以上是生活随笔為你收集整理的RabbitMQ如何实现延迟队列?的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。