日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java.util.concurrent BlockingQueue详解

發(fā)布時間:2025/7/14 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java.util.concurrent BlockingQueue详解 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

什么是阻塞隊列?

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强铡.旉犃袧M時,存儲元素的線程會等待隊列可用。阻塞隊列常用于生產(chǎn)者和消費者的場景,生產(chǎn)者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產(chǎn)者存放元素的容器,而消費者也只從容器里拿元素。

阻塞隊列提供了四種處理方法:

方法\處理方式拋出異常返回特殊值一直阻塞超時退出
插入add(e)offer(e)put(e)offer(e,time,unit)
移除remove()poll()take()poll(time,unit)
檢查element()peek()不可用不可用

?

?

?

?

?

  • 拋出異常:是指當阻塞隊列滿時候,再往隊列里插入元素,會拋出IllegalStateException("Queue full")異常。當隊列為空時,從隊列里獲取元素時會拋出NoSuchElementException異常 。
  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回null
  • 一直阻塞:當阻塞隊列滿時,如果生產(chǎn)者線程往隊列里put元素,隊列會一直阻塞生產(chǎn)者線程,直到拿到數(shù)據(jù),或者響應中斷退出。當隊列空時,消費者線程試圖從隊列里take元素,隊列也會阻塞消費者線程,直到隊列可用。
  • 超時退出:當阻塞隊列滿時,隊列會阻塞生產(chǎn)者線程一段時間,如果超過一定的時間,生產(chǎn)者線程就會退出

多線程環(huán)境中,通過隊列可以很容易實現(xiàn)數(shù)據(jù)共享,比如經(jīng)典的“生產(chǎn)者”和“消費者”模型中,通過隊列可以很便利地實現(xiàn)兩者之間的數(shù)據(jù)共享。假設我們有若干生產(chǎn)者線程,另外又有若干個消費者線程。如果生產(chǎn)者線程需要把準備好的數(shù)據(jù)共享給消費者線程,利用隊列的方式來傳遞數(shù)據(jù),就可以很方便地解決他們之間的數(shù)據(jù)共享問題。但如果生產(chǎn)者和消費者在某個時間段內(nèi),萬一發(fā)生數(shù)據(jù)處理速度不匹配的情況呢?理想情況下,如果生產(chǎn)者產(chǎn)出數(shù)據(jù)的速度大于消費者消費的速度,并且當生產(chǎn)出來的數(shù)據(jù)累積到一定程度的時候,那么生產(chǎn)者必須暫停等待一下(阻塞生產(chǎn)者線程),以便等待消費者線程把累積的數(shù)據(jù)處理完畢,反之亦然。然而,在concurrent包發(fā)布以前,在多線程環(huán)境下,我們每個程序員都必須去自己控制這些細節(jié),尤其還要兼顧效率和線程安全,而這會給我們的程序帶來不小的復雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多線程領(lǐng)域:所謂阻塞,在某些情況下會掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會自動被喚醒)

BlockingQueue成員詳細介紹

1. ArrayBlockingQueue

?

基于數(shù)組的阻塞隊列實現(xiàn),在ArrayBlockingQueue內(nèi)部,維護了一個定長數(shù)組,以便緩存隊列中的數(shù)據(jù)對象,這是一個常用的阻塞隊列,除了一個定長數(shù)組外,ArrayBlockingQueue內(nèi)部還保存著兩個整形變量,分別標識著隊列的頭部和尾部在數(shù)組中的位置。

ArrayBlockingQueue在生產(chǎn)者放入數(shù)據(jù)和消費者獲取數(shù)據(jù),都是共用同一個鎖對象,由此也意味著兩者無法真正并行運行,這點尤其不同于LinkedBlockingQueue;按照實現(xiàn)原理來分析,ArrayBlockingQueue完全可以采用分離鎖,從而實現(xiàn)生產(chǎn)者和消費者操作的完全并行運行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的數(shù)據(jù)寫入和獲取操作已經(jīng)足夠輕巧,以至于引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于,前者在插入或刪除元素時不會產(chǎn)生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象。這在長時間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中,其對于GC的影響還是存在一定的區(qū)別。而在創(chuàng)建ArrayBlockingQueue時,我們還可以控制對象的內(nèi)部鎖是否采用公平鎖,默認采用非公平鎖

2. LinkedBlockingQueue

基于鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內(nèi)部也維持著一個數(shù)據(jù)緩沖隊列(該隊列由一個鏈表構(gòu)成),當生產(chǎn)者往隊列中放入一個數(shù)據(jù)時,隊列會從生產(chǎn)者手中獲取數(shù)據(jù),并緩存在隊列內(nèi)部,而生產(chǎn)者立即返回;只有當隊列緩沖區(qū)達到最大值緩存容量時(LinkedBlockingQueue可以通過構(gòu)造函數(shù)指定該值),才會阻塞生產(chǎn)者隊列,直到消費者從隊列中消費掉一份數(shù)據(jù),生產(chǎn)者線程會被喚醒,反之對于消費者這端的處理也基于同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理并發(fā)數(shù)據(jù),還因為其對于生產(chǎn)者端和消費者端分別采用了獨立的鎖來控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費者可以并行地操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能。
作為開發(fā)者,我們需要注意的是,如果構(gòu)造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產(chǎn)者的速度一旦大于消費者的速度,也許還沒有等到隊列滿阻塞產(chǎn)生,系統(tǒng)內(nèi)存就有可能已被消耗殆盡了。

ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞隊列,一般情況下,在處理多線程間的生產(chǎn)者消費者問題,使用這兩個類足以。

示例:

生產(chǎn)者

?

package cn.com.example.concurrent.blockingQueue;import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;/*** Created by Jack on 2017/1/24.*/ public class Producer implements Runnable {private volatile boolean isRunning = true;private BlockingQueue queue;private static AtomicInteger count = new AtomicInteger();private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;public Producer(BlockingQueue queue) {this.queue = queue;}public void run() {String data = null;Random r = new Random();System.out.println("啟動生產(chǎn)者線程!");try {while (isRunning) {System.out.println("正在生產(chǎn)數(shù)據(jù)...");Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));data = "data:" + count.incrementAndGet();System.out.println("將數(shù)據(jù):" + data + "放入隊列...");if (!queue.offer(data, 2, TimeUnit.SECONDS)) {System.out.println("放入數(shù)據(jù)失敗:" + data);}}} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();} finally {System.out.println("退出生產(chǎn)者線程!");}}public void stop() {isRunning = false;} }

?

消費者

package cn.com.example.concurrent.blockingQueue;import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit;/*** Created by Jack on 2017/1/24.*/ public class Consumer implements Runnable {private BlockingQueue<String> queue;private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;public Consumer(BlockingQueue<String> queue) {this.queue = queue;}public void run() {System.out.println("啟動消費者線程!");Random r = new Random();boolean isRunning = true;try {while (isRunning) {System.out.println("正從隊列獲取數(shù)據(jù)...");String data = queue.poll(2, TimeUnit.SECONDS);if (null != data) {System.out.println("拿到數(shù)據(jù):" + data);System.out.println("正在消費數(shù)據(jù):" + data);Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));} else {// 超過2s還沒數(shù)據(jù),認為所有生產(chǎn)線程都已經(jīng)退出,自動退出消費線程。isRunning = false;}}} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();} finally {System.out.println("退出消費者線程!");}} }

測試

package cn.com.example.concurrent.blockingQueue;import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue;/*** Created by Jack on 2017/1/24.*/ public class BlockingQueueTest {public static void main(String[] args) throws InterruptedException {// 聲明一個容量為10的緩存隊列BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);Producer producer1 = new Producer(queue);Producer producer2 = new Producer(queue);Producer producer3 = new Producer(queue);Consumer consumer = new Consumer(queue);// 借助ExecutorsExecutorService service = Executors.newCachedThreadPool();// 啟動線程service.execute(producer1);service.execute(producer2);service.execute(producer3);service.execute(consumer);// 執(zhí)行10sThread.sleep(10 * 1000);producer1.stop();producer2.stop();producer3.stop();Thread.sleep(2000);// 退出Executorservice.shutdown();}}

結(jié)果

啟動消費者線程! 正從隊列獲取數(shù)據(jù)... 啟動生產(chǎn)者線程! 啟動生產(chǎn)者線程! 正在生產(chǎn)數(shù)據(jù)... 啟動生產(chǎn)者線程! 正在生產(chǎn)數(shù)據(jù)... 正在生產(chǎn)數(shù)據(jù)... 將數(shù)據(jù):data:1放入隊列... 正在生產(chǎn)數(shù)據(jù)... 拿到數(shù)據(jù):data:1 正在消費數(shù)據(jù):data:1 將數(shù)據(jù):data:2放入隊列... 正在生產(chǎn)數(shù)據(jù)... 正從隊列獲取數(shù)據(jù)... 拿到數(shù)據(jù):data:2 正在消費數(shù)據(jù):data:2 將數(shù)據(jù):data:3放入隊列... 正在生產(chǎn)數(shù)據(jù)... 將數(shù)據(jù):data:4放入隊列... 正在生產(chǎn)數(shù)據(jù)... 將數(shù)據(jù):data:5放入隊列... 正在生產(chǎn)數(shù)據(jù)... 將數(shù)據(jù):data:6放入隊列... 正在生產(chǎn)數(shù)據(jù)... 將數(shù)據(jù):data:7放入隊列... 正在生產(chǎn)數(shù)據(jù)... 正從隊列獲取數(shù)據(jù)... 拿到數(shù)據(jù):data:3 正在消費數(shù)據(jù):data:3 將數(shù)據(jù):data:8放入隊列... 正在生產(chǎn)數(shù)據(jù)... 將數(shù)據(jù):data:9放入隊列... 正在生產(chǎn)數(shù)據(jù)... 將數(shù)據(jù):data:10放入隊列... 正在生產(chǎn)數(shù)據(jù)... 正從隊列獲取數(shù)據(jù)... 拿到數(shù)據(jù):data:4 正在消費數(shù)據(jù):data:4 將數(shù)據(jù):data:11放入隊列... 正在生產(chǎn)數(shù)據(jù)... 將數(shù)據(jù):data:12放入隊列... 正在生產(chǎn)數(shù)據(jù)... 將數(shù)據(jù):data:13放入隊列... 正在生產(chǎn)數(shù)據(jù)... 將數(shù)據(jù):data:14放入隊列... 正在生產(chǎn)數(shù)據(jù)... 將數(shù)據(jù):data:15放入隊列...

3. DelayQueue

DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue是一個沒有大小限制的隊列,因此往隊列中插入數(shù)據(jù)的操作(生產(chǎn)者)永遠不會被阻塞,而只有獲取數(shù)據(jù)的操作(消費者)才會被阻塞。
使用場景:
  DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連接隊列。

4. PriorityBlockingQueue

基于優(yōu)先級的阻塞隊列(優(yōu)先級的判斷通過構(gòu)造函數(shù)傳入的Compator對象來決定),但需要注意的是PriorityBlockingQueue并不會阻塞數(shù)據(jù)生產(chǎn)者,而只會在沒有可消費的數(shù)據(jù)時,阻塞數(shù)據(jù)的消費者。因此使用的時候要特別注意,生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度絕對不能快于消費者消費數(shù)據(jù)的速度,否則時間一長,會最終耗盡所有的可用堆內(nèi)存空間。在實現(xiàn)PriorityBlockingQueue時,內(nèi)部控制線程同步的鎖采用的是公平鎖。

5. SynchronousQueue

一種無緩沖的等待隊列,類似于無中介的直接交易,有點像原始社會中的生產(chǎn)者和消費者,生產(chǎn)者拿著產(chǎn)品去集市銷售給產(chǎn)品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產(chǎn)者,如果一方?jīng)]有找到合適的目標,那么對不起,大家都在集市等待。相對于有緩沖的BlockingQueue來說,少了一個中間經(jīng)銷商的環(huán)節(jié)(緩沖區(qū)),如果有經(jīng)銷商,生產(chǎn)者直接把產(chǎn)品批發(fā)給經(jīng)銷商,而無需在意經(jīng)銷商最終會將這些產(chǎn)品賣給那些消費者,由于經(jīng)銷商可以庫存一部分商品,因此相對于直接交易模式,總體來說采用中間經(jīng)銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經(jīng)銷商的引入,使得產(chǎn)品從生產(chǎn)者到消費者中間增加了額外的交易環(huán)節(jié),單個產(chǎn)品的及時響應性能可能會降低。
  聲明一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。公平模式和非公平模式的區(qū)別:
  如果采用公平模式:SynchronousQueue會采用公平鎖,并配合一個FIFO隊列來阻塞多余的生產(chǎn)者和消費者,從而體系整體的公平策略;
  但如果是非公平模式(SynchronousQueue默認):SynchronousQueue采用非公平鎖,同時配合一個LIFO隊列來管理多余的生產(chǎn)者和消費者,而后一種模式,如果生產(chǎn)者和消費者的處理速度有差距,則很容易出現(xiàn)饑渴的情況,即可能有某些生產(chǎn)者或者是消費者的數(shù)據(jù)永遠都得不到處理。

BlockingQueue不光實現(xiàn)了一個完整隊列所具有的基本功能,同時在多線程環(huán)境下,他還自動管理了多線間的自動等待于喚醒功能,從而使得程序員可以忽略這些細節(jié),關(guān)注更高級的功能。?

?

轉(zhuǎn)載于:https://www.cnblogs.com/Zombie-Xian/p/6347386.html

總結(jié)

以上是生活随笔為你收集整理的java.util.concurrent BlockingQueue详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。