日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

2 工作队列

發(fā)布時間:2023/12/20 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2 工作队列 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

目錄

  • 1、工作隊列的概念
  • 2、輪循分發(fā)(round-robin)
  • 3、公平分發(fā)(fair dispatch)
  • 4、消息應(yīng)答機(jī)制
    • 4.1 為什么需要消息應(yīng)答機(jī)制
    • 4.2 消息應(yīng)答機(jī)制的作用
  • 5、消息持久化

1、工作隊列的概念

簡單隊列不足 : 不支持多個消費(fèi)者;

工作隊列即一個生產(chǎn)者可以對應(yīng)多個消費(fèi)者同時消費(fèi);
相比簡單隊列支持多消費(fèi)者; 因為實際工作中,生產(chǎn)者服務(wù)一般都是很簡單的業(yè)務(wù)邏輯處理之后就發(fā)送到隊列,消費(fèi)者接收到隊列的消息之后,進(jìn)行復(fù)雜的業(yè)務(wù)邏輯處理,所以一般都是多個消費(fèi)者進(jìn)行處理.如是是一個消費(fèi)者進(jìn)行處理,那么隊列會積壓很多消息.

2、輪循分發(fā)(round-robin)

在默認(rèn)情況下, RabbitMQ將逐個發(fā)送消息到在序列中的下一個消費(fèi)者(而不考慮每個任務(wù)處理的時長等等,且是提前一次性分配,并非一個一個的分配) . 平均每個消費(fèi)者獲取相同數(shù)量的消息. 這種分發(fā)消息機(jī)制稱為 輪詢分發(fā)
當(dāng)消息進(jìn)入隊列 ,RabbitMQ就會分發(fā)消息 .它不看消費(fèi)者的應(yīng)答的數(shù)目 ,也不關(guān)心消費(fèi)者處理消息的能力,只是盲目的將第n條消息發(fā)給第n個消費(fèi)者

  • 生產(chǎn)者
/*** @author zhaod* @description* @date 2018/9/27 11:11*/ public class Producer {private static final Logger log = LoggerFactory.getLogger(Producer.class);private static final String QUEUE_NAME = "my-work-queue";public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {// 獲取連接Connection connection = MqConnectionUtil.getConnection();// 創(chuàng)建信道Channel channel = connection.createChannel();// 申明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i = 1; i < 11; i++) {String msg = "I am " + i + " old";channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());System.out.println("P---->" + msg);Thread.sleep(500);}channel.close();connection.close();}}
  • 消費(fèi)者1
/*** @author zhaodi* @description 工作隊列* @date 2018/9/27 11:30*/ public class Consumer01 {private static final Logger log = LoggerFactory.getLogger(Consumer01.class);private static final String QUEUE_NAME = "my-work-queue";public static void main(String[] args) throws IOException {// 連接Connection connection = MqConnectionUtil.getConnection();// 信道Channel channel = connection.createChannel();// 隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消費(fèi)者Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = "C-->{1}接收:" +new String(body );try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(msg);}};// 監(jiān)聽channel.basicConsume(QUEUE_NAME,true,consumer);} }
  • 消費(fèi)者2
/*** @author zhaodi* @description 工作隊列* @date 2018/9/27 11:30*/ public class Consumer02 {private static final Logger log = LoggerFactory.getLogger(Consumer02.class);private static final String QUEUE_NAME = "my-work-queue";public static void main(String[] args) throws IOException {// 連接Connection connection = MqConnectionUtil.getConnection();// 信道Channel channel = connection.createChannel();// 隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消費(fèi)者Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = "C-->{2}接收:" +new String(body);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(msg);}};// 監(jiān)聽channel.basicConsume(QUEUE_NAME,true,consumer);} }

3、公平分發(fā)(fair dispatch)

根據(jù)消費(fèi)者處理性能,性能好的消費(fèi)的數(shù)據(jù)量多,性能差的消費(fèi)的數(shù)據(jù)量少 .這種分發(fā)消息機(jī)制稱為 公平分發(fā)

如何實現(xiàn)公平分發(fā)

  • 限制發(fā)給消費(fèi)者的消息只可以有1條,在這個消費(fèi)者確認(rèn)消息之前,不可以發(fā)送嚇一跳消息給這個消費(fèi)者

    int prefetch = 1; channel.basicQos(prefetch);
  • 默認(rèn)自動應(yīng)答改為手動應(yīng)答

    // 關(guān)閉自動應(yīng)答 Boolean autoAck = false; // 監(jiān)聽 channel.basicConsume(QUEUE_NAME, autoAck, consumer);// 開啟手動應(yīng)答 channel.basicAck(envelope.getDeliveryTag(), false);
  • 生產(chǎn)者

/*** @author zhaod* @description 公平分發(fā)* @date 2018/9/27 11:11*/ public class Producer03 {private static final Logger log = LoggerFactory.getLogger(Producer03.class);private static final String QUEUE_NAME = "my-work-queue";public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {// 獲取連接Connection connection = MqConnectionUtil.getConnection();// 創(chuàng)建信道Channel channel = connection.createChannel();// 申明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 每個消費(fèi)者在發(fā)送確認(rèn)消息之前,消息隊列不發(fā)送下一個消息給該消費(fèi)者,保證每次只處理一條消息int prefetch = 1;channel.basicQos(prefetch);for (int i = 1; i < 20; i++) {String msg = "I am " + i + " old";channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());System.out.println("P---->" + msg);Thread.sleep(500);}channel.close();connection.close();} }
  • 消費(fèi)者1
/*** @author zhaodi* @description 工作隊列公平分發(fā)* @date 2018/9/27 11:30*/ public class Consumer03 {private static final Logger log = LoggerFactory.getLogger(Consumer03.class);private static final String QUEUE_NAME = "my-work-queue";public static void main(String[] args) throws IOException {// 連接Connection connection = MqConnectionUtil.getConnection();// 信道Channel channel = connection.createChannel();// 隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//int prefetch = 1;channel.basicQos(prefetch);// 消費(fèi)者Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = "C-->{3}接收:" +new String(body );try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(msg);// 處理完消息之后,發(fā)送回執(zhí),告訴隊列,你給我的消息我處理完了,你可以發(fā)送下一條消息給我了// envelope.getDeliveryTag(),標(biāo)出是哪條消息channel.basicAck(envelope.getDeliveryTag(),false);}};// 自動應(yīng)答關(guān)閉boolean autoAck = false;// 監(jiān)聽channel.basicConsume(QUEUE_NAME,autoAck,consumer);} }
  • 消費(fèi)者2
代碼同上面

4、消息應(yīng)答機(jī)制

4.1 為什么需要消息應(yīng)答機(jī)制

完成一個任務(wù)需要花費(fèi)幾秒鐘,但是如果某個消費(fèi)者開始執(zhí)行某個任務(wù)花費(fèi)了很長的時間并且在執(zhí)行到某個部分的時候崩潰了怎么辦。

在我們目前的代碼中,在向消費(fèi)者推送了某一條消息后,RabiitMQ會立即刪除這條消息的。

如果我們kill掉某個worker的話,那么我們將會丟失該worker正在處理的消息,我們也會丟失掉所有被發(fā)送到這個消費(fèi)者且未被處理完成的消息。

4.2 消息應(yīng)答機(jī)制的作用

為了保證消息永遠(yuǎn)不會被丟失,RabbitMQ采用消息應(yīng)答機(jī)制。

當(dāng)消費(fèi)者接收到消息并完成任務(wù)后會往RabbitMQ服務(wù)器發(fā)送一條確認(rèn)的命令,然后RabbitMQ才會將消息刪除。

如果某個消費(fèi)者在還有發(fā)送確認(rèn)信息就掛了,RabbitMQ將會視為服務(wù)沒有執(zhí)行完成,然后把執(zhí)行消息的服務(wù)再發(fā)給另外一個消費(fèi)者。這種方式下,即時某個worker掛了,也不會使得消息丟失。

這里不是用超時來判斷的,只有在某個消費(fèi)者連接斷開時,RabbitMQ才會把重新發(fā)送該消費(fèi)者沒有返回確認(rèn)的消息到其它消費(fèi)者那。即時處理某條任務(wù)花費(fèi)了很長的時間,在這里也是沒有問題的。

消息應(yīng)答機(jī)制默認(rèn)是開啟的,也就是說當(dāng)消費(fèi)者接收到消息的時候,不管是否開始處理接收到的消息,它已經(jīng)向RabbitMQ發(fā)送確認(rèn)消息,這時候RabbitMQ服務(wù)器就會刪除該條消息。

// 自動應(yīng)答關(guān)閉 boolean autoAck = false; // 監(jiān)聽 channel.basicConsume(QUEUE_NAME,autoAck,consumer);

很多人都會忘記調(diào)用basicAck方法,雖然這是一個很簡單的錯誤,但往往卻是致命。消費(fèi)者退出后消息將會被重發(fā),但是由于一些未能被確認(rèn)消息不能被釋放,RabbitMQ將會消耗掉越來越多的內(nèi)存

channel.basicAck(envelope.getDeliveryTag(),false);

5、消息持久化

如果RabbitMQ的服務(wù)器宕機(jī)那么怎么保證消息不丟失呢?

當(dāng)MQ重啟,那么之前的隊列消息是會丟失的;

解決:將隊列和消息都持久化存儲

注意點(diǎn):隊列持久化的時候,生產(chǎn)者和消費(fèi)者都要申明

// 申明隊列,并且指明該隊列是持久化的 Boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null); ...// 發(fā)送消息,并且持久化消息 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

消息的持久化不能百分百的保證消息不會丟失,雖然RabbitMQ會把消息寫到磁盤上,但是從RabbitMQ接收到消息到寫到磁盤上,這個短時間的過程中發(fā)生的RabbitMQ重啟依然會使得為寫入到磁盤的消息被丟失;

事實上是這樣的,RabbitMQ接收到消息后,首先會把該消息寫到內(nèi)存緩沖區(qū)中,并不是直接把單條消息實時寫到磁盤上的。消息的持久化不是健壯的,但是對于簡單的任務(wù)隊列是夠用了。如果你需要一套很健壯的持久化方案,那么你可以使用publisher confirms

轉(zhuǎn)載于:https://www.cnblogs.com/zhaod/p/11389487.html

總結(jié)

以上是生活随笔為你收集整理的2 工作队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。