日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

JUC多线程:阻塞队列ArrayBlockingQueue与LinkedBlockingQueue

發布時間:2024/9/30 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 JUC多线程:阻塞队列ArrayBlockingQueue与LinkedBlockingQueue 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、什么是阻塞隊列:

阻塞隊列最大的特性在于支持阻塞添加和阻塞刪除方法

  • 阻塞添加:當阻塞隊列已滿時,隊列會阻塞加入元素的線程,直到隊列元素不滿時才重新喚醒線程執行加入元素操作。

  • 阻塞刪除:但阻塞隊列元素為空時,刪除隊列元素的線程將被阻塞,直到隊列不為空再執行刪除操作

Java 中的阻塞隊列接口 BlockingQueue 繼承自 Queue 接口,因此先來看看阻塞隊列接口為我們提供的主要方法:

public interface BlockingQueue<E> extends Queue<E> {// 將指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量)// 在成功時返回 true,如果此隊列已滿,則拋IllegalStateException。 boolean add(E e); // 將指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量) // 如果該隊列已滿,則在到達指定的等待時間之前等待可用的空間,該方法可中斷 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //將指定的元素插入此隊列的尾部,如果該隊列已滿,則一直等到(阻塞)。 void put(E e) throws InterruptedException; //獲取并移除此隊列的頭部,如果沒有元素則等待(阻塞),直到有元素將喚醒等待線程執行該操作 E take() throws InterruptedException; //獲取并移除此隊列的頭部,在指定的等待時間前一直等到獲取元素, //超過時間方法將結束E poll(long timeout, TimeUnit unit) throws InterruptedException; //從此隊列中移除指定元素的單個實例(如果存在)。 boolean remove(Object o); }//除了上述方法還有繼承自Queue接口的方法 //獲取但不移除此隊列的頭元素,沒有則跑異常NoSuchElementException E element(); //獲取但不移除此隊列的頭;如果此隊列為空,則返回 null。 E peek(); //獲取并移除此隊列的頭,如果此隊列為空,則返回 null。 E poll();

這里我們把上述操作進行分類:

(1)插入方法:

  • add(E e):添加成功返回 true,失敗拋 IllegalStateException 異常
  • offer(E e):成功返回 true,如果此隊列已滿,則返回 false
  • put(E e):將元素插入此隊列的尾部,如果該隊列已滿,則一直阻塞

(2)刪除方法

  • remove(Object o):移除指定元素,成功返回 true,失敗返回 false
  • poll():獲取并移除此隊列的頭元素,若隊列為空,則返回 null
  • take():獲取并移除此隊列頭元素,若沒有元素則一直阻塞

(3)檢查方法:

  • element() :獲取但不移除此隊列的頭元素,沒有元素則拋異常
  • peek() :獲取但不移除此隊列的頭;若隊列為空,則返回 null

二、阻塞隊列的實現原理:

1、ArrayBlockingQueue:

1.1、數據結構:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/** 存儲數據的數組 */final Object[] items;/**獲取數據的索引,主要用于take,poll,peek,remove方法 */int takeIndex;/**添加數據的索引,主要用于 put, offer, or add 方法*/int putIndex;/** 隊列元素的個數 */int count;/** 控制并非訪問的鎖 */final ReentrantLock lock;/**notEmpty條件對象,用于通知take方法隊列已有元素,可執行獲取操作 */private final Condition notEmpty;/** notFull條件對象,用于通知put方法隊列未滿,可執行添加操作 */private final Condition notFull;/** 迭代器 */transient Itrs itrs = null; }

????????ArrayBlockingQueue 內部通過數組對象 items 來存儲所有的數據,需要注意的是ArrayBlockingQueue 通過一個 ReentrantLock 來同時控制添加線程與移除線程的并發訪問,這點與 LinkedBlockingQueue 區別很大(稍后會分析)。而對于 notEmpty 條件對象則是用于存放等待或喚醒調用 take() 方法的線程,告訴他們隊列已有元素,可以執行獲取操作。同理 notFull 條件對象是用于等待或喚醒調用 put() 方法的線程,告訴它們隊列未滿,可以執行添加元素的操作。takeIndex 代表的是下一個方法(take,poll,peek,remove)被調用時獲取數組元素的索引,putIndex 則代表下一個方法(put, offer, or add)被調用時元素添加到數組中的索引。

1.2、阻塞添加:put()?

????????put() 方法特點是阻塞添加,當隊列滿時通過條件對象來阻塞當前調用 put() 方法的線程,直到線程又再次被喚醒執行。總得來說添加線程的執行存在以下兩種情況:一是隊列已滿,那么新到來的put?線程將添加到 notFull 的條件隊列中等待;二是有移除線程執行移除操作,移除成功同時喚醒put線程。

具體代碼如下:

//put方法,阻塞時可中斷public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();//該方法可中斷try {//當隊列元素個數與數組長度相等時,無法添加元素while (count == items.length)//將當前調用線程掛起,添加到notFull條件隊列中等待喚醒notFull.await();enqueue(e);//如果隊列沒有滿直接添加。。} finally {lock.unlock();}}//入隊操作 private void enqueue(E x) {//獲取當前數組final Object[] items = this.items;//通過putIndex索引對數組進行賦值items[putIndex] = x;//索引自增,如果已是最后一個位置,重新設置 putIndex = 0;if (++putIndex == items.length)putIndex = 0;count++;//隊列中元素數量加1//喚醒調用take()方法的線程,執行元素獲取操作。notEmpty.signal(); }

1.3、阻塞刪除:take()?

????????take() 方法其實很簡單,有就刪除沒有就阻塞,注意這個阻塞是可以中斷的,如果隊列沒有數據那么就加入 notEmpty 條件隊列等待(有數據就直接取走,方法結束),如果有新的put線程添加了數據,那么 put 操作將會喚醒 take 線程,執行 take 操作,圖示如下:

具體代碼如下:

//從隊列頭部刪除,隊列沒有元素就阻塞,可中斷public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();//中斷try {//如果隊列沒有元素while (count == 0)//執行阻塞操作notEmpty.await();return dequeue();//如果隊列有元素執行刪除操作} finally {lock.unlock();}}//刪除隊列頭元素并返回private E dequeue() {//拿到當前數組的數據final Object[] items = this.items;@SuppressWarnings("unchecked")//獲取要刪除的對象E x = (E) items[takeIndex];將數組中takeIndex索引位置設置為nullitems[takeIndex] = null;//takeIndex索引加1并判斷是否與數組長度相等,//如果相等說明已到盡頭,恢復為0if (++takeIndex == items.length)takeIndex = 0;count--;//隊列個數減1if (itrs != null)itrs.elementDequeued();//同時更新迭代器中的元素數據//刪除了元素說明隊列有空位,喚醒notFull條件對象添加線程,執行添加操作notFull.signal();return x;}

2、LinkedBlockingQueue:

????????LinkedBlockingQueue 是一個基于鏈表的阻塞隊列,其內部維持一個基于鏈表的數據隊列,但大小默認值為 Integer.MAX_VALUE,建議使用 LinkedBlockingQueue時手動傳值,避免隊列過大造成機器負載或者內存爆滿等情況

2.1、數據結構:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/*** 節點類,用于存儲數據*/static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}/** 阻塞隊列的大小,默認為Integer.MAX_VALUE */private final int capacity;/** 當前阻塞隊列中的元素個數 */private final AtomicInteger count = new AtomicInteger();/** 阻塞隊列的頭結點 */transient Node<E> head;/** 阻塞隊列的尾節點 */private transient Node<E> last;/** 獲取并移除元素時使用的鎖,如take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** notEmpty條件對象,當隊列沒有數據時用于掛起執行刪除的線程 */private final Condition notEmpty = takeLock.newCondition();/** 添加元素時使用的鎖如 put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** notFull條件對象,當隊列數據已滿時用于掛起執行添加的線程 */private final Condition notFull = putLock.newCondition(); }

????????從上述可看成,每個添加到 LinkedBlockingQueue 隊列中的數據都將被封裝成 Node 節點,添加的鏈表隊列中,其中 head 和 last 分別指向隊列的頭結點和尾結點。與 ArrayBlockingQueue 不同的是,LinkedBlockingQueue 內部分別使用了 takeLock 和 putLock 對并發進行控制,也就是說,添加和刪除操作并不是互斥操作,可以同時進行,可以大大提高吞吐量。這里再次強調如果沒有給 LinkedBlockingQueue 指定容量大小,其默認值將是 Integer.MAX_VALUE,如果存在添加速度大于刪除速度時候,有可能會內存溢出。至于 LinkedBlockingQueue 的實現原理圖與 ArrayBlockingQueue 是類似的,除了對添加和移除方法使用單獨的鎖控制外,兩者都使用了不同的 Condition 條件對象作為等待隊列,用于掛起 take 線程和 put 線程。

2.2、阻塞添加:put()

public void put(E e) throws InterruptedException {//添加元素為null直接拋出異常if (e == null) throw new NullPointerException();int c = -1;//構建節點Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;//獲取隊列的個數final AtomicInteger count = this.count;putLock.lockInterruptibly();try {//判斷隊列是否已滿,如果已滿則阻塞當前線程while (count.get() == capacity) {notFull.await();}//添加元素并更新count值enqueue(node);c = count.getAndIncrement();//如果隊列容量還沒滿,喚醒下一個添加線程,執行添加操作if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}//由于存在添加鎖和消費鎖,而消費鎖和添加鎖都會持續喚醒等待線程,因此count肯定會變化//這里的if條件表示如果隊列中還有1條數據,由于隊列中存在數據那么就喚醒消費鎖if (c == 0)signalNotEmpty();}

這里的 put()方法做了兩件事,第一件事是判斷隊列是否滿,滿了將當前線程加入等下隊列,沒滿就將節點封裝成 Node入隊,然后再次判斷隊列添加完成后是否已滿,不滿就繼續喚醒等到在條件對象 notFull 上的添加線程。第二件事是,判斷是否需要喚醒等到在 notEmpty 條件對象上的消費線程。這里我們可能會有點疑惑,為什么添加完成后是繼續喚醒在條件對象 notFull 上的添加線程而不是像 ArrayBlockingQueue 那樣直接喚醒 notEmpty 條件對象上的消費線程?而又為什么要當 if (c == 0) 時才去喚醒消費線程呢?

  • (1)喚醒添加線程的原因:在添加新元素完成后,會判斷隊列是否已滿,不滿就繼續喚醒在條件對象 notFull 上的添加線程,這點與前面分析的 ArrayBlockingQueue 很不相同,在ArrayBlockingQueue 內部完成添加操作后,會直接喚醒消費線程對元素進行獲取,這是因為ArrayBlockingQueue 只用了一個 ReenterLock 同時對添加線程和消費線程進行控制,這樣如果在添加完成后再次喚醒添加線程的話,消費線程可能永遠無法執行,而對于 LinkedBlockingQueue 來說就不一樣了,其內部對添加線程和消費線程分別使用了各自的 ReenterLock 鎖對并發進行控制,也就是說添加線程和消費線程是不會互斥的,所以添加鎖只要管好自己的添加線程即可,添加線程自己直接喚醒自己的其他添加線程,如果沒有等待的添加線程,直接結束了。如果有就直到隊列元素已滿才結束掛起,注意消費線程的執行過程也是如此。這也是為什么 LinkedBlockingQueue 的吞吐量要相對大些的原因。
  • (2)為什么?if (c == 0) 時才去喚醒消費線程:這是因為消費線程一旦被喚醒,就一直處于消費的狀態,直到隊列為空才結束,所以 c 值是一直在變化的(c值是添加完元素前隊列的大小),此時 c 只可能是等于0或大于0,如果是 c=0,那么說明之前消費線程已停止,條件對象上可能存在等待的消費線程,添加完數據后應該是 c+1,那么有數據就直接喚醒等待消費線程,如果沒有就結束啦,等待下一次的消費操作。如果 c>0 那么消費線程就不會被喚醒,只能等待下一個消費操作(poll、take、remove)的調用,那為什么不是條件 c>0 才去喚醒呢?我們要明白的是消費線程一旦被喚醒會和添加線程一樣,一直不斷喚醒其他消費線程,如果添加前 c>0,那么很可能上一次調用的消費線程后,數據并沒有被消費完,條件隊列上也就不存在等待的消費線程了,所以 c>0 喚醒消費線程得意義不是很大,當然如果添加線程一直添加元素,那么一直 c>0,消費線程執行的換就要等待下一次調用消費操作了(poll、take、remove)

2.3、阻塞刪除:take()

public E take() throws InterruptedException {E x;int c = -1;//獲取當前隊列大小final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();//可中斷try {//如果隊列沒有數據,掛機當前線程到條件對象的等待隊列中while (count.get() == 0) {notEmpty.await();}//如果存在數據直接刪除并返回該數據x = dequeue();c = count.getAndDecrement();//隊列大小減1if (c > 1)notEmpty.signal();//還有數據就喚醒后續的消費線程} finally {takeLock.unlock();}//滿足條件,喚醒條件對象上等待隊列中的添加線程if (c == capacity)signalNotFull();return x;}

????????take() 方法是一個可阻塞可中斷的移除方法,主要做了兩件事,一是,如果隊列沒有數據就掛起當前線程到 notEmpty 條件對象的等待隊列中一直等待,如果有數據就刪除節點并返回數據項,同時喚醒后續消費線程,二是嘗試喚醒條件對象 notFull 上等待隊列中的添加線程。?

3、ArrayBlockingQueue 和 LinkedBlockingQueue 迥異:

????????通過上述的分析,對于 ArrayBlockingQueue 和 LinkedBlockingQueue?的基本使用以及內部實現原理我們已較為熟悉了,這里我們就對它們兩間的區別來個小結:

  • (1)隊列大小有所不同,ArrayBlockingQueue 是有界的初始化必須指定大小,而LinkedBlockingQueue 可以是有界的也可以是無界的(默認是 Integer.MAX_VALUE),對于后者而言,當添加速度大于移除速度時,在無界的情況下,可能會造成內存溢出等問題
  • (2)數據存儲容器不同,ArrayBlockingQueue 采用的是數組作為數據存儲容器,而LinkedBlockingQueue 采用的則是以 Node 節點作為連接對象的鏈表
  • (3)創建與銷毀對象的開銷不同,ArrayBlockingQueue 采用數組作為存儲容器,在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而 LinkedBlockingQueue 則會生成一個額外的 Node 對象。在長時間內需要高效并發地處理大批量數據的時,對于GC可能存在較大影響。
  • (4)隊列添加或移除的鎖不一樣,ArrayBlockingQueue 的鎖是沒有分離的,添加操作和移除操作采用同一個 ReenterLock 鎖,而 LinkedBlockingQueue 的鎖是分離的,添加采用的是 putLock,移除采用的是 takeLock,這樣能大大提高隊列的吞吐量,也意味著在高并發的情況下生產者和消費者可以并行地操作隊列中的數據,以此來提高整個隊列的并發性能。

參考文章:https://blog.csdn.net/javazejian/article/details/77410889

總結

以上是生活随笔為你收集整理的JUC多线程:阻塞队列ArrayBlockingQueue与LinkedBlockingQueue的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。