日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

聊聊并发(七)——Java中的阻塞队列

發布時間:2025/5/22 java 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 聊聊并发(七)——Java中的阻塞队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1. 什么是阻塞隊列?

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用于生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。

阻塞隊列提供了四種處理方法:

方法\處理方式拋出異常返回特殊值一直阻塞超時退出
插入方法add(e)offer(e)put(e)offer(e,time,unit)
移除方法remove()poll()take()poll(time,unit)
檢查方法element()peek()不可用不可用
  • 拋出異常:是指當阻塞隊列滿時候,再往隊列里插入元素,會拋出IllegalStateException(“Queue full”)異常。當隊列為空時,從隊列里獲取元素時會拋出NoSuchElementException異常 。
  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回null
  • 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列里take元素,隊列也會阻塞消費者線程,直到隊列可用。
  • 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。

2. Java里的阻塞隊列

JDK7提供了7個阻塞隊列。分別是

  • ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
  • LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
  • PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
  • DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。
  • LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
  • LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

ArrayBlockingQueue

ArrayBlockingQueue是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認情況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的所有生產者線程或消費者線程,當隊列可用時,可以按照阻塞的先后順序訪問隊列,即先阻塞的生產者線程,可以先往隊列里插入元素,先阻塞的消費者線程,可以先從隊列里獲取元素。通常情況下為了保證公平性會降低吞吐量。我們可以使用以下代碼創建一個公平的阻塞隊列:

1ArrayBlockingQueue fairQueue =?new??ArrayBlockingQueue(1000,true);

訪問者的公平性是使用可重入鎖實現的,代碼如下:

1public?ArrayBlockingQueue(int?capacity,?boolean?fair) {
2????????if?(capacity <=?0)
3????????????throw?new?IllegalArgumentException();
4????????this.items =?new?Object[capacity];
5????????lock =?new?ReentrantLock(fair);
6????????notEmpty = lock.newCondition();
7????????notFull =? lock.newCondition();
8}

LinkedBlockingQueue

LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度為Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。

PriorityBlockingQueue

PriorityBlockingQueue是一個支持優先級的無界隊列。默認情況下元素采取自然順序排列,也可以通過比較器comparator來指定元素的排序規則。元素按照升序排列。

DelayQueue

DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在創建元素時可以指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。我們可以將DelayQueue運用在以下應用場景:

  • 緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
  • 定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。

隊列中的Delayed必須實現compareTo來指定元素的順序。比如讓延時時間最長的放在隊列的末尾。實現代碼如下:

01public?int?compareTo(Delayed other) {
02???????????if?(other ==?this)?// compare zero ONLY if same object
03????????????????return?0;
04????????????if?(other?instanceof?ScheduledFutureTask) {
05????????????????ScheduledFutureTask x = (ScheduledFutureTask)other;
06????????????????long?diff = time - x.time;
07????????????????if?(diff <?0)
08????????????????????return?-1;
09????????????????else?if?(diff >?0)
10????????????????????return?1;
11???????else?if?(sequenceNumber < x.sequenceNumber)
12????????????????????return?-1;
13????????????????else
14????????????????????return?1;
15????????????}
16????????????long?d = (getDelay(TimeUnit.NANOSECONDS) -
17??????????????????????other.getDelay(TimeUnit.NANOSECONDS));
18????????????return?(d ==?0) ??0?: ((d <?0) ? -1?:?1);
19????????}

如何實現Delayed接口

我們可以參考ScheduledThreadPoolExecutor里ScheduledFutureTask類。這個類實現了Delayed接口。首先:在對象創建的時候,使用time記錄前對象什么時候可以使用,代碼如下:

1ScheduledFutureTask(Runnable r, V result,?long?ns,?long?period) {
2????????????super(r, result);
3????????????this.time = ns;
4????????????this.period = period;
5????????????this.sequenceNumber = sequencer.getAndIncrement();
6}

然后使用getDelay可以查詢當前元素還需要延時多久,代碼如下:

public long getDelay(TimeUnit unit) {return unit.convert(time - now(), TimeUnit.NANOSECONDS);}

通過構造函數可以看出延遲時間參數ns的單位是納秒,自己設計的時候最好使用納秒,因為getDelay時可以指定任意單位,一旦以納秒作為單位,而延時的時間又精確不到納秒就麻煩了。使用時請注意當time小于當前時間時,getDelay會返回負數。

如何實現延時隊列

延時隊列的實現很簡單,當消費者從隊列里獲取元素時,如果元素沒有達到延時時間,就阻塞當前線程。

1long?delay = first.getDelay(TimeUnit.NANOSECONDS);
2????????????????????if?(delay <=?0)
3????????????????????????return?q.poll();
4????????????????????else?if?(leader !=?null)
5????????????????????????available.await();

SynchronousQueue

SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作,否則不能繼續添加元素。SynchronousQueue可以看成是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列本身并不存儲任何元素,非常適合于傳遞性場景,比如在一個線程中使用的數據,傳遞給另外一個線程使用,SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。

LinkedTransferQueue

LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對于其他阻塞隊列LinkedTransferQueue多了tryTransfer和transfer方法。

transfer方法。如果當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法可以把生產者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer方法會將元素存放在隊列的tail節點,并等到該元素被消費者消費了才返回。transfer方法的關鍵代碼如下:

1Node pred = tryAppend(s, haveData);
2return?awaitMatch(s, pred, e, (how == TIMED), nanos);

第一行代碼是試圖把存放當前元素的s節點作為tail節點。第二行代碼是讓CPU自旋等待消費者消費元素。因為自旋會消耗CPU,所以自旋一定的次數后使用Thread.yield()方法來暫停當前正在執行的線程,并執行其他線程。

tryTransfer方法。則是用來試探下生產者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則返回false。和transfer方法的區別是tryTransfer方法無論消費者是否接收,方法立即返回。而transfer方法是必須等到消費者消費了才返回。

對于帶有時間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,則是試圖把生產者傳入的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間再返回,如果超時還沒消費元素,則返回false,如果在超時時間內消費了元素,則返回true。

LinkedBlockingDeque

LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。雙端隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First單詞結尾的方法,表示插入,獲取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入,獲取或移除雙端隊列的最后一個元素。另外插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法卻等同于takeFirst,不知道是不是Jdk的bug,使用時還是用帶有First和Last后綴的方法更清楚。在初始化LinkedBlockingDeque時可以初始化隊列的容量,用來防止其再擴容時過渡膨脹。另外雙向阻塞隊列可以運用在“工作竊取”模式中。

3. 阻塞隊列的實現原理

如果隊列是空的,消費者會一直等待,當生產者添加元素時候,消費者是如何知道當前隊列有元素的呢?如果讓你來設計阻塞隊列你會如何設計,讓生產者和消費者能夠高效率的進行通訊呢?讓我們先來看看JDK是如何實現的。

使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列里添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素后,會通知生產者當前隊列可用。通過查看JDK源碼發現ArrayBlockingQueue使用了Condition來實現,代碼如下:

01private?final?Condition notFull;
02private?final?Condition notEmpty;
03?
04public?ArrayBlockingQueue(int?capacity,?boolean?fair) {
05????????//省略其他代碼
06????????notEmpty = lock.newCondition();
07????????notFull =? lock.newCondition();
08????}
09?
10public?void?put(E e)?throws?InterruptedException {
11????????checkNotNull(e);
12????????final?ReentrantLock lock =?this.lock;
13????????lock.lockInterruptibly();
14????????try?{
15????????????while?(count == items.length)
16????????????????notFull.await();
17????????????insert(e);
18????????}?finally?{
19????????????lock.unlock();
20????????}
21}
22?
23public?E take()?throws?InterruptedException {
24????????final?ReentrantLock lock =?this.lock;
25????????lock.lockInterruptibly();
26????????try?{
27????????????while?(count ==?0)
28????????????????notEmpty.await();
29????????????return?extract();
30??}?finally?{
31????????????lock.unlock();
32????????}
33}
34?
35private?void?insert(E x) {
36????????items[putIndex] = x;
37????????putIndex = inc(putIndex);
38????????++count;
39????????notEmpty.signal();
40????}

當我們往隊列里插入一個元素時,如果隊列不可用,阻塞生產者主要通過LockSupport.park(this);來實現

01public?final?void?await()?throws?InterruptedException {
02????????????if?(Thread.interrupted())
03????????????????throw?new?InterruptedException();
04????????????Node node = addConditionWaiter();
05????????????int?savedState = fullyRelease(node);
06????????????int?interruptMode =?0;
07????????????while?(!isOnSyncQueue(node)) {
08????????????????LockSupport.park(this);
09????????????????if?((interruptMode = checkInterruptWhileWaiting(node)) !=?0)
10????????????????????break;
11????????????}
12????????????if?(acquireQueued(node, savedState) && interruptMode != THROW_IE)
13????????????????interruptMode = REINTERRUPT;
14????????????if?(node.nextWaiter !=?null)?// clean up if cancelled
15????????????????unlinkCancelledWaiters();
16????????????if?(interruptMode !=?0)
17?
18reportInterruptAfterWait(interruptMode);
19????????}

繼續進入源碼,發現調用setBlocker先保存下將要阻塞的線程,然后調用unsafe.park阻塞當前線程。

1public?static?void?park(Object blocker) {
2????????Thread t = Thread.currentThread();
3????????setBlocker(t, blocker);
4????????unsafe.park(false, 0L);
5????????setBlocker(t,?null);
6????}

unsafe.park是個native方法,代碼如下:

1public?native?void?park(boolean?isAbsolute,?long?time);

park這個方法會阻塞當前線程,只有以下四種情況中的一種發生時,該方法才會返回。

  • 與park對應的unpark執行或已經執行時。注意:已經執行是指unpark先執行,然后再執行的park。
  • 線程被中斷時。
  • 如果參數中的time不是零,等待了指定的毫秒數時。
  • 發生異常現象時。這些異常事先無法確定。

我們繼續看一下JVM是如何實現park方法的,park在不同的操作系統使用不同的方式實現,在linux下是使用的是系統方法pthread_cond_wait實現。實現代碼在JVM源碼路徑src/os/linux/vm/os_linux.cpp里的 os::PlatformEvent::park方法,代碼如下:

01void?os::PlatformEvent::park() {
02?????????????int?v ;
03?????????for?(;;) {
04????????v = _Event ;
05?????????if?(Atomic::cmpxchg (v-1, &_Event, v) == v)?break?;
06?????????}
07?????????guarantee (v >= 0,?"invariant") ;
08?????????if?(v == 0) {
09?????????// Do this the hard way by blocking ...
10?????????int?status = pthread_mutex_lock(_mutex);
11?????????assert_status(status == 0, status,?"mutex_lock");
12?????????guarantee (_nParked == 0,?"invariant") ;
13?????????++ _nParked ;
14?????????while?(_Event < 0) {
15?????????status = pthread_cond_wait(_cond, _mutex);
16?????????// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
17?????????// Treat this the same as if the wait was interrupted
18?????????if?(status == ETIME) { status = EINTR; }
19?????????assert_status(status == 0 || status == EINTR, status,?"cond_wait");
20?????????}
21?????????-- _nParked ;
22?
23?????????// In theory we could move the ST of 0 into _Event past the unlock(),
24?????????// but then we'd need a MEMBAR after the ST.
25?????????_Event = 0 ;
26?????????status = pthread_mutex_unlock(_mutex);
27?????????assert_status(status == 0, status,?"mutex_unlock");
28?????????}
29?????????guarantee (_Event >= 0,?"invariant") ;
30?????????}
31?
32?????}

pthread_cond_wait是一個多線程的條件變量函數,cond是condition的縮寫,字面意思可以理解為線程在等待一個條件發生,這個條件是一個全局變量。這個方法接收兩個參數,一個共享變量_cond,一個互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal實現的。park 在windows下則是使用WaitForSingleObject實現的。

當隊列滿時,生產者往阻塞隊列里插入一個元素,生產者線程會進入WAITING (parking)狀態。我們可以使用jstack dump阻塞的生產者線程看到這點:

1"main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000]
2???java.lang.Thread.State: WAITING (parking)
3????????at sun.misc.Unsafe.park(Native Method)
4????????- parking to wait for? <0x0000000140559fe8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
5????????at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
6????????at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
7????????at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324)
8????????at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:11)

轉載于:https://www.cnblogs.com/xunianchong/p/7509040.html

總結

以上是生活随笔為你收集整理的聊聊并发(七)——Java中的阻塞队列的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。