RabbitMQ 一二事(2) - 工作队列使用
生活随笔
收集整理的這篇文章主要介紹了
RabbitMQ 一二事(2) - 工作队列使用
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
上篇文章講了簡單隊列的使用,這其實就是RMQ給的demo,實際并沒有什么用
本篇講講工作模式隊列,也稱之為任務(wù)隊列
一個生產(chǎn)者發(fā)布了多條消息,消費者A可以接受消息,接受消息后該消息就消除,消費者B可以接受其他消息
使用場景,一些數(shù)據(jù)庫操作比較緩慢的話可以分別給多個接口調(diào)用,降低壓力,或者搶單場景也能考慮,
比如就10個商品,100個消費者來搶單,前10個搶到了后,消息隊列就為空了,那么第11個以后的所有消費者都不會搶到
代碼示例:
生產(chǎn)者
1 public class Send { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 // 獲取到連接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 聲明隊列 11 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 12 13 for (int i = 0; i < 50; i++) { 14 // 消息內(nèi)容 15 String message = "" + i; 16 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 17 System.out.println(" [x] Sent '" + message + "'"); 18 19 Thread.sleep(i * 10); 20 } 21 22 channel.close(); 23 connection.close(); 24 } 25 }?
消費者1
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 獲取到連接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 聲明隊列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一時刻服務(wù)器只會發(fā)一條消息給消費者, 如果注釋了就是指生產(chǎn)者平均分配任務(wù)給消費者 15 channel.basicQos(1); 16 17 // 定義隊列的消費者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 監(jiān)聽隊列,手動返回完成 設(shè)置fasle代表需要手動返回消息的確認狀態(tài) 20 channel.basicConsume(QUEUE_NAME, false, consumer); 21 22 // 獲取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [x] Received '" + message + "'"); 27 // 休眠 28 Thread.sleep(10); 29 // 手動確認 返回確認狀態(tài) 30 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 } 33 }?
消費者2
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 獲取到連接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 聲明隊列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一時刻服務(wù)器只會發(fā)一條消息給消費者, 如果注釋了就是指生產(chǎn)者平均分配任務(wù)給消費者 15 channel.basicQos(1); 16 17 // 定義隊列的消費者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 監(jiān)聽隊列,手動返回完成狀態(tài) 設(shè)置fasle代表需要手動返回消息的確認狀態(tài) 20 channel.basicConsume(QUEUE_NAME, false, consumer); 21 22 // 獲取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [x] Received '" + message + "'"); 27 // 休眠1秒 28 Thread.sleep(1000); 29 // 手動確認 返回確認狀態(tài) 30 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 } 33 }?
轉(zhuǎn)載于:https://www.cnblogs.com/leechenxiang/p/5516726.html
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ 一二事(2) - 工作队列使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ 一二事 - 简单队列使
- 下一篇: Interface继承至System.O