rabbitmq-死信队列
【README】死信隊(duì)列是什么?
1)死信隊(duì)列:當(dāng)消息在一個(gè)隊(duì)列中變成死信之后,它能被重新發(fā)送到另一個(gè)交換機(jī)中,這個(gè)交換機(jī)被稱(chēng)為DLX(Dead-Letter-Exchange-DLX , 死信交換機(jī));綁定DLX的隊(duì)列被稱(chēng)之為死信隊(duì)列;
2)消息變成死信的原因有三個(gè);
消息被拒絕;
消息過(guò)期;
隊(duì)列達(dá)到最大長(zhǎng)度;
?
【1】如何配置死信隊(duì)列
1)首先死信隊(duì)列是相對(duì)于正常業(yè)務(wù)隊(duì)列而言的;
如?? 白條還款隊(duì)列為業(yè)務(wù)隊(duì)列, 當(dāng)該隊(duì)列有消息過(guò)期或被消費(fèi)者拒絕或隊(duì)列達(dá)到最大長(zhǎng)度,則這些消息會(huì)被轉(zhuǎn)發(fā)到私信隊(duì)列; 又死信隊(duì)列通常情況下通過(guò) 路由鍵進(jìn)行轉(zhuǎn)發(fā),所以需要在死信隊(duì)列前加一個(gè)交換機(jī)(俗稱(chēng)死信交換機(jī));
2)要想使用死信隊(duì)列,必須聲明死信交換機(jī),可以在聲明正常業(yè)務(wù)隊(duì)列的時(shí)候設(shè)置? 正常業(yè)務(wù)隊(duì)列對(duì)應(yīng)的死信交換機(jī)(需要注意這個(gè)映射關(guān)系),如下:
/* 2-聲明正常隊(duì)列時(shí),配置其對(duì)應(yīng)死信隊(duì)列 */ Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", DLX_EXCHANGE); //這個(gè)agruments屬性,要設(shè)置到聲明隊(duì)列上 channel.queueDeclare(NORM_QUEUE, true, false, false, agruments);【2】死信隊(duì)列荔枝
【2.1】 聲明隊(duì)列,交換機(jī)
step1)聲明正常業(yè)務(wù)隊(duì)列,正常業(yè)務(wù)交換機(jī),定義正常業(yè)務(wù)路由鍵,以及三者間的綁定; 同時(shí),在聲明正常業(yè)務(wù)隊(duì)列時(shí),一定要設(shè)置其 死信交換機(jī)(DLX_EXCHANGE),作為屬性傳入聲明方法;
step2)聲明死信隊(duì)列,死信交換機(jī)(DLX_EXCHANGE), 定義死信隊(duì)列接收的路由鍵(#表示所有),以及三者間的綁定;
/*** 聲明正常隊(duì)列+交換機(jī);聲明死信隊(duì)列+交換機(jī) */ public class DlxDeclare {public static final String POSTFIX = "_tr1";/*** 正常隊(duì)列*/public static final String NORM_QUEUE = "norm_queue" + POSTFIX;/*** 正常交換機(jī) */public static final String NORM_EXCHANGE = "norm_exchange" + POSTFIX;/*** 正常路由鍵*/public static final String NORM_ROUTE = "dlx.save" + POSTFIX;/*** 死信隊(duì)列*/public static final String DLX_QUEUE = "dlx_queue" + POSTFIX; /*** 死信交換機(jī)*/public static final String DLX_EXCHANGE = "dlx_exchange" + POSTFIX;/*** 死信隊(duì)列路由鍵-#-所有鍵 */public static final String DLX_ROUTE = "#"; public static void main(String[] args) throws Exception {/* 獲取連接*/Connection conn = RBConnectionUtil.getConn();// 創(chuàng)建信道 Channel channel = conn.createChannel();/*1-聲明正常交換機(jī)*/channel.exchangeDeclare(NORM_EXCHANGE, BuiltinExchangeType.DIRECT);/* 2-聲明正常隊(duì)列時(shí),配置其對(duì)應(yīng)死信隊(duì)列 */ Map<String, Object> agruments = new HashMap<String, Object>();agruments.put("x-dead-letter-exchange", DLX_EXCHANGE); //這個(gè)agruments屬性,要設(shè)置到聲明隊(duì)列上channel.queueDeclare(NORM_QUEUE, true, false, false, agruments);/* 3-綁定正常交換機(jī),正常隊(duì)列和路由鍵 */ channel.queueBind(NORM_QUEUE, NORM_EXCHANGE, NORM_ROUTE);/*4-聲明死信交換機(jī),死信隊(duì)列,并綁定兩者*/ channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);channel.queueDeclare(DLX_QUEUE, true, false, false, null);channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTE);} }【2.2】生產(chǎn)者
1)我們把消息的 ttl 設(shè)置為10秒,即該消息10秒內(nèi)沒(méi)有被成功消費(fèi),則消息轉(zhuǎn)發(fā)到死信隊(duì)列;
/*** 死信消息生產(chǎn)者 */ public class DlxProducer {public static void main(String[] args) throws Exception {/* 獲取連接*/Connection conn = RBConnectionUtil.getConn();// 創(chuàng)建信道 Channel channel = conn.createChannel();AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000") // 設(shè)置消息的ttl .build();/* 發(fā)送消息 */ long temp = System.currentTimeMillis();for (int i = 1; i <= 20; i++) { String msg = "我是 msg_norm消息,序號(hào)=" + (temp+i) + "時(shí)間=" + MyDateUtil.getNow(); channel.basicPublish(DlxDeclare.NORM_EXCHANGE, DlxDeclare.NORM_ROUTE, properties, msg.getBytes("UTF-8")); System.out.println("生產(chǎn)者發(fā)送消息" + msg); Thread.sleep(10);} /* 關(guān)閉連接和信道 */ channel.close();conn.close(); } }【3】效果
1)生產(chǎn)者把消息發(fā)送到正常交換機(jī) norm_exchange_tr1
2)10秒后,由于沒(méi)有消費(fèi),消息被轉(zhuǎn)發(fā)到死信隊(duì)列 dlx_queue_tr1
?
總結(jié)
以上是生活随笔為你收集整理的rabbitmq-死信队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: (思科anti ddos)
- 下一篇: rabbitmq手动确认ack