转:Java 7 种阻塞队列详解
轉自:
Java 7 種阻塞隊列詳解 - 云+社區 - 騰訊云隊列(Queue)是一種經常使用的集合。Queue 實際上是實現了一個先進先出(FIFO:First In First Out)的有序表。和 List、Set ...https://cloud.tencent.com/developer/article/1706970
隊列和阻塞隊列
隊列
隊列(Queue)是一種經常使用的集合。Queue 實際上是實現了一個先進先出(FIFO:First In First Out)的有序表。和 List、Set 一樣都繼承自 Collection。它和 List 的區別在于,List可以在任意位置添加和刪除元素,而Queue 只有兩個操作:
- 把元素添加到隊列末尾;
- 從隊列頭部取出元素。
超市的收銀臺就是一個隊列:
我們常用的 LinkedList 就可以當隊列使用,實現了 Dequeue 接口,還有 ConcurrentLinkedQueue,他們都屬于非阻塞隊列。
阻塞隊列
阻塞隊列,顧名思義,首先它是一個隊列,而一個阻塞隊列在數據結構中所起的作用大致如下
線程 1 往阻塞隊列中添加元素,而線程 2 從阻塞隊列中移除元素
- 當阻塞隊列是空時,從隊列中獲取元素的操作將會被阻塞。
- 當阻塞隊列是滿時,從隊列中添加元素的操作將會被阻塞。
試圖從空的阻塞隊列中獲取元素的線程將會阻塞,直到其他的線程往空的隊列插入新的元素,同樣,試圖往已滿的阻塞隊列添加新元素的線程同樣也會阻塞,直到其他的線程從列中移除一個或多個元素或者完全清空隊列后繼續新增。
類似我們去海底撈排隊,海底撈爆滿情況下,阻塞隊列相當于用餐區,用餐區滿了的話,就阻塞在候客區等著,可以用餐的話 put 一波去用餐,吃完就 take 出去。
為什么要用阻塞隊列,有什么好處嗎
在多線程領域:所謂阻塞,是指在某些情況下會掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會自動被喚醒。
那為什么需要 BlockingQueue 呢
好處是我們不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這些 BlockingQueue 都包辦了。
在 concurrent 包發布以前,多線程環境下,我們每個程序員都必須自己去實現這些細節,尤其還要兼顧效率和線程安全,這會給我們的程序帶來不小的復雜性。現在有了阻塞隊列,我們的操作就從手動擋換成了自動擋。
Java 里的阻塞隊列
Collection的子類除了我們熟悉的 List 和 Set,還有一個 Queue,阻塞隊列 BlockingQueue 繼承自 Queue。
BlockingQueue 是個接口,需要使用它的實現之一來使用 BlockingQueue,java.util.concurrent 包下具有以下 BlockingQueue 接口的實現類:
JDK 提供了 7 個阻塞隊列。分別是
- ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列
- LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列
- PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列
- DelayQueue:一個使用優先級隊列實現的無界阻塞隊列
- SynchronousQueue:一個不存儲元素的阻塞隊列
- LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列(實現了繼承于 BlockingQueue 的 TransferQueue)
- LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列
BlockingQueue 核心方法
相比 Queue 接口,BlockingQueue 有四種形式的 API。
| 插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除(取出) | remove() | poll() | take() | poll(time,unit) |
| 檢查 | element() | peek() | 不可用 | 不可用 |
以 ArrayBlockingQueue 為例來看下 Java 阻塞隊列提供的常用方法
- 拋出異常:
- 當阻塞隊列滿時,再往隊列里 add 插入元素會拋出 java.lang.IllegalStateException: Queue full 異常;
- 當隊列為空時,從隊列里 remove 移除元素時會拋出 NoSuchElementException 異常 。
- element(),返回隊列頭部的元素,如果隊列為空,則拋出一個 NoSuchElementException 異常
- 返回特殊值:
- offer(),插入方法,成功返回 true,失敗返回 false;
- poll(),移除方法,成功返回出隊列的元素,隊列里沒有則返回 null
- peek() ,返回隊列頭部的元素,如果隊列為空,則返回 null
- 一直阻塞:
- 當阻塞隊列滿時,如果生產線程繼續往隊列里 put 元素,隊列會一直阻塞生產線程,直到拿到數據,或者響應中斷退出;
- 當阻塞隊列空時,消費線程試圖從隊列里 take 元素,隊列也會一直阻塞消費線程,直到隊列可用。
- 超時退出:
- 當阻塞隊列滿時,隊列會阻塞生產線程一定時間,如果超過一定的時間,生產線程就會退出,返回 false
- 當阻塞隊列空時,隊列會阻塞消費線程一定時間,如果超過一定的時間,消費線程會退出,返回 null
BlockingQueue 實現類
逐個分析下這 7 個阻塞隊列,常用的幾個順便探究下源碼。
ArrayBlockingQueue
ArrayBlockingQueue,一個由數組實現的有界阻塞隊列。該隊列采用先進先出(FIFO)的原則對元素進行排序添加的。
ArrayBlockingQueue 為有界且固定,其大小在構造時由構造函數來決定,確認之后就不能再改變了。
ArrayBlockingQueue 支持對等待的生產者線程和使用者線程進行排序的可選公平策略,但是在默認情況下不保證線程公平的訪問,在構造時可以選擇公平策略(fair = true)。公平性通常會降低吞吐量,但是減少了可變性和避免了“不平衡性”。(ArrayBlockingQueue 內部的阻塞隊列是通過 ReentrantLock 和 Condition 條件隊列實現的, 所以 ArrayBlockingQueue 中的元素存在公平和非公平訪問的區別)
所謂公平訪問隊列是指阻塞的所有生產者線程或消費者線程,當隊列可用時,可以按照阻塞的先后順序訪問隊列,即先阻塞的生產者線程,可以先往隊列里插入元素,先阻塞的消費者線程,可以先從隊列里獲取元素,可以保證先進先出,避免饑餓現象。
源碼解讀
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 通過數組來實現的隊列final Object[] items;//記錄隊首元素的下標int takeIndex;//記錄隊尾元素的下標int putIndex;//隊列中的元素個數int count;//通過ReentrantLock來實現同步final ReentrantLock lock;//有2個條件對象,分別表示隊列不為空和隊列不滿的情況private final Condition notEmpty;private final Condition notFull;//迭代器transient Itrs itrs;//offer方法用于向隊列中添加數據public boolean offer(E e) {// 可以看出添加的數據不支持null值checkNotNull(e);final ReentrantLock lock = this.lock;//通過重入鎖來實現同步lock.lock();try {//如果隊列已經滿了的話直接就返回false,不會阻塞調用這個offer方法的線程if (count == items.length)return false;else {//如果隊列沒有滿,就調用enqueue方法將元素添加到隊列中enqueue(e);return true;}} finally {lock.unlock();}}//多了個等待時間的 offer方法public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;//獲取可中斷鎖lock.lockInterruptibly();try {while (count == items.length) {if (nanos <= 0)return false;//等待設置的時間nanos = notFull.awaitNanos(nanos);}//如果等待時間過了,隊列有空間的話就會調用enqueue方法將元素添加到隊列enqueue(e);return true;} finally {lock.unlock();}}//將數據添加到隊列中的具體方法private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;//通過循環數組實現的隊列,當數組滿了時下標就變成0了if (++putIndex == items.length)putIndex = 0;count++;//激活因為notEmpty條件而阻塞的線程,比如調用take方法的線程notEmpty.signal();}//將數據從隊列中取出的方法private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];//將對應的數組下標位置設置為null釋放資源items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();//激活因為notFull條件而阻塞的線程,比如調用put方法的線程notFull.signal();return x;}//put方法和offer方法不一樣的地方在于,如果隊列是滿的話,它就會把調用put方法的線程阻塞,直到隊列里有空間public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;//因為后面調用了條件變量的await()方法,而await()方法會在中斷標志設置后拋出InterruptedException異常后退出,// 所以在加鎖時候先看中斷標志是不是被設置了,如果設置了直接拋出InterruptedException異常,就不用再去獲取鎖了lock.lockInterruptibly();try {while (count == items.length)//如果隊列滿的話就阻塞等待,直到notFull的signal方法被調用,也就是隊列里有空間了notFull.await();//隊列里有空間了執行添加操作enqueue(e);} finally {lock.unlock();}}//poll方法用于從隊列中取數據,不會阻塞當前線程public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {//如果隊列為空的話會直接返回null,否則調用dequeue方法取數據return (count == 0) ? null : dequeue();} finally {lock.unlock();}}//有等待時間的 poll 重載方法public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}return dequeue();} finally {lock.unlock();}}//take方法也是用于取隊列中的數據,但是和poll方法不同的是它有可能會阻塞當前的線程public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {//當隊列為空時,就會阻塞當前線程while (count == 0)notEmpty.await();//直到隊列中有數據了,調用dequeue方法將數據返回return dequeue();} finally {lock.unlock();}}//返回隊首元素public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return itemAt(takeIndex); // null when queue is empty} finally {lock.unlock();}}//獲取隊列的元素個數,加了鎖,所以結果是準確的public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return count;} finally {lock.unlock();}}// 此外,還有一些其他方法//返回隊列剩余空間,還能加幾個元素public int remainingCapacity() {final ReentrantLock lock = this.lock;lock.lock();try {return items.length - count;} finally {lock.unlock();}}// 判斷隊列中是否存在當前元素opublic boolean contains(Object o){}// 返回一個按正確順序,包含隊列中所有元素的數組public Object[] toArray(){}// 自動清空隊列中的所有元素public void clear(){}// 移除隊列中所有可用元素,并將他們加入到給定的 Collection 中 public int drainTo(Collection<? super E> c){}// 返回此隊列中按正確順序進行迭代的,包含所有元素的迭代器public Iterator<E> iterator() }LinkedBlockingQueue
LinkedBlockingQueue 是一個用單向鏈表實現的有界阻塞隊列。此隊列的默認和最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。
如果不是特殊業務,LinkedBlockingQueue 使用時,切記要定義容量 new LinkedBlockingQueue(capacity)
,防止過度膨脹。
源碼解讀
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {private static final long serialVersionUID = -6903933977591709194L;// 基于鏈表實現,肯定要有結點類,典型的單鏈表結構static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}//容量private final int capacity;//當前隊列元素數量private final AtomicInteger count = new AtomicInteger();// 頭節點,不存數據transient Node<E> head;// 尾節點,便于入隊private transient Node<E> last;// take鎖,出隊鎖,只有take,poll方法會持有private final ReentrantLock takeLock = new ReentrantLock();// 出隊等待條件// 當隊列無元素時,take鎖會阻塞在notEmpty條件上,等待其它線程喚醒private final Condition notEmpty = takeLock.newCondition();// 入隊鎖,只有put,offer會持有private final ReentrantLock putLock = new ReentrantLock();// 入隊等待條件// 當隊列滿了時,put鎖會會阻塞在notFull上,等待其它線程喚醒private final Condition notFull = putLock.newCondition();//同樣提供三個構造器public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();// 初始化head和last指針為空值節點this.capacity = capacity;last = head = new Node<E>(null);}public LinkedBlockingQueue() {// 如果沒傳容量,就使用最大int值初始化其容量this(Integer.MAX_VALUE);}public LinkedBlockingQueue(Collection<? extends E> c) {}//入隊public void put(E e) throws InterruptedException {// 不允許null元素if (e == null) throw new NullPointerException();//規定給當前put方法預留一個本地變量int c = -1;// 新建一個節點Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 使用put鎖加鎖putLock.lockInterruptibly();try {// 如果隊列滿了,就阻塞在notFull條件上// 等待被其它線程喚醒while (count.get() == capacity) {notFull.await();}// 隊列不滿了,就入隊enqueue(node);// 隊列長度加1c = count.getAndIncrement();// 如果現隊列長度小于容量// 就再喚醒一個阻塞在notFull條件上的線程// 這里為啥要喚醒一下呢?// 因為可能有很多線程阻塞在notFull這個條件上的// 而取元素時只有取之前隊列是滿的才會喚醒notFull// 為什么隊列滿的才喚醒notFull呢?// 因為喚醒是需要加putLock的,這是為了減少鎖的次數// 所以,這里索性在放完元素就檢測一下,未滿就喚醒其它notFull上的線程// 說白了,這也是鎖分離帶來的代價if (c + 1 < capacity)notFull.signal();} finally {// 釋放鎖putLock.unlock();}// 如果原隊列長度為0,現在加了一個元素后立即喚醒notEmpty條件if (c == 0)signalNotEmpty();}private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;// 加take鎖takeLock.lock();try {// 喚醒notEmpty條件notEmpty.signal();} finally {takeLock.unlock();}}private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();}}private void enqueue(Node<E> node) {// 直接加到last后面last = last.next = node;}public boolean offer(E e) {//用帶過期時間的說明}public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();//轉換為納秒long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//獲取入隊鎖,支持等待鎖的過程中被中斷putLock.lockInterruptibly();try {//隊列滿了,再看看有沒有超時while (count.get() == capacity) {if (nanos <= 0)//等待時間超時return false;//進行等待,awaitNanos(long nanos)是AQS中的方法//在等待過程中,如果被喚醒或超時,則繼續當前循環//如果被中斷,則拋出中斷異常nanos = notFull.awaitNanos(nanos);}//進入隊尾enqueue(new Node<E>(e));c = count.getAndIncrement();//說明當前元素后面還能再插入一個//就喚醒一個入隊條件隊列中阻塞的線程if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}//節點數量為0,說明隊列是空的if (c == 0)//喚醒一個出隊條件隊列阻塞的線程signalNotEmpty();return true;}public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {// 如果隊列無元素,則阻塞在notEmpty條件上while (count.get() == 0) {notEmpty.await();}// 否則,出隊x = dequeue();// 獲取出隊前隊列的長度c = count.getAndDecrement();// 如果取之前隊列長度大于1,則喚醒notEmptyif (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// 如果取之前隊列長度等于容量// 則喚醒notFullif (c == capacity)signalNotFull();return x;}private E dequeue() {Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;}public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x = null;int c = -1;long nanos = unit.toNanos(timeout);final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {//隊列為空且已經超時,直接返回空if (nanos <= 0)return null;//等待過程中可能被喚醒,超時,中斷nanos = notEmpty.awaitNanos(nanos);}//進行出隊操作x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}//如果出隊前,隊列是滿的,則喚醒一個被take()阻塞的線程if (c == capacity)signalNotFull();return x;}public E poll() {//}public E peek() {if (count.get() == 0)return null;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {Node<E> first = head.next;if (first == null)return null;elsereturn first.item;} finally {takeLock.unlock();}}void unlink(Node<E> p, Node<E> trail) {// assert isFullyLocked();// p.next is not changed, to allow iterators that are// traversing p to maintain their weak-consistency guarantee.p.item = null;trail.next = p.next;if (last == p)last = trail;if (count.getAndDecrement() == capacity)notFull.signal();}public boolean remove(Object o) {if (o == null) return false;fullyLock();try {for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {if (o.equals(p.item)) {unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}}public boolean contains(Object o) {}static final class LBQSpliterator<E> implements Spliterator<E> {} }LinkedBlockingQueue 與 ArrayBlockingQueue 對比
- ArrayBlockingQueue 入隊出隊采用一把鎖,導致入隊出隊相互阻塞,效率低下;
- LinkedBlockingQueue 入隊出隊采用兩把鎖,入隊出隊互不干擾,效率較高;
- 二者都是有界隊列,如果長度相等且出隊速度跟不上入隊速度,都會導致大量線程阻塞;
- LinkedBlockingQueue 如果初始化不傳入初始容量,則使用最大 int 值,如果出隊速度跟不上入隊速度,會導致隊列特別長,占用大量內存;
PriorityBlockingQueue
PriorityBlockingQueue 是一個支持優先級的無界阻塞隊列。(雖說是無界隊列,但是由于資源耗盡的話,也會OutOfMemoryError,無法添加元素)
默認情況下元素采用自然順序升序排列。也可以自定義類實現 compareTo() 方法來指定元素排序規則,或者初始化 PriorityBlockingQueue 時,指定構造參數 Comparator 來對元素進行排序。但需要注意的是不能保證同優先級元素的順序。PriorityBlockingQueue 是基于最小二叉堆實現,使用基于 CAS 實現的自旋鎖來控制隊列的動態擴容,保證了擴容操作不會阻塞 take 操作的執行。
DelayQueue
DelayQueue 是一個使用優先級隊列實現的延遲無界阻塞隊列。
隊列使用 PriorityQueue 來實現。隊列中的元素必須實現 Delayed 接口,在創建元素時可以指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。我們可以將 DelayQueue 運用在以下應用場景:
- 緩存系統的設計:可以用 DelayQueue 保存緩存元素的有效期,使用一個線程循環查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時,表示緩存有效期到了。
- 定時任務調度。使用 DelayQueue 保存當天將會執行的任務和執行時間,一旦從 DelayQueue 中獲取到任務就開始執行,從比如 Timer 就是使用 DelayQueue 實現的。
SynchronousQueue
SynchronousQueue 是一個不存儲元素的阻塞隊列,也即是單個元素的隊列。
每一個 put 操作必須等待一個 take 操作,否則不能繼續添加元素。SynchronousQueue 可以看成是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列本身并不存儲任何元素,非常適合于傳遞性場景, 比如在一個線程中使用的數據,傳遞給另外一個線程使用,SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。
Coding
synchronousQueue 是一個沒有數據緩沖的阻塞隊列,生產者線程對其的插入操作 put() 必須等待消費者的移除操作 take(),反過來也一樣。
對應 peek, contains, clear, isEmpty ... 等方法其實是無效的。
但是 poll() 和 offer() 就不會阻塞,舉例來說就是 offer 的時候如果有消費者在等待那么就會立馬滿足返回 true,如果沒有就會返回 false,不會等待消費者到來。
public class SynchronousQueueDemo {public static void main(String[] args) {BlockingQueue<String> queue = new SynchronousQueue<>();//System.out.println(queue.offer("aaa")); //false//System.out.println(queue.poll()); //nullSystem.out.println(queue.add("bbb")); //IllegalStateException: Queue fullnew Thread(()->{try {System.out.println("Thread 1 put a");queue.put("a");System.out.println("Thread 1 put b");queue.put("b");System.out.println("Thread 1 put c");queue.put("c");} catch (InterruptedException e) {e.printStackTrace();}}).start();new Thread(()->{try {TimeUnit.SECONDS.sleep(2);System.out.println("Thread 2 get:"+queue.take());TimeUnit.SECONDS.sleep(2);System.out.println("Thread 2 get:"+queue.take());TimeUnit.SECONDS.sleep(2);System.out.println("Thread 2 get:"+queue.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();} } Thread 1 put a Thread 2 get:a Thread 1 put b Thread 2 get:b Thread 1 put c Thread 2 get:c源碼解讀
不像ArrayBlockingQueue、LinkedBlockingDeque之類的阻塞隊列依賴AQS實現并發操作,SynchronousQueue直接使用CAS實現線程的安全訪問。
synchronousQueue 提供了兩個構造器(公平與否),內部是通過 Transferer 來實現的,具體分為兩個Transferer,分別是 TransferStack 和 TransferQueue。
TransferStack:非公平競爭模式使用的數據結構是后進先出棧(LIFO Stack)
TransferQueue:公平競爭模式則使用先進先出隊列(FIFO Queue)
性能上兩者是相當的,一般情況下,FIFO 通常可以支持更大的吞吐量,但 LIFO 可以更大程度的保持線程的本地化。
private transient volatile Transferer<E> transferer;public SynchronousQueue() {this(false); }public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }分析 TransferQueue 的實現
//構造函數中會初始化一個出隊的節點,并且首尾都指向這個節點 TransferQueue() {QNode h = new QNode(null, false); // initialize to dummy node.head = h;tail = h; } //隊列節點, static final class QNode {volatile QNode next; // next node in queuevolatile Object item; // CAS'ed to or from nullvolatile Thread waiter; // to control park/unparkfinal boolean isData;QNode(Object item, boolean isData) {this.item = item;this.isData = isData;}// 設置next和item的值,用于進行并發更新, cas 無鎖操作boolean casNext(QNode cmp, QNode val) {return next == cmp &&UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}boolean casItem(Object cmp, Object val) {return item == cmp &&UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void tryCancel(Object cmp) {UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);}boolean isCancelled() {return item == this;}boolean isOffList() {return next == this;}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long itemOffset;private static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = QNode.class;itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}} }從 put() 方法和 take() 方法可以看出最終調用的都是 TransferQueue 的 transfer() 方法。
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();if (transferer.transfer(e, false, 0) == null) {Thread.interrupted();throw new InterruptedException();} }public E take() throws InterruptedException {E e = transferer.transfer(null, false, 0);if (e != null)return e;Thread.interrupted();throw new InterruptedException(); }//transfer方法用于提交數據或者是獲取數據 E transfer(E e, boolean timed, long nanos) {QNode s = null; // constructed/reused as needed//如果e不為null,就說明是添加數據的入隊操作boolean isData = (e != null);for (;;) {QNode t = tail;QNode h = head;if (t == null || h == null) // saw uninitialized valuecontinue; // spin//如果當前操作和 tail 節點的操作是一樣的;或者頭尾相同(表明隊列中啥都沒有)。if (h == t || t.isData == isData) { // empty or same-modeQNode tn = t.next;// 如果 t 和 tail 不一樣,說明,tail 被其他的線程改了,重來if (t != tail) // inconsistent readcontinue;// 如果 tail 的 next 不是空。就需要將 next 追加到 tail 后面了if (tn != null) { // lagging tail// 使用 CAS 將 tail.next 變成 tail,advanceTail(t, tn);continue;}// 時間到了,不等待,返回 null,插入失敗,獲取也是失敗的if (timed && nanos <= 0) // can't waitreturn null;if (s == null)s = new QNode(e, isData);if (!t.casNext(null, s)) // failed to link incontinue;advanceTail(t, s); // swing tail and waitObject x = awaitFulfill(s, e, timed, nanos);if (x == s) { // wait was cancelledclean(t, s);return null;}if (!s.isOffList()) { // not already unlinkedadvanceHead(t, s); // unlink if headif (x != null) // and forget fieldss.item = s;s.waiter = null;}return (x != null) ? (E)x : e;} else { // complementary-modeQNode m = h.next; // node to fulfillif (t != tail || m == null || h != head)continue; // inconsistent readObject x = m.item;if (isData == (x != null) || // m already fulfilledx == m || // m cancelled!m.casItem(x, e)) { // lost CASadvanceHead(h, m); // dequeue and retrycontinue;}advanceHead(h, m); // successfully fulfilledLockSupport.unpark(m.waiter);return (x != null) ? (E)x : e;}} }LinkedTransferQueue
LinkedTransferQueue 是一個由鏈表結構組成的無界阻塞 TransferQueue 隊列。
LinkedTransferQueue采用一種預占模式。意思就是消費者線程取元素時,如果隊列不為空,則直接取走數據,若隊列為空,那就生成一個節點(節點元素為null)入隊,然后消費者線程被等待在這個節點上,后面生產者線程入隊時發現有一個元素為null的節點,生產者線程就不入隊了,直接就將元素填充到該節點,并喚醒該節點等待的線程,被喚醒的消費者線程取走元素,從調用的方法返回。我們稱這種節點操作為“匹配”方式。
隊列實現了 TransferQueue 接口重寫了 tryTransfer 和 transfer 方法,這組方法和 SynchronousQueue 公平模式的隊列類似,具有匹配的功能
LinkedBlockingDeque
LinkedBlockingDeque 是一個由鏈表結構組成的雙向阻塞隊列。
所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。雙端隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。相比其他的阻塞隊列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 單詞結尾的方法,表示插入,獲取(peek)或移除雙端隊列的第一個元素。以 Last 單詞結尾的方法,表示插入,獲取或移除雙端隊列的最后一個元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。
在初始化 LinkedBlockingDeque 時可以設置容量防止其過渡膨脹,默認容量也是 Integer.MAX_VALUE。另外雙向阻塞隊列可以運用在“工作竊取”模式中。
阻塞隊列使用場景
我們常用的生產者消費者模式就可以基于阻塞隊列實現;
線程池中活躍線程數達到 corePoolSize 時,線程池將會將后續的 task 提交到 BlockingQueue 中;
生產者消費者模式
JDK API文檔的 BlockingQueue 給出了一個典型的應用
面試題:一個初始值為 0 的變量,兩個線程對齊交替操作,一個+1,一個-1,5 輪
public class ProdCounsume_TraditionDemo {public static void main(String[] args) {ShareData shareData = new ShareData();new Thread(() -> {for (int i = 0; i <= 5; i++) {shareData.increment();}}, "T1").start();new Thread(() -> {for (int i = 0; i <= 5; i++) {shareData.decrement();}}, "T1").start();} }//線程操作資源類 class ShareData {private int num = 0;private Lock lock = new ReentrantLock();private Condition condition = lock.newCondition();public void increment() {lock.lock();try {while (num != 0) {//等待,不能生產condition.await();}//干活num++;System.out.println(Thread.currentThread().getName() + "\t" + num);//喚醒condition.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public void decrement() {lock.lock();try {while (num == 0) {//等待,不能生產condition.await();}//干活num--;System.out.println(Thread.currentThread().getName() + "\t" + num);//喚醒condition.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}} }線程池
線程池的核心方法 ThreadPoolExecutor,用 BlockingQueue 存放任務的阻塞隊列,被提交但尚未被執行的任務
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)線程池在內部實際也是構建了一個生產者消費者模型,將線程和任務兩者解耦,并不直接關聯,從而良好的緩沖任務,復用線程。
不同的線程池實現用的是不同的阻塞隊列,newFixedThreadPool 和 newSingleThreadExecutor 用的是LinkedBlockingQueue,newCachedThreadPool 用的是 SynchronousQueue。
文章持續更新,可以微信搜「 JavaKeeper 」第一時間閱讀,無套路領取 500+ 本電子書和 30+ 視頻教學和源碼,本文 GitHub github.com/JavaKeeper 已經收錄,Javaer 開發、面試必備技能兵器譜,有你想要的。
參考與感謝
Home - 廖雪峰的官方網站
SynchronousQueue源碼 并發編程之 SynchronousQueue 核心源碼分析 - 掘金
總結
以上是生活随笔為你收集整理的转:Java 7 种阻塞队列详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: dly电脑是什么意思(dly笔记本电脑)
- 下一篇: java美元兑换,(Java实现) 美元