日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

SpringBoot之使用RabbitMQ实现延迟队列

發(fā)布時(shí)間:2025/3/20 javascript 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SpringBoot之使用RabbitMQ实现延迟队列 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

在我們的各個(gè)項(xiàng)目中,經(jīng)常會(huì)有這樣的需求。

  • 訂單模塊:在訂單下單后30分鐘如果沒有付款,就自動(dòng)取消訂單,

  • 短信模塊:在下單成功后60s給用戶發(fā)送短信通知

  • 支付模塊:在微信/支付寶支付成功后,1分鐘后去調(diào)用上游接口檢查訂單有沒有支付成功

實(shí)現(xiàn)這種需求的方式有幾種。

一種比較笨的方式是采用定時(shí)任務(wù),輪訓(xùn)數(shù)據(jù)庫,方法簡(jiǎn)單好用,但性能低下,在高并發(fā)情況下容易弄死數(shù)據(jù)庫,間隔時(shí)間不好設(shè)置,時(shí)間過大,影響精度,過小影響性能,而且做不到按超時(shí)的時(shí)間順序處理。

還有一種是利用redis監(jiān)聽過期key,給key設(shè)置過期時(shí)間,在key失效以后處理失效后的邏輯實(shí)現(xiàn)上述需求。這種實(shí)現(xiàn)有一個(gè)問題就是當(dāng)項(xiàng)目是單機(jī)項(xiàng)目,在項(xiàng)目部署的時(shí)候,剛好redis的key失效了,就會(huì)存在消息丟失。

rabbitmq實(shí)現(xiàn)延時(shí)隊(duì)列:

`RabbitMQ隊(duì)列本身是沒有直接實(shí)現(xiàn)支持延遲隊(duì)列的功能,但可以通過它的Time-To-Live Extensions 與 Dead Letter Exchange 的特性模擬出延遲隊(duì)列的功能。

Time-To-Live Extensions

RabbitMQ支持為隊(duì)列或者消息設(shè)置TTL(time to live 存活時(shí)間)。TTL表明了一條消息可在隊(duì)列中存活的最大時(shí)間。當(dāng)某條消息被設(shè)置了TTL或者當(dāng)某條消息進(jìn)入了設(shè)置了TTL的隊(duì)列時(shí),這條消息會(huì)在TTL時(shí)間后死亡成為Dead Letter。如果既配置了消息的TTL,又配置了隊(duì)列的TTL,那么較小的那個(gè)值會(huì)被取用。

Dead Letter Exchange

死信交換機(jī),上文中提到設(shè)置了 TTL 的消息或隊(duì)列最終會(huì)成為Dead Letter。如果為隊(duì)列設(shè)置了Dead Letter Exchange(DLX),那么這些Dead Letter就會(huì)被重新發(fā)送到Dead Letter Exchange中,然后通過Dead Letter Exchange路由到其他隊(duì)列,即可實(shí)現(xiàn)延遲隊(duì)列的功能。

SpringBoot中實(shí)現(xiàn)RabbitMQ延時(shí)隊(duì)列

1.導(dǎo)入依賴

? ? ? ?<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

2.配置文件

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest

3.隊(duì)列配置類

@Configuration @Slf4j public class DelayRabbitConfig {/*** 延遲隊(duì)列 TTL 名稱*/private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";/*** DLX,dead letter發(fā)送到的exchange* 延時(shí)消息就是發(fā)送到該交換機(jī)的*/public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";/*** routing key 名稱* 具體消息發(fā)送在該 routingKey 的*/public static final String ORDER_DELAY_ROUTING_KEY = "order_delay"; ?public static final String ORDER_QUEUE_NAME = "user.order.queue";public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";public static final String ORDER_ROUTING_KEY = "order"; ?/*** 延遲隊(duì)列配置* <p>* 1、params.put("x-message-ttl", 5 * 1000);* 第一種方式是直接設(shè)置 Queue 延遲時(shí)間 但如果直接給隊(duì)列設(shè)置過期時(shí)間,這種做法不是很靈活,(當(dāng)然二者是兼容的,默認(rèn)是時(shí)間小的優(yōu)先)* 2、rabbitTemplate.convertAndSend(book, message -> {* message.getMessageProperties().setExpiration(2 * 1000 + "");* return message;* });* 第二種就是每次發(fā)送消息動(dòng)態(tài)設(shè)置延遲時(shí)間,這樣我們可以靈活控制**/@Beanpublic Queue delayOrderQueue() {Map<String, Object> params = new HashMap<>(2);// x-dead-letter-exchange 聲明了隊(duì)列里的死信轉(zhuǎn)發(fā)到的DLX名稱,params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);// x-dead-letter-routing-key 聲明了這些死信在轉(zhuǎn)發(fā)時(shí)攜帶的 routing-key 名稱。params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);} ?/*** 需要將一個(gè)隊(duì)列綁定到交換機(jī)上,要求該消息與一個(gè)特定的路由鍵完全匹配。* 這是一個(gè)完整的匹配。如果一個(gè)隊(duì)列綁定到該交換機(jī)上要求路由鍵 “dog”,則只有被標(biāo)記為“dog”的消息才被轉(zhuǎn)發(fā),* 不會(huì)轉(zhuǎn)發(fā)dog.puppy,也不會(huì)轉(zhuǎn)發(fā)dog.guard,只會(huì)轉(zhuǎn)發(fā)dog。** @return DirectExchange*/@Beanpublic DirectExchange orderDelayExchange() {return new DirectExchange(ORDER_DELAY_EXCHANGE);} ?@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);} ?@Beanpublic Queue orderQueue() {return new Queue(ORDER_QUEUE_NAME, true);} ?/*** 將路由鍵和某模式進(jìn)行匹配。此時(shí)隊(duì)列需要綁定要一個(gè)模式上。* 符號(hào)“#”匹配一個(gè)或多個(gè)詞,符號(hào)“*”匹配不多不少一個(gè)詞。* 因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會(huì)匹配到“audit.irs”。**/@Beanpublic TopicExchange orderTopicExchange() {return new TopicExchange(ORDER_EXCHANGE_NAME);} ?@Beanpublic Binding orderBinding() {// TODO 如果要讓延遲隊(duì)列之間有關(guān)聯(lián),這里的 routingKey 和 綁定的交換機(jī)很關(guān)鍵return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);} ? }

4.訂單實(shí)體

@Data public class Order implements Serializable {private String orderId;private String name;/*** 訂單狀態(tài):0-未支付 1-已支付 2-訂單已取消*/private Integer orderStatus; } ?

5.消息發(fā)送者

@Component @Slf4j public class DelaySender { ?@Autowiredprivate AmqpTemplate amqpTemplate; ?public void sendDelay(Order order) {log.info("【訂單生成時(shí)間】" + new Date().toString() + "【1分鐘后檢查訂單是否已經(jīng)支付】" + order.toString());this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {// 如果配置了 params.put("x-message-ttl", 5 * 1000);// 那么這一句也可以省略,具體根據(jù)業(yè)務(wù)需要是聲明 Queue 的時(shí)候就指定好延遲時(shí)間還是在發(fā)送自己控制時(shí)間message.getMessageProperties().setExpiration(1000 * 60 + "");return message;});} } ?

6.消息接收者

@Component @Slf4j public class DelayReceiver { ?@RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})public void orderDelayQueue(Order order, Message message, Channel channel) {log.info("---------------------------------------------");log.info("【orderDelayQueue 監(jiān)聽的消息】 - 【消費(fèi)時(shí)間】 - [{}]- 【訂單內(nèi)容】 - [{}]", new Date(), order.toString());if (order.getOrderStatus() == 0) {order.setOrderStatus(2);log.info("【該訂單未支付,取消訂單】" + order.toString());} else if (order.getOrderStatus() == 1) {log.info("【該訂單已完成支付】");} else if (order.getOrderStatus() == 2) {log.info("【該訂單已取消】");}log.info("---------------------------------------------"); ?} }

7.測(cè)試

@RestController public class TestController {@Autowiredprivate DelaySender delaySender; ?@GetMapping("/sendDelay")public Object sendDelay() {Order order1 = new Order();order1.setOrderStatus(0);order1.setOrderId("2019110874129");order1.setName("switch"); ?Order order2 = new Order();order2.setOrderStatus(1);order2.setOrderId("2019110874215");order2.setName("PS4 pro"); ?delaySender.sendDelay(order1);delaySender.sendDelay(order2);return "success";} ? }

測(cè)試結(jié)果如下:

2019-11-08 14:29:01.544 INFO 44447 --- [nio-8080-exec-1] c.y.study.rabbitmqdeadqueue.DelaySender : 【訂單生成時(shí)間】Fri Nov 08 14:29:01 CST 2019【1分鐘后檢查訂單是否已經(jīng)支付】Order(orderId=2019110874129, orderStatus=0, orderName=switch) 2019-11-08 14:29:01.689 INFO 44447 --- [nio-8080-exec-1] c.y.study.rabbitmqdeadqueue.DelaySender : 【訂單生成時(shí)間】Fri Nov 08 14:29:01 CST 2019【1分鐘后檢查訂單是否已經(jīng)支付】Order(orderId=2019110874215, orderStatus=1, orderName=PS4 pro) 2019-11-08 14:30:01.723 INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver ? : --------------------------------------------- 2019-11-08 14:30:01.724 INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver ? : 【orderDelayQueue 監(jiān)聽的消息】 - 【消費(fèi)時(shí)間】 - [Fri Nov 08 14:30:01 CST 2019]- 【訂單內(nèi)容】 - [Order(orderId=2019110874129, orderStatus=0, orderName=switch)] 2019-11-08 14:30:01.727 INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver ? : 【該訂單未支付,取消訂單】Order(orderId=2019110874129, orderStatus=2, orderName=switch) 2019-11-08 14:30:01.727 INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver ? : --------------------------------------------- 2019-11-08 14:30:01.728 INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver ? : --------------------------------------------- 2019-11-08 14:30:01.729 INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver ? : 【orderDelayQueue 監(jiān)聽的消息】 - 【消費(fèi)時(shí)間】 - [Fri Nov 08 14:30:01 CST 2019]- 【訂單內(nèi)容】 - [Order(orderId=2019110874215, orderStatus=1, orderName=PS4 pro)] 2019-11-08 14:30:01.729 INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver ? : 【該訂單已完成支付】 2019-11-08 14:30:01.729 INFO 44447 --- [ntContainer#0-1] c.y.s.rabbitmqdeadqueue.DelayReceiver ? : ---------------------------------------------

總結(jié)

以上是生活随笔為你收集整理的SpringBoot之使用RabbitMQ实现延迟队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。