日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java并发包消息队列

發布時間:2024/9/27 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java并发包消息队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.


消息隊列常用于有生產者和消費者兩類角色的多線程同步場景

?

BlockingQueue也是java.util.concurrent下的主要用來控制線程同步的工具。

主要的方法是:put、take一對阻塞存取;add、poll一對非阻塞存取。

???????? 插入:

?????????????????? 1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則拋出異常

??????? 2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.

??????? 3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻塞直到BlockingQueue里面有空間再繼續.

???????? 讀取:

?????? ?4)poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null

??????? 5)take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的對象被加入為止

???????? 其他

int remainingCapacity();返回隊列剩余的容量,在隊列插入和獲取的時候,不要瞎搞,數 據可能不準

boolean remove(Object o); 從隊列移除元素,如果存在,即移除一個或者更多,隊列改??? 變了返回true

public boolean contains(Object o); 查看隊列是否存在這個元素,存在返回true

int drainTo(Collection<? super E> c); 傳入的集合中的元素,如果在隊列中存在,那么將???? 隊列中的元素移動到集合中

int drainTo(Collection<? super E> c, int maxElements); 和上面方法的區別在于,制定了移?? 動的數量

案例:

package blockingqueue;

?

import java.util.concurrent.BlockingQueue;

?

public class Consumer implements Runnable {

??? BlockingQueue<String> queue;

???

??? public Consumer(BlockingQueue<String> queue) {

????? this.queue = queue;

?? }

???

?? @Override

?? public void run() {

????? try {

???????? String consumer = Thread.currentThread().getName();

???????? System.out.println(consumer);

???????? //如果隊列為空,會阻塞當前線程

???????? String temp = queue.take();

???????? System.out.println(consumer + "消費者? get a product:" + temp);

????? } catch (Exception e) {

???????? e.printStackTrace();

????? }

?? }

?

}

package blockingqueue;

?

import java.util.concurrent.BlockingQueue;

?

public class Producer implements Runnable {

?? BlockingQueue<String> queue;???

??? public Producer(BlockingQueue<String> queue) {?

??????? this.queue = queue;?

??? }???

??? @Override?

??? public void run() {?

??????? try {?

??????????? String temp = "A Product, 生產線程:"?

??????????????????? + Thread.currentThread().getName();?

??????????? queue.put(temp);//如果隊列是滿的話,會阻塞當前線程?

??????????? System.out.println("生產者 I have made a product: "?

??????????? ???? + Thread.currentThread().getName());

??????? } catch (InterruptedException e) {?

??????????? e.printStackTrace();?

??????? }?

??? }

}

package blockingqueue;

?

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

?

public class Test {

?

?? public static void main(String[] args) throws Exception {

????? BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);

????? // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

????? // 不設置的話,LinkedBlockingQueue默認大小為Integer.MAX_VALUE

????? // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);

????? Consumer consumer = new Consumer(queue);

????? Producer producer = new Producer(queue);

????? for (int i = 0; i < 3; i++) {

???????? new Thread(producer, "Producer" + (i + 1)).start();

????? }

????? for (int i = 0; i < 5; i++) {

???????? new Thread(consumer, "Consumer" + (i + 1)).start();

????? }

?????

????? Thread.sleep(5000);

?????

????? new Thread(producer, "Producer" + (5)).start();

?? }

}

?

BlockingQueue有四個具體的實現類,常用的兩種實現類為:

?

1、ArrayBlockingQueue:一個由數組支持的有界阻塞隊列,規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的。

?

2、LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的。

???????? LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,默認最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。

?

LinkedBlockingQueue和ArrayBlockingQueue區別:

?

LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背后所用的數據結構不一樣,導致LinkedBlockingQueue的數據吞吐量要大于ArrayBlockingQueue,但在線程數量很大時其性能的可預見性低于ArrayBlockingQueue.

?

生產者消費者的示例代碼:

見代碼? TestBlockingQueue? TestBlockingQueueConsumer?? TestBlockingQueueProducer

package blockingqueue;

?

import java.util.Random;

import java.util.concurrent.BlockingQueue;

?

public class TestBlockingQueueProducer implements Runnable {

?? BlockingQueue<String> queue;

?? Random random = new Random();

?

?? public TestBlockingQueueProducer(BlockingQueue<String> queue) {

????? this.queue = queue;

?? }

?

?? @Override

?? public void run() {

?

????? for (int i = 0; i < 10; i++) {

???????? try {

??????????? Thread.sleep(random.nextInt(10));

??????????? String task = Thread.currentThread().getName() + " made a product " + i;

?

??????????? System.out.println(task);

??????????? queue.put(task);? //阻塞方法

???????? } catch (InterruptedException e) {

??????????? ?

??????????? e.printStackTrace();

???????? }

?

????? }

?? }

}

package blockingqueue;

?

import java.util.Random;

import java.util.concurrent.BlockingQueue;

?

public class TestBlockingQueueConsumer implements Runnable {

?? BlockingQueue<String> queue;

??? Random random = new Random();

???

??? public TestBlockingQueueConsumer(BlockingQueue<String> queue){?

??????? this.queue = queue;?

??? }???????

??? @Override?

??? public void run() {?

??????? try {?

??????? ? Thread.sleep(random.nextInt(10));

??????? ? System.out.println(Thread.currentThread().getName()+ "trying...");

??????????? String temp = queue.take();//如果隊列為空,會阻塞當前線程?

??????????? int remainingCapacity = queue.remainingCapacity();

??????????? System.out.println(Thread.currentThread().getName() + " get a job " +temp);

??????????? // System.out.println("隊列中的元素個數: "+ remainingCapacity);

??????? } catch (InterruptedException e) {?

??????????? e.printStackTrace();

??????? }?

??? }

}

package blockingqueue;

?

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

?

public class TestBlockingQueue {

?

?? public static void main(String[] args) {

????? BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);

????? // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

????? // 不設置的話,LinkedBlockingQueue默認大小為Integer.MAX_VALUE

????? // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);

????? TestBlockingQueueConsumer consumer = new TestBlockingQueueConsumer(queue);

????? TestBlockingQueueProducer producer = new TestBlockingQueueProducer(queue);

????? for (int i = 0; i < 3; i++) {

???????? new Thread(producer, "Producer" + (i + 1)).start();

????? }

????? for (int i = 0; i < 5; i++) {

???????? new Thread(consumer, "Consumer" + (i + 1)).start();

????? }

?? }

}

?

總結

以上是生活随笔為你收集整理的java并发包消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。