阻塞队列和ArrayBlockingQueue源码解析
什么是阻塞隊(duì)列
當(dāng)隊(duì)列中為空時(shí),從隊(duì)列總獲取元素的操作將被阻塞,當(dāng)隊(duì)列滿時(shí),向隊(duì)列中添加元素的操作將被阻塞。試圖從空的阻塞隊(duì)列中獲取元素的線程將會(huì)被阻塞,知道其它的線程往隊(duì)列中插入新的元素。同樣,試圖往滿的隊(duì)列中添加新元素的線程也會(huì)被阻塞,直到有其他的線程使隊(duì)列重新變的空閑起來。
| 插入方法 | add(e) | offer(e) | put() | offer(e, time, unit) |
| 移除方法 | remove() | poll(e) | take() | poll(time, unit) |
| 檢查方法 | element() | peek() | 無 | 無 |
- 拋出異常:當(dāng)隊(duì)列滿時(shí),再向隊(duì)列中插入元素,則會(huì)拋出IllegalStateException異常。當(dāng)隊(duì)列空時(shí),再向隊(duì)列中獲取元素,則會(huì)拋出NoSuchElementException異常。
- 返回特殊值:當(dāng)隊(duì)列滿時(shí),向隊(duì)列中添加元素,則返回false,否則返回true。當(dāng)隊(duì)列為空時(shí),向隊(duì)列中獲取元素,則返回null,否則返回元素。
- 一直阻塞:當(dāng)阻塞隊(duì)列滿時(shí),如果生產(chǎn)者向隊(duì)列中插入元素,則隊(duì)列會(huì)一直阻塞當(dāng)前線程,直到隊(duì)列可用或響應(yīng)中斷退出。當(dāng)阻塞隊(duì)列為空時(shí),如果消費(fèi)者線程向阻塞隊(duì)列中獲取數(shù)據(jù),則隊(duì)列會(huì)一直阻塞當(dāng)前線程,直到隊(duì)列空閑或響應(yīng)中斷退出。
- 超時(shí)退出:當(dāng)隊(duì)列滿時(shí),如果生產(chǎn)線程向隊(duì)列中添加元素,則隊(duì)列會(huì)阻塞生產(chǎn)線程一段時(shí)間,超過指定的時(shí)間則退出返回false。當(dāng)隊(duì)列為空時(shí),消費(fèi)線程從隊(duì)列中移除元素,則隊(duì)列會(huì)阻塞一段時(shí)間,如果超過指定時(shí)間退出返回null。
java里的阻塞隊(duì)列
LinkedTransferQueue: 一個(gè)由鏈表結(jié)構(gòu)組成的無界阻塞隊(duì)列,相當(dāng)于其它隊(duì)列,LinkedTransferQueue隊(duì)列多了transfer和tryTransfer方法。
- transfer:如果當(dāng)前有消費(fèi)線程正在獲取元素,transfer則把元素直接傳給消費(fèi)線程,否則加入到隊(duì)列中,知道該元素被消費(fèi)才返回。
- tryTransfer:如果當(dāng)前有消費(fèi)這正在獲取元素,tryTransfer則把元素直接傳給消費(fèi)線程,否則立即返回false;
LinkedBlockingQueue: 一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。隊(duì)列頭部和尾部都可以添加和移除元素,多線程并發(fā)時(shí),可以將鎖的競(jìng)爭(zhēng)最多降到一半。
ArrayBlockingQueue 的源碼解析
ArrayBlockingQueue類的結(jié)構(gòu)如下:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {private static final long serialVersionUID = -817911632652898426L;final Object[] items; //用數(shù)據(jù)來存儲(chǔ)元素的容器int takeIndex; //下一次讀取或移除的位置(remove、poll、take )int putIndex; //下一次存放元素的位置(add、offer、put)int count; //隊(duì)列中元素的總數(shù)final ReentrantLock lock; //所有訪問的保護(hù)鎖private final Condition notEmpty; //等待獲取元素的條件private final Condition notFull; //等待存放元素的條件略...可以看出ArrayBlockingQueue內(nèi)部使用final修飾的對(duì)象數(shù)組來存儲(chǔ)元素,一旦初始化數(shù)組,數(shù)組的大小就不可改變。使用ReentrantLock鎖來保證鎖競(jìng)爭(zhēng),使用Condition來控制插入或獲取元素時(shí),線程是否阻塞。
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;//獲得支持響應(yīng)中斷的鎖lock.lockInterruptibly();try {//使用while循環(huán)來判斷隊(duì)列是否已滿,防止假喚醒while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}首先獲得鎖,然后判斷隊(duì)列是否已滿,如果已滿則阻塞當(dāng)前生成線程,直到隊(duì)列中空閑時(shí),被喚醒操作。隊(duì)列空閑則調(diào)用enqueue 插入元素。
private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;//把當(dāng)前元素插入到數(shù)組中去items[putIndex] = x;//這里可以看出這個(gè)數(shù)組是個(gè)環(huán)形數(shù)組if (++putIndex == items.length)putIndex = 0;count++;// 喚醒在notEmpty條件上等待的線程 notEmpty.signal();}把元素插入到隊(duì)列中去,可以看出這個(gè)隊(duì)列中的數(shù)組是環(huán)形數(shù)組結(jié)構(gòu),這樣每次插入、移除的時(shí)候不需要復(fù)制移動(dòng)數(shù)組中的元素。
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//獲得可響應(yīng)中斷鎖lock.lockInterruptibly();try {//使用while循環(huán)來判斷隊(duì)列是否已滿,防止假喚醒while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}消費(fèi)者線程從阻塞隊(duì)列中獲取元素,如果隊(duì)列中元素為空,則阻塞當(dāng)前的消費(fèi)者線程直到有數(shù)據(jù)時(shí)才調(diào)用dequeue方法獲取元素。否則直接調(diào)用dequeue方法獲取元素
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")//獲取元素E x = (E) items[takeIndex];//將當(dāng)前位置的元素設(shè)置為nullitems[takeIndex] = null;//這里可以看出這個(gè)數(shù)組是個(gè)環(huán)形數(shù)組if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)//修改迭代器參數(shù)itrs.elementDequeued();// 喚醒在notFull條件上等待的線程 notFull.signal();return x;}直接從數(shù)據(jù)中獲取items[takeIndex]的元素,并設(shè)置當(dāng)前位置的元素為null,并設(shè)置下一次takeIndex的坐標(biāo)(++takeIndex),隊(duì)列元素總數(shù)-1等操作。
public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;//獲得不可響應(yīng)中斷的鎖lock.lock();try {if (count == items.length)return false;else {//enqueue(e);return true;}} finally {lock.unlock();}}首先判斷隊(duì)列中的元素是否已滿,如果已滿則直接返回false,否則調(diào)用enqueue方法向隊(duì)列中插入元素,插入成功返回true。
public E poll() {final ReentrantLock lock = this.lock;//獲得不可響應(yīng)中斷的鎖lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}}判斷隊(duì)列是否為空,如果為空返回null,否則調(diào)用dequeue方法返回元素。
public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}首先調(diào)用offer方法插入元素,插入成功返回true,否則拋出IllegalStateException異常。
public E remove() {E x = poll();if (x != null)return x;elsethrow new NoSuchElementException();}首先調(diào)用poll方法獲取元素,如果不為空則直接返回,否則拋出NoSuchElementException異常。
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);//得到超時(shí)的時(shí)間long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;//獲得可響應(yīng)中斷的鎖lock.lockInterruptibly();try {while (count == items.length) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(e);return true;} finally {lock.unlock();}}首先判斷隊(duì)列是否已滿,如果已滿再循環(huán)判斷超時(shí)時(shí)間是否超時(shí),超時(shí)則直接返回false,否則阻塞該生產(chǎn)線程nanos時(shí)間,如果nanos時(shí)間之內(nèi)喚醒則調(diào)用enqueue方法插入元素。如果隊(duì)列不滿則直接調(diào)用enqueue方法插入元素,并返回true。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {//得到超時(shí)的時(shí)間long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;//獲得可響應(yīng)中斷的鎖lock.lockInterruptibly();try {while (count == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}return dequeue();} finally {lock.unlock();}}首先循環(huán)判斷隊(duì)列是否為空,如果為空再判斷是否超時(shí),超時(shí)則返回null。不超時(shí)則等待,在nanos時(shí)間喚醒則調(diào)用dequeue方法獲取元素。
public E element() {E x = peek();if (x != null)return x;elsethrow new NoSuchElementException();}調(diào)用peek方法獲取元素,元素不為空則返回,否則拋出NoSuchElementException異常。
public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return itemAt(takeIndex); // null when queue is empty} finally {lock.unlock();}}final E itemAt(int i) {return (E) items[i];}調(diào)用itemAt方法獲取元素。
其它的阻塞隊(duì)列實(shí)現(xiàn)原理都類似,都是使用ReentrantLock和Condition來完成并發(fā)控制、阻塞的。
????本人簡(jiǎn)書blog地址:http://www.jianshu.com/u/1f0067e24ff8
????點(diǎn)擊這里快速進(jìn)入簡(jiǎn)書
總結(jié)
以上是生活随笔為你收集整理的阻塞队列和ArrayBlockingQueue源码解析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: HashMap 源码解析(JDK1.8)
- 下一篇: JAVA对象在JVM中内存分配