日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

rabbitmq的死信队列(四)

發布時間:2023/12/15 综合教程 25 生活家
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq的死信队列(四) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

死信隊列

死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機),當消息成為Dead message后,可以被重新發送到另一個交換機,這個交換機就是DLX。

消息成為死信的三種情況:

  1. 隊列消息長度到達限制;

  2. 消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標隊列,requeue=false;

  3. 原隊列存在消息過期設置,消息到達超時時間未被消費;

隊列綁定死信交換機:

給隊列設置參數: x-dead-letter-exchange 和 x-dead-letter-routing-key和x-message-ttl和x-max-length

    x-dead-letter-exchange:綁定的死信交換機名稱

   x-dead-letter-routing-key:綁定正常隊列和死信交換機的路由

   x-dead-letter-routing-key:ttl過期時間

   x-max-length:設置正常隊列長度限制

rabbitmq-high-producer項目

application.properties文件

server.port=8081
# ip
spring.rabbitmq.host=127.0.0.1
#默認5672
spring.rabbitmq.port=5672
#用戶名
spring.rabbitmq.username=guest
#密碼
spring.rabbitmq.password=guest
#連接到代理時用的虛擬主機
spring.rabbitmq.virtual-host=/
#是否啟用【發布確認】,默認false
#spring.rabbitmq.publisher-confirm-type=correlated替換spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
#是否啟用【發布返回】,默認false
spring.rabbitmq.publisher-returns=true
#表示消息確認方式,其有三種配置方式,分別是none、manual和auto;默認auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#rabbitmq限流,必須在ack確認才能使用
#消費者最小數量
spring.rabbitmq.listener.simple.concurrency=1
#最大的消費者數量
spring.rabbitmq.listener.simple.max-concurrency=10
#在單個請求中處理的消息個數,他應該大于等于事務數量(unack的最大數量)
spring.rabbitmq.listener.simple.prefetch=2
        
DlxQueueRabbitConfig類
package com.qingfeng.rabbitmqhighproducer.dlx.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信隊列
 */
@Configuration
public class DlxQueueRabbitConfig {

    //正常隊列名稱
    public static final String NORMAL_DLX_QUEUE = "normal_dlx_queue";
    //正常交換機名稱
    public static final String NORMAL_DLX_Exchange = "normal_dlx_exchange";

    //ttl過期時間毫秒
    private static final int NORMAL_DLX_EXPIRATION = 10000;

    //設置正常隊列長度限制
    private static final int NORMAL_DLX_LENGTH = 10;

    //死信隊列名稱
    public static final String DLX_QUEUE = "dlx_queue";
    //死信交換機名稱
    public static final String DLX_Exchange = "dlx_exchange";


    //聲明正常交換機
    @Bean("normalDlxExchange")
    public TopicExchange normalDlxExchange(){
        return new TopicExchange(NORMAL_DLX_Exchange);
    }

    //聲明正常隊列綁定死信隊列的交換機
    @Bean("normalDlxQueue")
    public Queue normalDlxQueue(){
        return QueueBuilder.durable(NORMAL_DLX_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_Exchange)
                .withArgument("x-dead-letter-routing-key", "dlx.wq")
                .withArgument("x-message-ttl", NORMAL_DLX_EXPIRATION)
                .withArgument("x-max-length",NORMAL_DLX_LENGTH)
                .build();
    }

    //聲明正常隊列和正常交換機的綁定
    @Bean
    public Binding normalDlxBinding(){
        return BindingBuilder.bind(normalDlxQueue()).to(normalDlxExchange()).with("test.dlx.#");
    }

//=========================================================================

    //聲明死信隊列
    @Bean
    public Queue dlxQueue(){
        return new Queue(DLX_QUEUE);
    }
    //聲明死信交換機
    @Bean
    public TopicExchange dlxExchange(){
        return new TopicExchange(DLX_Exchange);
    }
    //聲明死信隊列和死信交換機的綁定
    @Bean
    public Binding dlxBinding(){
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.#");
    }


}
DlxController類
package com.qingfeng.rabbitmqhighproducer.dlx;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
@RequestMapping("dlx")
public class DlxController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //http://127.0.0.1:8081/dlx/testTimeDLX
    //測試時間過期
    @GetMapping("/testTimeDLX")
    public String testTimeDLX() {
        String messageId = String.valueOf(UUID.randomUUID());
        //normal_dlx_exchange正常交換機  test.dlx.wq:正常交換機與正常綁定的隊列的路由
        rabbitTemplate.convertAndSend("normal_dlx_exchange", "test.dlx.wq", messageId+"變成死信隊列消息");
        return "ok";
    }


}

啟動rabbitmq-high-producer項目

1.測試原隊列存在消息過期設置,消息到達超時時間未被消費

http://127.0.0.1:8081/dlx/testTimeDLX

我們在設置的ttl過期時間10000毫秒過后,也就是10秒后,正常隊列的消息會轉到死信隊列里面去

2.測試隊列消息長度到達限制

DlxController類
package com.qingfeng.rabbitmqhighproducer.dlx;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
@RequestMapping("dlx")
public class DlxController {
    @Autowired
    private RabbitTemplate rabbitTemplate;


    //http://127.0.0.1:8081/dlx/veroLengthDLX
    //2.測試隊列超出長度
    @GetMapping("/veroLengthDLX")
    public String veroLengthDLX() {
        for (int i=0;i<20;i++){
            String messageId = String.valueOf(UUID.randomUUID());
            rabbitTemplate.convertAndSend("normal_dlx_exchange", "test.dlx.wq", messageId+"變成死信隊列消息");
        }
        return "ok";
    }



}

啟動rabbitmq-high-producer項目

訪問:http://127.0.0.1:8081/dlx/veroLengthDLX
設置正常隊列長度限制為10,我們生產者發送了20個消息,正常隊列只能保存10個

我們在設置的ttl過期時間10000毫秒過后,也就是10秒后,正常隊列的消息會全部轉到死信隊列里面去

3消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標隊列,requeue=false;

在rabbitmq-high-producer項目的DlxController類添加

package com.qingfeng.rabbitmqhighproducer.dlx;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
@RequestMapping("dlx")
public class DlxController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //3.測試消息被消費者拒收

//http://127.0.0.1:8081/dlx/rejectionDLX
    @GetMapping("/rejectionDLX")
    public String rejectionDLX() {
        String messageId = String.valueOf(UUID.randomUUID());
        rabbitTemplate.convertAndSend("normal_dlx_exchange", "test.dlx.wq", messageId+"變成死信隊列消息");
        return "ok";
    }

}

在rabbitmq-high-consumer項目
DxlListener類  開啟int i = 1/0;//出現錯誤
package com.qingfeng.rabbitmqhighconsumer.dxl;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer ACK機制:
 *  1. 設置手動簽收。 spring.rabbitmq.listener.simple.acknowledge-mode=manual
 *  2. 讓監聽器類實現ChannelAwareMessageListener接口
 *  3. 如果消息成功處理,則調用channel的 basicAck()簽收
 *  4. 如果消息處理失敗,則調用channel的basicNack()拒絕簽收,broker重新發送給consumer
 */

@Component
public class DxlListener {

    //手動簽收
    @RabbitHandler
    @RabbitListener(queues = "normal_dlx_queue")
    public void onMessage(Message message, Channel channel) throws Exception {
        //Thread.sleep(1000);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //1.接收轉換消息
            System.out.println("接受到的消息為"+new String(message.getBody()));

            //2. 處理業務邏輯
            System.out.println("處理業務邏輯...");
            int i = 1/0;//出現錯誤
            //3. 手動簽收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            /**
             * 4.有異常就拒絕簽收
             * basicNack(long deliveryTag, boolean multiple, boolean requeue)
             * 第三個參數:requeue:重回隊列。如果設置為true,則消息重新回到queue,broker會重新發送該消息給消費
             * requeue:true為將消息重返當前消息隊列,還可以重新發送給消費者;
             * alse:將消息丟棄
             */
            System.out.println("有異常就拒絕簽收");
            //拒絕簽收,不重回隊列,requeue為false,這樣才能到死信隊列里面去
            channel.basicNack(deliveryTag,true,false);
        }
    }
}

啟動rabbitmq-high-producer和rabbitmq-high-consumer項目


測試:http://127.0.0.1:8081/dlx/rejectionDLX

在rabbitmq-high-consumer項目consumer拒絕接收消息,直接轉到死信隊列去了

小結:

  1. 死信交換機和死信隊列和普通的沒有區別

  2. 當消息成為死信后,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列

  3. 消息成為死信的三種情況:
    1. 隊列消息長度到達限制;
    2. 消費者拒接消費消息,并且不重回隊列;
    3. 原隊列存在消息過期設置,消息到達超時時間未被消費;




總結

以上是生活随笔為你收集整理的rabbitmq的死信队列(四)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。