日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ(五)死信队列和延迟队列

發布時間:2023/12/20 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ(五)死信队列和延迟队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.1 概念

先從概念解釋上搞清楚這個定義,死信,顧名思義就是無法被消費的消息,字面意思可以這樣理解,一般來說,producer 將消息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取出消息進行消費,但某些時候由于特定的原因導致 queue 中的某些消息無法被消費,這樣的消息如果沒有后續的處理,就變成了死信,有死信自然就有了死信隊列。

應用場景:為了保證訂單業務的消息數據不丟失,需要使用到 RabbitMQ 的死信隊列機制,當消息消費發生異常時,將消息投入死信隊列中.還有比如說: 用戶在商城下單成功并點擊去支付后在指定時間未支付時自動失效。

1.2 死信的來源

  • 消息 TTL 過期

  • 隊列達到最大長度(隊列滿了,無法再添加數據到 mq 中)

  • 消息被拒絕(basic.reject 或 basic.nack)并且 requeue=false

1.3 死信實戰

1.3.1 代碼架構圖

1.3.2 ttl過期

生產者代碼

public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange"; ?public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//設置消息的 TTL 時間AMQP.BasicProperties properties = newAMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,message.getBytes());System.out.println("生產者發送消息:" + message);}}} }

消費者代碼

public class Consumer01 {//普通交換機名稱private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交換機名稱private static final String DEAD_EXCHANGE = "dead_exchange"; ?public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//聲明死信和普通交換機 類型為 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//聲明死信隊列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信隊列綁定死信交換機與 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常隊列綁定死信隊列信息Map<String, Object> params = new HashMap<>();//正常隊列設置死信交換機 參數 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常隊列設置死信 routing-key 參數 key 是固定值params.put("x-dead-letter-routing-key", "lisi"); ?String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Consumer01 接收到消息" + message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});} }

死信消費者

public class Consumer02 {private static final String DEAD_EXCHANGE = "dead_exchange"; ?public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信隊列消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Consumer02 接收死信隊列的消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});} }

1.3.3 隊列達到最大長度

1)生產者發送消息去掉ttl屬性

2)普通消費者增加如下屬性

params.put("x-max-length",6);

此時我們發送10條,就會有4條消息被發送到死信隊列。

重啟Consumer01,另外6條也會被消費。

1.3.4 消息被拒

public class Consumer03 {//普通交換機名稱private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交換機名稱private static final String DEAD_EXCHANGE = "dead_exchange"; ?public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//聲明死信和普通交換機 類型為 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//聲明死信隊列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信隊列綁定死信交換機與 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常隊列綁定死信隊列信息Map<String, Object> params = new HashMap<>();//正常隊列設置死信交換機 參數 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常隊列設置死信 routing-key 參數 key 是固定值params.put("x-dead-letter-routing-key", "lisi");String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);if (message.equals("info5")) {System.out.println("Consumer01 接收到消息" + message + "并拒絕簽收該消息");//requeue 設置為 false 代表拒絕重新入隊 該隊列如果配置了死信交換機將發送到死信隊列中channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);} else {System.out.println("Consumer01 接收到消息" + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};boolean autoAck = false;channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {});} }

2. 延遲隊列

2.1概念

延時隊列,隊列內部是有序的,最重要的特性就體現在它的延時屬性上,延時隊列中的元素是希望在指定時間到了以后或之前取出和處理,簡單來說,延時隊列就是用來存放需要在指定時間被處理的元素的隊列。

2.2 使用場景

1.訂單在十分鐘之內未支付則自動取消。

2.新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。

3.用戶注冊成功后,如果三天內沒有登陸則進行短信提醒。

4.用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。

5.預定會議后,需要在預定的時間點前十分鐘通知各個與會人員參加會議。

總結

以上是生活随笔為你收集整理的RabbitMQ(五)死信队列和延迟队列的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。