rabbitmq-发布订阅模式
【README】
本文po出 mq的發(fā)布訂閱模式,及代碼示例;
?
【1】intro
1) 角色: 有4個角色, 包括 生產(chǎn)者,消費者, 交換機 exchange(X), 隊列;
2)交換機: 一方面,接收生產(chǎn)者的消息,另一方面,處理消息,如發(fā)送給隊列,或丟棄;這取決于 exchange類型;
3)exchange類型有如下3種:
fanout 廣播, 把消費轉(zhuǎn)發(fā)給所有 綁定到該交換機的所有隊列;
direct 定向, 把消息轉(zhuǎn)發(fā)給符合 指定 routing key 路由鍵的隊列;
topic 通配符, 把消息交給 routing pattern(路由模式)的隊列;
4)exchange 交換機, 只負責(zé)轉(zhuǎn)發(fā)消息, 不具備存儲消息的能力; 因此如果沒有任何隊列與 exchange 綁定, 或者沒有符合規(guī)則的隊列, 那么消息會丟失;
5)發(fā)布訂閱模式:
5.1-每個消費者監(jiān)聽自己的隊列;
5.2-生產(chǎn)者把消息發(fā)送給 broker, 由交換機把消息轉(zhuǎn)發(fā)到綁定此交換機的所有隊列;
6)交換機需要與隊列綁定, 綁定之后,一個消息可以被多個消費者收到;
【2】代碼(生產(chǎn)者1個,交換機exchange1個,但對應(yīng)到2個隊列,即消息有2個replication)
生產(chǎn)者
/*** 發(fā)布訂閱模式生產(chǎn)者* 本文發(fā)布訂閱模式使用的交換機類型為廣播 fanout * @author tang rong */ public class PSProduer {/** 交換機類型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 隊列名稱1 */static String FANOUT_QUEUE_1 = "fanout_queue_1";/** 隊列名稱1 */static String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 創(chuàng)建連接Channel channel = conn.createChannel(); // 創(chuàng)建頻道/*** 聲明交換機* 參數(shù)1-交換機名稱 * 參數(shù)2-交換機類型(fanout, topic, direct, headers)*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);/*** 創(chuàng)建隊列* @param1 隊列名稱* @param2 是否持久化隊列* @param3 是否獨占本次連接 * @param4 是否在不使用的時候自動刪除隊列 * @param5 隊列其他參數(shù) */ channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);/*** 隊列綁定交換機 */channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");/*** 發(fā)送消息 */long temp = 1; for (int i = 0; i < 1000; i++) { String msg = "發(fā)布訂閱模式消息,序號=" + (temp+i) + "時間=" + MyDateUtil.getNow();/*** 參數(shù)1 交換機名稱,沒有指定則使用默認(rèn)交換機 Default change * 參數(shù)2 路由key,簡單模式可以傳遞隊列名稱 * 參數(shù)3 消息其他屬性 * 參數(shù)4 消息內(nèi)容 */channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes("UTF-8")); System.out.println("生產(chǎn)者發(fā)送消息" + msg); } System.out.println("=== 生產(chǎn)者消息發(fā)送完成");/* 關(guān)閉資源 */channel.close();conn.close(); } }消費者1
/*** 發(fā)布訂閱模式消費者1* @author tang rong */ public class PSConsumer1 {/** 交換機類型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 隊列名稱1 */static String FANOUT_QUEUE_1 = "fanout_queue_1";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 創(chuàng)建連接 Channel channel = conn.createChannel(); // 創(chuàng)建隊列 channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 創(chuàng)建交換機/*** 創(chuàng)建隊列 * 參數(shù)1 隊列名稱 * 參數(shù)2 是否持久化* 參數(shù)3 是否獨占本連接 * 參數(shù)4 是否在不使用的時候自動刪除隊列* 參數(shù)5 隊列其他參數(shù) */channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);/*** 隊列綁定交換機*/channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");/* 創(chuàng)建消費者,設(shè)置消息處理邏輯 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消費者標(biāo)簽,在 channel.basicConsume 可以指定 * @param envelope 消息包內(nèi)容,包括消息id,消息routingkey,交換機,消息和重轉(zhuǎn)標(biāo)記(收到消息失敗后是否需要重新發(fā)送) * @param properties 基本屬性* @param body 消息字節(jié)數(shù)組 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消費者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交換機=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消費者收到的消息【%s】", message)); System.out.println("=== 消費者1 end ===\n"); } };/*** 監(jiān)聽消息* 參數(shù)1 隊列名稱 * 參數(shù)2 是否自動確認(rèn), 設(shè)置為true表示消息接收到自動向 mq回復(fù)ack;mq收到ack后會刪除消息; 設(shè)置為false則需要手動發(fā)送ack; * 參數(shù)3 消息接收后的回調(diào) */channel.basicConsume(FANOUT_QUEUE_1, true, consumer); }}消費者2
/*** 發(fā)布訂閱模式消費者* @author tang rong */ public class PSConsumer2 {/** 交換機類型 */static String FANOUT_EXCHANGE = "fanout_exchange";/** 隊列名稱1 */static String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {Connection conn = RBConnectionUtil.getConn(); // 創(chuàng)建連接 Channel channel = conn.createChannel(); // 創(chuàng)建隊列 channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 創(chuàng)建交換機/*** 創(chuàng)建隊列 * 參數(shù)1 隊列名稱 * 參數(shù)2 是否持久化* 參數(shù)3 是否獨占本連接 * 參數(shù)4 是否在不使用的時候自動刪除隊列* 參數(shù)5 隊列其他參數(shù) */channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);/*** 隊列綁定交換機*/channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");/* 創(chuàng)建消費者,設(shè)置消息處理邏輯 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消費者標(biāo)簽,在 channel.basicConsume 可以指定 * @param envelope 消息包內(nèi)容,包括消息id,消息routingkey,交換機,消息和重轉(zhuǎn)標(biāo)記(收到消息失敗后是否需要重新發(fā)送) * @param properties 基本屬性* @param body 消息字節(jié)數(shù)組 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消費者2 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交換機=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消費者收到的消息【%s】", message)); System.out.println("=== 消費者2 end ===\n"); } };/*** 監(jiān)聽消息* 參數(shù)1 隊列名稱 * 參數(shù)2 是否自動確認(rèn), 設(shè)置為true表示消息接收到自動向 mq回復(fù)ack;mq收到ack后會刪除消息; 設(shè)置為false則需要手動發(fā)送ack; * 參數(shù)3 消息接收后的回調(diào) */channel.basicConsume(FANOUT_QUEUE_2, true, consumer); }}?
【3】小結(jié)
1)發(fā)布訂閱模式與工作模式的區(qū)別;
區(qū)別1)工作隊列模式不需要定義交換機, 發(fā)布訂閱模式需要;
區(qū)別2)工作隊列模式的生產(chǎn)者向隊列發(fā)送消息(底層使用默認(rèn)交換機),? 發(fā)布訂閱模式的生產(chǎn)者向交換機發(fā)送消息;
區(qū)別3)工作隊列模式的隊列不需要與交換機綁定(底層與默認(rèn)交換機綁定), 發(fā)布訂閱模式中的隊列需要與交換機綁定;
2)默認(rèn)交換機
AMQP default
?
?
?
?
?
?
?
?
總結(jié)
以上是生活随笔為你收集整理的rabbitmq-发布订阅模式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 下载的网站模板怎么使用(下载的网站模板怎
- 下一篇: rabbitmq-路由模式-routin