日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rabbitMq实现延迟队列

發布時間:2023/12/20 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rabbitMq实现延迟队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 業務場景:
  • 1 安裝rabbitMq
  • 2 添加maven依賴
  • 3 在application.properties配置
  • 4 具體的實現
    • 4.1 Dead Letter Exchanges
    • 4.2 實現延遲隊列
      • 代碼
    • 4.3 RabbitMq的優化
    • 4.4 利用插件實現延遲隊列
      • 4.4.1 下載插件
      • 4.4.2 編寫代碼
  • 5.總結

業務場景:

1.生成訂單30分鐘未支付,則自動取消,我們該怎么實現呢?
2.生成訂單60秒后,給用戶發短信

1 安裝rabbitMq

windows安裝
ubuntu中安裝

2 添加maven依賴

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

3 在application.properties配置

spring.application.name=rabbitmq-hello # 配置rabbbitMq spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=springCloud spring.rabbitmq.password=123456

4 具體的實現

rabbitmq本身是沒有延遲發送的功能,但是我們通過消息的TTL(Time To Live)來實現,所謂TTL就是指消息的存活時間,RabbitMQ可以對隊列和消息分別設置TTL。
對隊列設置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。如果隊列設置了,消息也設置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這里單講單個消息的TTL,因為它才是實現延遲任務的關鍵。
我們可以通過設置消息的expiration字段或者x-message-ttl屬性來設置時間,兩者是一樣的效果。只是expiration字段是字符串參數,所以要寫個int類型的字符串:

byte[] messageBodyBytes = "Hello, world!".getBytes(); AMQP.BasicProperties properties = new AMQP.BasicProperties(); properties.setExpiration("60*1000"); channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

當上面的消息扔到隊列中后,過了60秒,如果沒有被消費,它就死了。不會被消費者消費到。
這個消息后面的,沒有“死掉”的消息對頂上來,被消費者消費。死信在隊列中并不會被刪除和釋放,它會被統計到隊列的消息數中去。單靠死信還不能實現延遲任務,還要靠Dead Letter Exchange
下面我大致解釋一下Dead Letter Exchanges

4.1 Dead Letter Exchanges

一個消息在滿足如下條件下,會進死信路由,記住這里是路由而不是隊列,一個路由可以對應很多隊列。

  • 1.一個消息被Consumer拒收了,并且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。
  • 2.上面的消息的TTL到了,消息過期了。
  • 3.隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。

Dead Letter Exchange其實就是一種普通的exchange,和創建其他exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過期了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。

4.2 實現延遲隊列

延遲任務通過消息的TTL和Dead Letter Exchange來實現。我們需要建立2個隊列,一個用于發送消息,一個用于消息過期后的轉發目標隊列,大致原理如下圖所示。

生產者輸出消息到Queue1,并且這個消息是設置有有效時間的,比如60s。消息會在Queue1中等待60s,如果沒有消費者收掉的話,它就是被轉發到Queue2,Queue2有消費者,收到,處理延遲任務。
接下來正式進入代碼階段

代碼

  • 聲明交換機、隊列以及他們的綁定關系:
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.HashMap; import java.util.Map;@Configuration public class RabbitMQConfig {public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";//普通的交換機public static final String DELAY_QUEUEA_NAME = "delay.queue.demo.business.queuea";//聲明兩個隊列 A Bpublic static final String DELAY_QUEUEB_NAME = "delay.queue.demo.business.queueb";public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey";public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey";public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";//Dead Letter Exchangespublic static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey";//死信交換機public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey";public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.demo.deadletter.queuea";public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.demo.deadletter.queueb";// 聲明延時Exchange@Bean("delayExchange")public DirectExchange delayExchange() {return new DirectExchange(DELAY_EXCHANGE_NAME);}// 聲明死信Exchange@Bean("deadLetterExchange")public DirectExchange deadLetterExchange() {return new DirectExchange(DEAD_LETTER_EXCHANGE);}// 聲明延時隊列A 延時10s// 并綁定到對應的死信交換機@Bean("delayQueueA")public Queue delayQueueA() {Map<String, Object> args = new HashMap<>(2);// x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);// x-dead-letter-routing-key 這里聲明當前隊列的死信路由keyargs.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);// x-message-ttl 聲明隊列的TTLargs.put("x-message-ttl", 1000 * 10);return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();}// 聲明延時隊列B 延時 60s// 并綁定到對應的死信交換機@Bean("delayQueueB")public Queue delayQueueB() {Map<String, Object> args = new HashMap<>(2);// x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);// x-dead-letter-routing-key 這里聲明當前隊列的死信路由keyargs.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);// x-message-ttl 聲明隊列的TTLargs.put("x-message-ttl", 60000);return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build();}// 聲明死信隊列A 用于接收延時10s處理的消息@Bean("deadLetterQueueA")public Queue deadLetterQueueA() {return new Queue(DEAD_LETTER_QUEUEA_NAME);}// 聲明死信隊列B 用于接收延時60s處理的消息@Bean("deadLetterQueueB")public Queue deadLetterQueueB() {return new Queue(DEAD_LETTER_QUEUEB_NAME);}// 聲明延時隊列A綁定關系@Beanpublic Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,@Qualifier("delayExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);}// 聲明業務隊列B綁定關系@Beanpublic Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,@Qualifier("delayExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);}// 聲明死信隊列A綁定關系@Beanpublic Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,@Qualifier("deadLetterExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);}// 聲明死信隊列B綁定關系@Beanpublic Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,@Qualifier("deadLetterExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);} }
  • 消息的生產者
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service;import static com.talent.infocenter.rabbitmq.RabbitMQConfig.*;@Service public class DelayMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public enum DelayTypeEnum {DELAY_10s, DELAY_60s;}public static DelayTypeEnum getByIntValue(int value) {switch (value) {case 10:return DelayTypeEnum.DELAY_10s;case 60:return DelayTypeEnum.DELAY_60s;default:return null;}}public void sendMsg(String msg, DelayTypeEnum type) {switch (type) {case DELAY_10s:rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEA_ROUTING_KEY, msg);break;case DELAY_60s:rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEB_ROUTING_KEY, msg);break;}} }
  • 消費者
    我們創建兩個消費者,分別消費10s和60s的訂單
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException; import java.util.Date;import static com.talent.infocenter.rabbitmq.RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME; import static com.talent.infocenter.rabbitmq.RabbitMQConfig.DEAD_LETTER_QUEUEB_NAME;@Slf4j @Component public class DeadLetterQueueConsumer {@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)public void receiveA(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("當前時間:{},死信隊列A收到消息:{}", new Date().toString(), msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)public void receiveB(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("當前時間:{},死信隊列B收到消息:{}", new Date().toString(), msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} }
  • 創建一個接口進行測試
import com.talent.infocenter.rabbitmq.DelayMessageSender; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;import java.util.Date; import java.util.Objects;@Slf4j @RestController public class RabbitMQMsgController {@Autowiredprivate DelayMessageSender sender;@RequestMapping(value = "sendmsg", method = RequestMethod.GET)public void sendMsg(@RequestParam(value = "msg") String msg, @RequestParam(value = "delayType") Integer delayType) {log.info("當前時間:{},收到請求,msg:{},delayType:{}", new Date(), msg, delayType);sender.sendMsg(msg, Objects.requireNonNull(DelayMessageSender.getByIntValue(delayType)));} }

接下來開始測試,我用的是swagger,大家可以用postman等其他方法自行測試

打開我們的rabbitmq后臺就可以看到我們交換機和隊列信息

同樣的方法,我們創建一個60s之后才能消費的訂單

上面的實現僅能設置兩個指定的時間10s和60s,接下來我們設置任意時間的延遲隊列

4.3 RabbitMq的優化

我們需要一種更通用的方案才能滿足需求,那么就只能將TTL設置在消息屬性里了,只有如此我們才能更加靈活的實現延遲隊列的具體業務開發,方法也很簡單,我們只需要增加一個延時隊列,用于接收設置為任意延時時長的消息,同時增加一個相應的死信隊列和routingkey,但是該方法有個極大的弊端就是如果使用在消息屬性上設置TTL的方式,消息可能并不會按時“死亡“,因為RabbitMQ只會檢查第一個消息是否過期,如果過期則丟到死信隊列,所以如果第一個消息的延時時長很長,而第二個消息的延時時長很短,則第二個消息并不會優先得到執行,此處則不再進行編寫代碼,但是為了解決這個問題,我們將利用rabbitMq插件實現延遲隊列。

4.4 利用插件實現延遲隊列

4.4.1 下載插件

點擊此處下載

下載完成之后進行解壓,此處推薦bandzip進行解壓,并且將解壓之后的文件夾放到rabbitmq的安裝目錄下的plugins目錄下

進入到sbin目錄下使用cmd執行以下指令來啟用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

執行以上步驟之后開始重啟我們的rabbitmq服務

  • 1.進入到服務

  • 2.進入到sbin目錄,雙擊rabbitmq-server.bat

驗證是否重啟成功訪問http://localhost:15672
如果能夠訪問成功說明重啟成功

4.4.2 編寫代碼

重新創建一個配置類

import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.HashMap; import java.util.Map;@Configuration public class DelayedRabbitMQConfig {public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue";public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange";public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey";@Beanpublic Queue immediateQueue() {return new Queue(DELAYED_QUEUE_NAME);}@Beanpublic CustomExchange customExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);}@Beanpublic Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,@Qualifier("customExchange") CustomExchange customExchange) {return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();} }

新建一個消息的發送者

import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;import static com.talent.infocenter.rabbitMq.DelayedRabbitMQConfig.DELAYED_EXCHANGE_NAME; import static com.talent.infocenter.rabbitMq.DelayedRabbitMQConfig.DELAYED_ROUTING_KEY;@Service public class Provider {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendDelayMsg(String msg, Integer delayTime) {rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a -> {a.getMessageProperties().setDelay(delayTime*1000);return a;});} }

新建一個消息的消費者

import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException; import java.util.Date;import static com.talent.infocenter.rabbitMq.DelayedRabbitMQConfig.DELAYED_QUEUE_NAME;@Slf4j @Component public class Consumer {@RabbitListener(queues = DELAYED_QUEUE_NAME)public void receiveD(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("當前時間:{},延時隊列收到消息:{}", new Date().toString(), msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} }

修改我們之前的接口

import com.talent.api.utils.RedisUtils; import com.talent.infocenter.rabbitMq.Provider; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;import java.util.Date; import java.util.Objects;@Slf4j @RestController public class RabbitMQMsgController {@Autowiredprivate Provider provider;@RequestMapping(value = "sendmsg", method = RequestMethod.GET)public void sendMsg(@RequestParam(value = "msg") String msg, @RequestParam(value = "delayTime") Integer delayTime) {log.info("當前時間:{},收到請求,msg:{},delayTime:{}", new Date(), msg, delayTime);provider.sendDelayMsg(msg, delayTime);} }

接下來開始測試

再接著測試一下我們不同順序的訂單是否是按照時間順序進行消費的
我們將訂單0002設置為60s,訂單0003設置為15s,看看訂單0003能否在0002之前消費

結果顯而易見訂單0003確實在第15s的時候被消費掉

5.總結

延時隊列在需要延時處理的場景下非常有用,而且十分穩定,使用RabbitMQ來實現延時隊列可以很好的利用RabbitMQ的特性,如:消息可靠發送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正確處理的消息不會被丟棄。另外,通過RabbitMQ集群的特性,可以很好的解決單點故障問題,不會因為單個節點掛掉導致延時隊列不可用或者消息丟失。
當然,延時隊列還有很多其它選擇,比如利用Java的DelayQueu,利用Redis的zset,利用Quartz或者利用kafka的時間輪,這些方式各有特點。

總結

以上是生活随笔為你收集整理的rabbitMq实现延迟队列的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。