日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

10分钟搞定 Java 并发队列好吗?好的

發布時間:2025/3/21 java 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 10分钟搞定 Java 并发队列好吗?好的 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

如果按照用途與特性進行粗略的劃分,JUC 包中包含的工具大體可以分為 6 類:

  • 執行者與線程池

  • 并發隊列

  • 同步工具

  • 并發集合

  • 原子變量

  • 在【并發系列】中,主要講解了?執行者與線程池,同步工具,鎖?, 在分析源碼時,或多或少的提及到了「隊列」,隊列在 JUC 中也是多種多樣存在,所以本文就以「遠看」視角,幫助大家快速了解與區分這些看似「雜亂」的隊列

    并發隊列

    Java 并發隊列按照實現方式來進行劃分可以分為 2 種:

  • 阻塞隊列

  • 非阻塞隊列

  • 如果你已經看完并發系列鎖的實現,你已經能夠知道他們實現的區別:

    前者就是基于鎖實現的,后者則是基于 CAS 非阻塞算法實現的

    常見的隊列有下面這幾種:

    瞬間懵逼?看到這個沒有人性的圖想直接走人?客觀先別急,一會就柳暗花明了

    當下你也許有個問題:

    為什么會有這么多種隊列的存在?

    鎖有應對各種情形的鎖,隊列也自然有應對各種情形的隊列了, 是不是也有點單一職責原則的意思呢?

    所以我們要了解這些隊列到底是怎么設計的?以及用在了哪些地方?

    先來看下圖

    如果你在 IDE 中打開以上非阻塞隊列和阻塞隊列,查看其實現方法,你就會發現,阻塞隊列較非阻塞隊列?額外支持兩種操作:

  • 阻塞的插入

    當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿

  • 阻塞的移除

    當隊列為空時,獲取元素的線程會阻塞,直到隊列變為非空

  • 綜合說明入隊/出隊操作,看似雜亂的方法,用一個表格就能概括了

    拋出異常

    • 當隊列滿時,此時如果再向隊列中插入元素,會拋出 IllegalStateException (這很好理解)

    • 當隊列空時,此時如果再從隊列中獲取元素,會拋出 NoSuchElementException ?(這也很好理解)

    返回特殊值

    • 當向隊列插入元素時,會返回元素是否插入成功,成功則返回 true

    • 當從隊列移除元素時,如果沒有則返回 null

    一直阻塞

    • 當隊列滿時,如果生產者線程向隊列 put 元素,隊列會一直阻塞生產者線程,直到隊列可用或者響應中斷退出

    • 當隊列為空時,如果消費者線程?從隊列里面 take 元素,隊列會阻塞消費者線程,直到隊列不為空

    關于阻塞,我們其實早在?并發編程之等待通知機制?就已經充分說明過了,你還記得下面這張圖嗎?原理其實是一樣一樣滴

    超時退出

    和鎖一樣,因為有阻塞,為了靈活使用,就一定支持超時退出,阻塞時間達到超時時間,就會直接返回

    至于為啥插入和移除這么多種單詞表示形式,我也不知道,為了方便記憶,只需要記住阻塞的方法形式即可:

    單詞?put?和?take?字母?t?首位相連,一個放,一個拿

    到這里你應該對 Java 并發隊列有了個初步的認識了,原來看似雜亂的方法貌似也有了規律。接下來就到了瘋狂串知識點的時刻了,借助前序章節的知識,分分鐘就理解全部隊列了

    ArrayBlockingQueue

    之前也說過,JDK中的命名還是很講究滴,一看這名字,底層就是數組實現了,是否有界,那就看在構造的時候是否需要指定 capacity 值了

    填鴨式的說明也容易忘,這些都是哪看到的呢?在所有隊列的 Java docs 的第一段,一句話就概括了該隊列的主要特性,所以強烈建議大家自己在看源碼時,簡單瞄一眼?docs 開頭,心中就有多半個數

    在講?Java AQS隊列同步器以及ReentrantLock的應用?時我們介紹了公平鎖與非公平鎖的概念,ArrayBlockingQueue 也有同樣的概念,看它的構造方法,就有 ReentrantLock 來輔助實現

    public?ArrayBlockingQueue(int?capacity,?boolean?fair)?{if?(capacity?<=?0)throw?new?IllegalArgumentException();this.items?=?new?Object[capacity];lock?=?new?ReentrantLock(fair);notEmpty?=?lock.newCondition();notFull?=??lock.newCondition(); }

    默認情況下,依舊是不保證線程公平訪問隊列(公平與否是指阻塞的線程能否按照阻塞的先后順序訪問隊列,先阻塞線訪問,后阻塞后訪問)

    到這我也要臨時問一個說過多次的面試送分題了:

    為什么默認采用非公平鎖的方式?它較公平鎖方式有什么好處,又可能帶來哪些問題?

    知道了以上內容,結合上面表格中的方法,ArrayBlockingQueue 就可以輕松過關了

    和數組相對的自然是鏈表了

    LinkedBlockingQueue

    LinkedBlockingQueue 也算是一個有界阻塞隊列 ,從下面的構造函數中你也可以看出,該隊列的默認和最大長度為 Integer.MAX_VALUE ,這也就 docs 說 optionally-bounded 的原因了

    public?LinkedBlockingQueue()?{this(Integer.MAX_VALUE); }public?LinkedBlockingQueue(int?capacity)?{if?(capacity?<=?0)?throw?new?IllegalArgumentException();this.capacity?=?capacity;last?=?head?=?new?Node<E>(null); }

    正如 Java 集合一樣,鏈表形式的隊列,其存取效率要比數組形式的隊列高。但是在一些并發程序中,數組形式的隊列由于具有一定的可預測性,因此可以在某些場景中獲得更高的效率

    看到 LinkedBlockingQueue 是不是也有些熟悉呢?為什么要使用線程池??就已經和它多次照面了

    創建單個線程池

    public?static?ExecutorService?newSingleThreadExecutor()?{return?new?FinalizableDelegatedExecutorService(new?ThreadPoolExecutor(1,?1,0L,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue<Runnable>())); }

    創建固定個數線程池

    public?static?ExecutorService?newFixedThreadPool(int?nThreads)?{return?new?ThreadPoolExecutor(nThreads,?nThreads,0L,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue<Runnable>()); }

    面試送分題又來了

    使用 Executors 創建線程池很簡單,為什么大廠嚴格要求禁用這種創建方式呢?

    PriorityBlockingQueue

    PriorityBlockingQueue 是一個支持優先級的無界的阻塞隊列,默認情況下采用自然順序升序排列,當然也有非默認情況自定義優先級,需要排序,那自然要用到 Comparator 來定義排序規則了

    可以定義優先級,自然也就有相應的限制,以及使用的注意事項

    • 按照上圖說明,隊列中不允許存在 null 值,也不允許存在不能排序的元素

    • 對于排序值相同的元素,其序列是不保證的,但你可以繼續自定義其他可以區分出來優先級的值,如果你有嚴格的優先級區分,建議有更完善的比較規則,就像 Java docs 這樣

      ?class?FIFOEntry<E?extends?Comparable<??super?E>>implements?Comparable<FIFOEntry<E>>?{static?final?AtomicLong?seq?=?new?AtomicLong(0);final?long?seqNum;final?E?entry;public?FIFOEntry(E?entry)?{seqNum?=?seq.getAndIncrement();this.entry?=?entry;}public?E?getEntry()?{?return?entry;?}public?int?compareTo(FIFOEntry<E>?other)?{int?res?=?entry.compareTo(other.entry);if?(res?==?0?&&?other.entry?!=?this.entry)res?=?(seqNum?<?other.seqNum???-1?:?1);return?res;}}
    • 隊列容量是沒有上限的,但是如果插入的元素超過負載,有可能會引起OutOfMemory異常(這是肯定的),這也是為什么我們通常所說,隊列無界,心中有界

    • PriorityBlockingQueue 也有 put 方法,這是一個阻塞的方法,因為它是無界的,自然不會阻塞,所以就有了下面比較聰明的做法

      public?void?put(E?e)?{offer(e);?//?never?need?to?block??請自行對照上面表格 }
    • 可以給定初始容量,這個容量會按照一定的算法自動擴充

      //?Default?array?capacity. private?static?final?int?DEFAULT_INITIAL_CAPACITY?=?11;public?PriorityBlockingQueue()?{this(DEFAULT_INITIAL_CAPACITY,?null); }

      這里默認的容量是 11,由于也是基于數組,那面試送分題又來了

      你通常是怎樣定義容器/集合初始容量的?有哪些依據?

    DelayQueue

    DelayQueue 是一個支持延時獲取元素的無界阻塞隊列

    • 是否延時肯定是和某個時間(通常和當前時間) 進行比較

    • 比較過后還要進行排序,所以也是存在一定的優先級

    看到這也許覺得這有點和?PriorityBlockingQueue?很像,沒錯,DelayQueue?的內部也是使用?PriorityQueue

    上圖綠色框線也告訴你,DelayQueue 隊列的元素必須要實現 Depayed 接口:

    所以從上圖可以看出使用 DelayQueue 非常簡單,只需要兩步:

    實現 getDelay() 方法,返回元素要延時多長時間

    public?long?getDelay(TimeUnit?unit)?{//?最好采用納秒形式,這樣更精確return?unit.convert(time?-?now(),?NANOSECONDS); }

    實現 compareTo() 方法,比較元素順序

    public?int?compareTo(Delayed?other)?{if?(other?==?this)?//?compare?zero?if?same?objectreturn?0;if?(other?instanceof?ScheduledFutureTask)?{ScheduledFutureTask<?>?x?=?(ScheduledFutureTask<?>)other;long?diff?=?time?-?x.time;if?(diff?<?0)return?-1;else?if?(diff?>?0)return?1;else?if?(sequenceNumber?<?x.sequenceNumber)return?-1;elsereturn?1;}long?diff?=?getDelay(NANOSECONDS)?-?other.getDelay(NANOSECONDS);return?(diff?<?0)???-1?:?(diff?>?0)???1?:?0; }

    上面的代碼哪來的呢?如果你打開 ScheduledThreadPoolExecutor 里的 ScheduledFutureTask,你就看到了 (ScheduledThreadPoolExecutor 內部就是應用 DelayQueue)

    所以綜合來說,下面兩種情況非常適合使用 DelayQueue

    • 緩存系統的設計:用 DelayQueue 保存緩存元素的有效期,使用一個線程循環查詢 DelayQueue,如果能從 DelayQueue 中獲取元素,說明緩存有效期到了

    • 定時任務調度:用 DelayQueue 保存當天會執行的任務以及時間,如果能從 DelayQueue 中獲取元素,任務就可以開始執行了。比如 TimerQueue 就是這樣實現的

    SynchronousQueue

    這是一個不存儲元素的阻塞隊列,不存儲元素還叫隊列?

    沒錯,SynchronousQueue 直譯過來叫同步隊列,如果在隊列里面呆久了應該就算是“異步”了吧

    所以使用它,每個put() 操作必須要等待一個 take() 操作,反之亦然,否則不能繼續添加元素

    實際中怎么用呢?假如你需要兩個線程之間同步共享變量,如果不用 SynchronousQueue 你可能會選擇用 CountDownLatch 來完成,就像這樣:

    ExecutorService?executor?=?Executors.newFixedThreadPool(2); AtomicInteger?sharedState?=?new?AtomicInteger(); CountDownLatch?countDownLatch?=?new?CountDownLatch(1);Runnable?producer?=?()?->?{Integer?producedElement?=?ThreadLocalRandom.current().nextInt();sharedState.set(producedElement);countDownLatch.countDown(); };Runnable?consumer?=?()?->?{try?{countDownLatch.await();Integer?consumedElement?=?sharedState.get();}?catch?(InterruptedException?ex)?{ex.printStackTrace();} };

    這點小事就用計數器來實現,顯然很不合適,用 SynchronousQueue 改造一下,感覺瞬間就不一樣了

    ExecutorService?executor?=?Executors.newFixedThreadPool(2); SynchronousQueue<Integer>?queue?=?new?SynchronousQueue<>();Runnable?producer?=?()?->?{Integer?producedElement?=?ThreadLocalRandom.current().nextInt();try?{queue.put(producedElement);}?catch?(InterruptedException?ex)?{ex.printStackTrace();} };Runnable?consumer?=?()?->?{try?{Integer?consumedElement?=?queue.take();}?catch?(InterruptedException?ex)?{ex.printStackTrace();} };

    其實?Executors.newCachedThreadPool()?方法里面使用的就是 SynchronousQueue

    public?static?ExecutorService?newCachedThreadPool()?{return?new?ThreadPoolExecutor(0,?Integer.MAX_VALUE,60L,?TimeUnit.SECONDS,new?SynchronousQueue<Runnable>()); }

    看到前面?LinkedBlockingQueue?用在?newSingleThreadExecutor?和?newFixedThreadPool?上,而newCachedThreadPool?卻用?SynchronousQueue,這是為什么呢?

    因為單線程池和固定線程池中,線程數量是有限的,因此提交的任務需要在LinkedBlockingQueue隊列中等待空余的線程;

    而緩存線程池中,線程數量幾乎無限(上限為Integer.MAX_VALUE),因此提交的任務只需要在SynchronousQueue?隊列中同步移交給空余線程即可, 所以有時也會說?SynchronousQueue?的吞吐量要高于?LinkedBlockingQueue?和?ArrayBlockingQueue

    LinkedTransferQueue

    簡單來說,TransferQueue提供了一個場所,生產者線程使用?transfer?方法傳入一些對象并阻塞,直至這些對象被消費者線程全部取出。

    你有沒有覺得,剛剛介紹的?SynchronousQueue?是否很像一個容量為 0 的?TransferQueue。

    但 LinkedTransferQueue 相比其他阻塞隊列多了三個方法

    • transfer(E e)

      如果當前有消費者正在等待消費元素,transfer 方法就可以直接將生產者傳入的元素立刻 transfer (傳輸) 給消費者;如果沒有消費者等待消費元素,那么 transfer 方法會把元素放到隊列的 tail(尾部)

      節點,一直阻塞,直到該元素被消費者消費才返回

    • tryTransfer(E e)

      tryTransfer,很顯然是一種嘗試,如果沒有消費者等待消費元素,則馬上返回 false ,程序不會阻塞

    • tryTransfer(E e, long timeout, TimeUnit unit)

      帶有超時限制,嘗試將生產者傳入的元素 transfer 給消費者,如果超時時間到,還沒有消費者消費元素,則返回 false

    你瞧,所有阻塞的方法都是一個套路:

  • 阻塞方式

  • 帶有 try 的非阻塞方式

  • 帶有 try 和超時時間的非阻塞方式

  • 看到這你也許感覺 LinkedTransferQueue 沒啥特點,其實它和其他阻塞隊列的差別還挺大的:

    BlockingQueue 是如果隊列滿了,線程才會阻塞;但是 TransferQueue 是如果沒有消費元素,則會阻塞 (transfer 方法)

    這也就應了 Doug Lea 說的那句話:

    LinkedTransferQueue?is actually a superset of?ConcurrentLinkedQueue, ?SynchronousQueue?(in “fair” mode), and unboundedLinkedBlockingQueues. And it’s made better by allowing you to mix and match those features as well as take advantage of higher-performance i mplementation techniques.

    簡單翻譯:

    LinkedTransferQueue?是ConcurrentLinkedQueue,?SynchronousQueue?(在公平模式下), 無界的LinkedBlockingQueues等的超集; 允許你混合使用阻塞隊列的多種特性

    所以,在合適的場景中,請盡量使用LinkedTransferQueue

    上面都看的是單向隊列 FIFO,接下來我們看看雙向隊列

    LinkedBlockingDeque

    LinkedBlockingDeque?是一個由鏈表結構組成的雙向阻塞隊列,凡是后綴為 Deque 的都是雙向隊列意思,后綴的發音為deck——/dek/, ?剛接觸它時我以為是這個冰激凌的發音

    所謂雙向隊列值得就是可以從隊列的兩端插入和移除元素。所以:

    雙向隊列因為多了一個操作隊列的入口,在多線程同時入隊是,也就會減少一半的競爭

    隊列有頭,有尾,因此它又比其他阻塞隊列多了幾個特殊的方法

    • addFirst

    • addLast

    • xxxxFirst

    • xxxxLast

    • ... ...

    這么一看,雙向阻塞隊列確實很高效,

    那雙向阻塞隊列應用在什么地方了呢?

    不知道你是否聽過 “工作竊取”模式,看似不太厚道的一種方法,實則是高效利用線程的好辦法。下一篇文章,我們就來看看 ForkJoinPool 是如何應用 ?“工作竊取”模式的

    總結

    到這關于 Java 隊列(其實主要介紹了阻塞隊列)就快速的區分完了,將看似雜亂的方法做了分類整理,方便快速理解其用途,同時也說明了這些隊列的實際用途。相信你帶著更高的視角來閱讀源碼會更加輕松,最后也希望大家認真看兩個隊列的源碼實現,在遇到隊列的問題,腦海中的畫面分分鐘就可以搞定了

    ?

    參考

  • Java 并發編程的藝術

  • Java 并發編程之美

  • https://zhuanlan.zhihu.com/p/27148381

  • 總結

    以上是生活随笔為你收集整理的10分钟搞定 Java 并发队列好吗?好的的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。