日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

阻塞队列和ArrayBlockingQueue源码解析

發(fā)布時(shí)間:2024/9/30 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 阻塞队列和ArrayBlockingQueue源码解析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

什么是阻塞隊(duì)列

當(dāng)隊(duì)列中為空時(shí),從隊(duì)列總獲取元素的操作將被阻塞,當(dāng)隊(duì)列滿時(shí),向隊(duì)列中添加元素的操作將被阻塞。試圖從空的阻塞隊(duì)列中獲取元素的線程將會(huì)被阻塞,知道其它的線程往隊(duì)列中插入新的元素。同樣,試圖往滿的隊(duì)列中添加新元素的線程也會(huì)被阻塞,直到有其他的線程使隊(duì)列重新變的空閑起來。

處理方式拋出異常返回特殊值一直阻塞超時(shí)退出
插入方法add(e)offer(e)put()offer(e, time, unit)
移除方法remove()poll(e)take()poll(time, unit)
檢查方法element()peek()
  • 拋出異常:當(dāng)隊(duì)列滿時(shí),再向隊(duì)列中插入元素,則會(huì)拋出IllegalStateException異常。當(dāng)隊(duì)列空時(shí),再向隊(duì)列中獲取元素,則會(huì)拋出NoSuchElementException異常。
  • 返回特殊值:當(dāng)隊(duì)列滿時(shí),向隊(duì)列中添加元素,則返回false,否則返回true。當(dāng)隊(duì)列為空時(shí),向隊(duì)列中獲取元素,則返回null,否則返回元素。
  • 一直阻塞:當(dāng)阻塞隊(duì)列滿時(shí),如果生產(chǎn)者向隊(duì)列中插入元素,則隊(duì)列會(huì)一直阻塞當(dāng)前線程,直到隊(duì)列可用或響應(yīng)中斷退出。當(dāng)阻塞隊(duì)列為空時(shí),如果消費(fèi)者線程向阻塞隊(duì)列中獲取數(shù)據(jù),則隊(duì)列會(huì)一直阻塞當(dāng)前線程,直到隊(duì)列空閑或響應(yīng)中斷退出。
  • 超時(shí)退出:當(dāng)隊(duì)列滿時(shí),如果生產(chǎn)線程向隊(duì)列中添加元素,則隊(duì)列會(huì)阻塞生產(chǎn)線程一段時(shí)間,超過指定的時(shí)間則退出返回false。當(dāng)隊(duì)列為空時(shí),消費(fèi)線程從隊(duì)列中移除元素,則隊(duì)列會(huì)阻塞一段時(shí)間,如果超過指定時(shí)間退出返回null。

java里的阻塞隊(duì)列

  • ArrayBlockingQueue: 一個(gè)由數(shù)組結(jié)構(gòu)組成的有界隊(duì)列。此隊(duì)列按照先進(jìn)先出的順序進(jìn)行排序。支持公平鎖和非公平鎖。
  • LinkedBlockingQueue:一個(gè)由鏈表結(jié)構(gòu)組成的有界隊(duì)列,此隊(duì)列的長(zhǎng)度為Integer.MAX_VALUE。此隊(duì)列按照先進(jìn)先出的順序進(jìn)行排序。
  • PriorityBlockingQueue: 一個(gè)支持線程優(yōu)先級(jí)排序的無界隊(duì)列,默認(rèn)自然序進(jìn)行排序,也可以自定義實(shí)現(xiàn)compareTo()方法來指定元素排序規(guī)則,不能保證同優(yōu)先級(jí)元素的順序。
  • DelayQueue: 一個(gè)實(shí)現(xiàn)PriorityBlockingQueue實(shí)現(xiàn)延遲獲取的無界隊(duì)列,在創(chuàng)建元素時(shí),可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。只有延時(shí)期滿后才能從隊(duì)列中獲取元素。
  • SynchronousQueue: 一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每一個(gè)put操作必須等待take操作,否則不能添加元素。支持公平鎖和非公平鎖。
  • LinkedTransferQueue: 一個(gè)由鏈表結(jié)構(gòu)組成的無界阻塞隊(duì)列,相當(dāng)于其它隊(duì)列,LinkedTransferQueue隊(duì)列多了transfer和tryTransfer方法。

    • transfer:如果當(dāng)前有消費(fèi)線程正在獲取元素,transfer則把元素直接傳給消費(fèi)線程,否則加入到隊(duì)列中,知道該元素被消費(fèi)才返回。
    • tryTransfer:如果當(dāng)前有消費(fèi)這正在獲取元素,tryTransfer則把元素直接傳給消費(fèi)線程,否則立即返回false;
  • LinkedBlockingQueue: 一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。隊(duì)列頭部和尾部都可以添加和移除元素,多線程并發(fā)時(shí),可以將鎖的競(jìng)爭(zhēng)最多降到一半。

  • ArrayBlockingQueue 的源碼解析

    ArrayBlockingQueue類的結(jié)構(gòu)如下:

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {private static final long serialVersionUID = -817911632652898426L;final Object[] items; //用數(shù)據(jù)來存儲(chǔ)元素的容器int takeIndex; //下一次讀取或移除的位置(remove、poll、take )int putIndex; //下一次存放元素的位置(add、offer、put)int count; //隊(duì)列中元素的總數(shù)final ReentrantLock lock; //所有訪問的保護(hù)鎖private final Condition notEmpty; //等待獲取元素的條件private final Condition notFull; //等待存放元素的條件略...

    可以看出ArrayBlockingQueue內(nèi)部使用final修飾的對(duì)象數(shù)組來存儲(chǔ)元素,一旦初始化數(shù)組,數(shù)組的大小就不可改變。使用ReentrantLock鎖來保證鎖競(jìng)爭(zhēng),使用Condition來控制插入或獲取元素時(shí),線程是否阻塞。

    public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;//獲得支持響應(yīng)中斷的鎖lock.lockInterruptibly();try {//使用while循環(huán)來判斷隊(duì)列是否已滿,防止假喚醒while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}

    首先獲得鎖,然后判斷隊(duì)列是否已滿,如果已滿則阻塞當(dāng)前生成線程,直到隊(duì)列中空閑時(shí),被喚醒操作。隊(duì)列空閑則調(diào)用enqueue 插入元素。

    private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;//把當(dāng)前元素插入到數(shù)組中去items[putIndex] = x;//這里可以看出這個(gè)數(shù)組是個(gè)環(huán)形數(shù)組if (++putIndex == items.length)putIndex = 0;count++;// 喚醒在notEmpty條件上等待的線程 notEmpty.signal();}

    把元素插入到隊(duì)列中去,可以看出這個(gè)隊(duì)列中的數(shù)組是環(huán)形數(shù)組結(jié)構(gòu),這樣每次插入、移除的時(shí)候不需要復(fù)制移動(dòng)數(shù)組中的元素。

    public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//獲得可響應(yīng)中斷鎖lock.lockInterruptibly();try {//使用while循環(huán)來判斷隊(duì)列是否已滿,防止假喚醒while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}

    消費(fèi)者線程從阻塞隊(duì)列中獲取元素,如果隊(duì)列中元素為空,則阻塞當(dāng)前的消費(fèi)者線程直到有數(shù)據(jù)時(shí)才調(diào)用dequeue方法獲取元素。否則直接調(diào)用dequeue方法獲取元素

    private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")//獲取元素E x = (E) items[takeIndex];//將當(dāng)前位置的元素設(shè)置為nullitems[takeIndex] = null;//這里可以看出這個(gè)數(shù)組是個(gè)環(huán)形數(shù)組if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)//修改迭代器參數(shù)itrs.elementDequeued();// 喚醒在notFull條件上等待的線程 notFull.signal();return x;}

    直接從數(shù)據(jù)中獲取items[takeIndex]的元素,并設(shè)置當(dāng)前位置的元素為null,并設(shè)置下一次takeIndex的坐標(biāo)(++takeIndex),隊(duì)列元素總數(shù)-1等操作。

    public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;//獲得不可響應(yīng)中斷的鎖lock.lock();try {if (count == items.length)return false;else {//enqueue(e);return true;}} finally {lock.unlock();}}

    首先判斷隊(duì)列中的元素是否已滿,如果已滿則直接返回false,否則調(diào)用enqueue方法向隊(duì)列中插入元素,插入成功返回true。

    public E poll() {final ReentrantLock lock = this.lock;//獲得不可響應(yīng)中斷的鎖lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}}

    判斷隊(duì)列是否為空,如果為空返回null,否則調(diào)用dequeue方法返回元素。

    public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}

    首先調(diào)用offer方法插入元素,插入成功返回true,否則拋出IllegalStateException異常。

    public E remove() {E x = poll();if (x != null)return x;elsethrow new NoSuchElementException();}

    首先調(diào)用poll方法獲取元素,如果不為空則直接返回,否則拋出NoSuchElementException異常。

    public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);//得到超時(shí)的時(shí)間long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;//獲得可響應(yīng)中斷的鎖lock.lockInterruptibly();try {while (count == items.length) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(e);return true;} finally {lock.unlock();}}

    首先判斷隊(duì)列是否已滿,如果已滿再循環(huán)判斷超時(shí)時(shí)間是否超時(shí),超時(shí)則直接返回false,否則阻塞該生產(chǎn)線程nanos時(shí)間,如果nanos時(shí)間之內(nèi)喚醒則調(diào)用enqueue方法插入元素。如果隊(duì)列不滿則直接調(diào)用enqueue方法插入元素,并返回true。

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {//得到超時(shí)的時(shí)間long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;//獲得可響應(yīng)中斷的鎖lock.lockInterruptibly();try {while (count == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}return dequeue();} finally {lock.unlock();}}

    首先循環(huán)判斷隊(duì)列是否為空,如果為空再判斷是否超時(shí),超時(shí)則返回null。不超時(shí)則等待,在nanos時(shí)間喚醒則調(diào)用dequeue方法獲取元素。

    public E element() {E x = peek();if (x != null)return x;elsethrow new NoSuchElementException();}

    調(diào)用peek方法獲取元素,元素不為空則返回,否則拋出NoSuchElementException異常。

    public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return itemAt(takeIndex); // null when queue is empty} finally {lock.unlock();}}final E itemAt(int i) {return (E) items[i];}

    調(diào)用itemAt方法獲取元素。

    其它的阻塞隊(duì)列實(shí)現(xiàn)原理都類似,都是使用ReentrantLock和Condition來完成并發(fā)控制、阻塞的。

    ????本人簡(jiǎn)書blog地址:http://www.jianshu.com/u/1f0067e24ff8
    ????點(diǎn)擊這里快速進(jìn)入簡(jiǎn)書

    與50位技術(shù)專家面對(duì)面20年技術(shù)見證,附贈(zèng)技術(shù)全景圖

    總結(jié)

    以上是生活随笔為你收集整理的阻塞队列和ArrayBlockingQueue源码解析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。