日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

终于,我读懂了所有Java集合——queue篇

發(fā)布時間:2023/12/13 java 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 终于,我读懂了所有Java集合——queue篇 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Stack

基于Vector實現(xiàn),支持LIFO。

類聲明

public class Stack<E> extends Vector<E> {}

push

public E push(E item) {addElement(item);return item; }

pop

public synchronized E pop() {E? obj;int len = size();obj = peek();removeElementAt(len - 1);return obj; }

peek

public synchronized E peek() {int len = size();if (len == 0)throw new EmptyStackException();return elementAt(len - 1); }

Queue

先進(jìn)先出”(FIFO—first in first out)的線性表

LinkedList類實現(xiàn)了Queue接口,因此我們可以把LinkedList當(dāng)成Queue來用。

Java里有一個叫做Stack的類,卻沒有叫做Queue的類(它是個接口名字)。當(dāng)需要使用棧時,Java已不推薦使用Stack,而是推薦使用更高效的ArrayDeque;既然Queue只是一個接口,當(dāng)需要使用隊列時也就首選ArrayDeque了(次選是LinkedList)。

ArrayDeque和LinkedList是Deque的兩個通用實現(xiàn)。

ArrayDeque

(底層是循環(huán)數(shù)組,有界隊列)

head指向首端第一個有效元素,tail指向尾端第一個可以插入元素的空位。因為是循環(huán)數(shù)組,所以head不一定總等于0,tail也不一定總是比head大。

ConcurrentLinkedQueue

(底層是鏈表,基于CAS的非阻塞隊列,無界隊列)

ConcurrentLinkedQueue是一個基于鏈接節(jié)點(diǎn)的無界線程安全隊列,它采用先進(jìn)先出的規(guī)則對節(jié)點(diǎn)進(jìn)行排序,當(dāng)我們添加一個元素的時候,它會添加到隊列的尾部,當(dāng)我們獲取一個元素時,它會返回隊列頭部的元素。它采用了“wait-free”算法(非阻塞)來實現(xiàn)。

1 . 使用 CAS 原子指令來處理對數(shù)據(jù)的并發(fā)訪問,這是非阻塞算法得以實現(xiàn)的基礎(chǔ)。

2. head/tail 并非總是指向隊列的頭 / 尾節(jié)點(diǎn),也就是說允許隊列處于不一致狀態(tài)。 這個特性把入隊 / 出隊時,原本需要一起原子化執(zhí)行的兩個步驟分離開來,從而縮小了入隊 / 出隊時需要原子化更新值的范圍到唯一變量。這是非阻塞算法得以實現(xiàn)的關(guān)鍵。

3. 以批處理方式來更新head/tail,從整體上減少入隊 / 出隊操作的開銷。

4. ConcurrentLinkedQueue的迭代器是弱一致性的,這在并發(fā)容器中是比較普遍的現(xiàn)象,主要是指在一個線程在遍歷隊列結(jié)點(diǎn)而另一個線程嘗試對某個隊列結(jié)點(diǎn)進(jìn)行修改的話不會拋出ConcurrentModificationException,這也就造成在遍歷某個尚未被修改的結(jié)點(diǎn)時,在next方法返回時可以看到該結(jié)點(diǎn)的修改,但在遍歷后再對該結(jié)點(diǎn)修改時就看不到這種變化。

1. 在入隊時最后一個結(jié)點(diǎn)中的next域為null

2. 隊列中的所有未刪除結(jié)點(diǎn)的item域不能為null且從head都可以在O(N)時間內(nèi)遍歷到

3. 對于要刪除的結(jié)點(diǎn),不是將其引用直接置為空,而是將其的item域先置為null(迭代器在遍歷是會跳過item為null的結(jié)點(diǎn))

4. 允許head和tail滯后更新,也就是上文提到的head/tail并非總是指向隊列的頭 / 尾節(jié)點(diǎn)(這主要是為了減少CAS指令執(zhí)行的次數(shù),但同時會增加volatile讀的次數(shù),但是這種消耗較小)。具體而言就是,當(dāng)在隊列中插入一個元素是,會檢測tail和最后一個結(jié)點(diǎn)之間的距離是否在兩個結(jié)點(diǎn)及以上(內(nèi)部稱之為hop);而在出隊時,對head的檢測就是與隊列的第一個結(jié)點(diǎn)的距離是否達(dá)到兩個,有則將head指向第一個結(jié)點(diǎn)并將head原來指向的結(jié)點(diǎn)的next域指向自己,這樣就能斷開與隊列的聯(lián)系從而幫助GC

head節(jié)點(diǎn)并不是總指向第一個結(jié)點(diǎn),tail也并不是總指向最后一個節(jié)點(diǎn)。

?

成員變量

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

?

構(gòu)造方法

public ConcurrentLinkedQueue() {head = tail = new Node<E>(null); }

Node#CAS操作

在obj的offset位置比較object field和期望的值,如果相同則更新。這個方法的操作應(yīng)該是原子的,因此提供了一種不可中斷的方式更新object field。

如果node的next值為cmp,則將其更新為val

boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); }boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); }private boolean casHead(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); }void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val); }

offer(無鎖)

/*** Inserts the specified element at the tail of this queue.* As the queue is unbounded, this method will never return {@code false}.** @return {@code true} (as specified by {@link Queue#offer})* @throws NullPointerException if the specified element is null*/ public boolean offer(E e) {checkNotNull(e);final Node<E> newNode = new Node<E>(e);for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;// q/p.next/tail.next為null,則說明p是尾節(jié)點(diǎn),則插入if (q == null) {// CAS插入 p.next = newNode,多線程環(huán)境下只有一個線程可以設(shè)置成功// 此時 tail.next = newNodeif (p.casNext(null, newNode)) {// CAS成功說明新節(jié)點(diǎn)已經(jīng)放入鏈表// 如果p不為t,說明當(dāng)前線程是之前CAS失敗后又重試CAS成功的,tail = newNodeif (p != t) // hop two nodes at a timecasTail(t, newNode);? // Failure is OK.return true;}// Lost CAS race to another thread; re-read next}else if (p == q)//多線程操作時候,由于poll時候會把老的head變?yōu)樽砸?#xff0c;然后head的next變?yōu)樾耯ead,所以這里需要重新找新的head,因為新的head后面的節(jié)點(diǎn)才是激活的節(jié)點(diǎn)// p = head , t = tailp = (t != (t = tail)) ? t : head;else// 對上一次CAS失敗的線程而言,t.next/p.next/tail.next/q 不是null了// 副作用是p = q,p和q都指向了尾節(jié)點(diǎn),進(jìn)入第三次循環(huán)p = (p != t && t != (t = tail)) ? t : q;} }

?

poll(無鎖)

public E poll() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {// 保存當(dāng)前節(jié)點(diǎn)的值E item = p.item;// 當(dāng)前節(jié)點(diǎn)有值則CAS置為null, p.item = nullif (item != null && p.casItem(item, null)) {// CAS成功代表當(dāng)前節(jié)點(diǎn)已經(jīng)從鏈表中移除if (p != h) // hop two nodes at a timeupdateHead(h, ((q = p.next) != null) ? q : p);return item;} // 當(dāng)前隊列為空時則返回nullelse if ((q = p.next) == null) {updateHead(h, p);return null;} // 自引用了,則重新找新的隊列頭節(jié)點(diǎn)else if (p == q)continue restartFromHead;elsep = q;}} }final void updateHead(Node<E> h, Node<E> p) {if (h != p && casHead(h, p))h.lazySetNext(h); }

peek(無鎖)

public E peek() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {E item = p.item;if (item != null || (q = p.next) == null) {updateHead(h, p);return item;}else if (p == q)continue restartFromHead;elsep = q;}} }

size(遍歷計算大小,效率低)

public int size() {int count = 0;for (Node<E> p = first(); p != null; p = succ(p))if (p.item != null)// Collection.size() spec says to max outif (++count == Integer.MAX_VALUE)break;return count; }

PriorityQueue

(底層是數(shù)組,邏輯上是小頂堆,無界隊列)

PriorityQueue底層實現(xiàn)的數(shù)據(jù)結(jié)構(gòu)是“堆”,堆具有以下兩個性質(zhì):

任意一個節(jié)點(diǎn)的值總是不大于(最大堆)或者不小于(最小堆)其父節(jié)點(diǎn)的值;堆是一棵完全二叉樹

基于數(shù)組實現(xiàn)的二叉堆,對于數(shù)組中任意位置的n上元素,其左孩子在[2n+1]位置上,右孩子[2(n+1)]位置,它的父親則在[(n-1)/2]上,而根的位置則是[0]。

1)時間復(fù)雜度:remove()方法和add()方法時間復(fù)雜度為O(logn),remove(Object obj)和contains()方法需要O(n)時間復(fù)雜度,取隊頭則需要O(1)時間

2)在初始化階段會執(zhí)行建堆函數(shù),最終建立的是最小堆,每次出隊和入隊操作不能保證隊列元素的有序性,只能保證隊頭元素和新插入元素的有序性,如果需要有序輸出隊列中的元素,則只要調(diào)用Arrays.sort()方法即可

3)可以使用Iterator的迭代器方法輸出隊列中元素

4)PriorityQueue是非同步的,要實現(xiàn)同步需要調(diào)用java.util.concurrent包下的PriorityBlockingQueue類來實現(xiàn)同步

5)在隊列中不允許使用null元素

6)PriorityQueue默認(rèn)是一個小頂堆,然而可以通過傳入自定義的Comparator函數(shù)來實現(xiàn)大頂堆

替代:用TreeMap復(fù)雜度太高,有沒有更好的方法。hash方法,但是隊列不是定長的,如果改變了大小要rehash代價太大,還有什么方法?用堆實現(xiàn),那每次get put復(fù)雜度是多少(lgN)

BlockingQueue

對于許多多線程問題,都可以通過使用一個或多個隊列以優(yōu)雅的方式將其形式化

生產(chǎn)者線程向隊列插入元素,消費(fèi)者線程則取出它們。使用隊列,可以安全地從一個線程向另一個線程傳遞數(shù)據(jù)。

比如轉(zhuǎn)賬

一個線程將轉(zhuǎn)賬指令放入隊列

一個線程從隊列中取出指令執(zhí)行轉(zhuǎn)賬,只有這個線程可以訪問銀行對象的內(nèi)部。因此不需要同步

?

當(dāng)試圖向隊列中添加元素而隊列已滿,或是想從隊列移出元素而隊列為空的時候,阻塞隊列導(dǎo)致線程阻塞

在協(xié)調(diào)多個線程之間的合作時,阻塞隊列是很有用的。

工作者線程可以周期性地將中間結(jié)果放入阻塞隊列,其他工作者線程取出中間結(jié)果并進(jìn)一步修改。隊列會自動平衡負(fù)載,大概第一個線程集比第二個運(yùn)行的慢,那么第二個線程集在等待結(jié)果時會阻塞,反之亦然

1LinkedBlockingQueue的容量是沒有上邊界的,是一個雙向隊列

2ArrayBlockingQueue在構(gòu)造時需要指定容量,并且有一個參數(shù)來指定是否需要公平策略

3PriorityBlockingQueue是一個帶優(yōu)先級的隊列,元素按照它們的優(yōu)先級順序被移走。該隊列沒有容量上限。

4DelayQueue包含實現(xiàn)了Delayed接口的對象

5TransferQueue接口允許生產(chǎn)者線程等待,直到消費(fèi)者準(zhǔn)備就緒可以接收一個元素。如果生產(chǎn)者調(diào)用transfer方法,那么這個調(diào)用會阻塞,直到插入的元素被消費(fèi)者取出之后才停止阻塞。

LinkedTransferQueue類實現(xiàn)了這個接口

ArrayBlockingQueue(底層是數(shù)組,阻塞隊列,一把鎖兩個Condition,有界同步隊列)

基于數(shù)組、先進(jìn)先出、線程安全的集合類,特點(diǎn)是可實現(xiàn)指定時間的阻塞讀寫,并且容量是可限制的。

成員變量

/** The queued items */ final Object[] items;/** items index for next take, poll, peek or remove */ int takeIndex;/** items index for next put, offer, or add */ int putIndex;/** Number of elements in the queue */ int count;/** Concurrency control uses the classic two-condition algorithm* found in any textbook.*//** Main lock guarding all access */ final ReentrantLock lock;/** Condition for waiting takes */ private final Condition notEmpty;/** Condition for waiting puts */ private final Condition notFull;/*** Shared state for currently active iterators, or null if there* are known not to be any.? Allows queue operations to update* iterator state.*/ transient Itrs itrs = null;

put(有鎖,隊列滿則阻塞)

public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();} }private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal(); }

take(有鎖,隊列空則阻塞)

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();} }private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x; }

offer(有鎖,最多阻塞一段時間)

public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(e);return true;} finally {lock.unlock();} }

?

poll(有鎖,最多阻塞一段時間)

public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}return dequeue();} finally {lock.unlock();} }

peek(有鎖)

public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return itemAt(takeIndex); // null when queue is empty} finally {lock.unlock();}final E itemAt(int i) {return (E) items[i]; }

遍歷(構(gòu)造迭代器加鎖,遍歷迭代器也加鎖)

LinkedBlockingQueue

(底層是鏈表,阻塞隊列,兩把鎖,各自對應(yīng)一個Condition,無界同步隊列)

另一種BlockingQueue的實現(xiàn),基于鏈表,沒有容量限制。

由于出隊只操作隊頭,入隊只操作隊尾,這里巧妙地使用了兩把鎖,對于put和offer入隊操作使用一把鎖,對于take和poll出隊操作使用一把鎖,避免了出隊、入隊時互相競爭鎖的現(xiàn)象,因此LinkedBlockingQueue在高并發(fā)讀寫都多的情況下,性能會較ArrayBlockingQueue好很多,在遍歷以及刪除的情況下則要兩把鎖都要鎖住。

多CPU情況下可以在同一時刻既消費(fèi)又生產(chǎn)。

LinkedBlockingDeque

(底層是雙向鏈表,阻塞隊列,一把鎖兩個Condition,無界同步隊列)

LinkedBlockingDeque是一個基于鏈表的雙端阻塞隊列。和LinkedBlockingQueue類似,區(qū)別在于該類實現(xiàn)了Deque接口,而LinkedBlockingQueue實現(xiàn)了Queue接口。

LinkedBlockingDeque內(nèi)部只有一把鎖以及該鎖上關(guān)聯(lián)的兩個條件,所以可以推斷同一時刻只有一個線程可以在隊頭或者隊尾執(zhí)行入隊或出隊操作(類似于ArrayBlockingQueue)??梢园l(fā)現(xiàn)這點(diǎn)和LinkedBlockingQueue不同,LinkedBlockingQueue可以同時有兩個線程在兩端執(zhí)行操作。

LinkedBlockingDeque和LinkedBlockingQueue的相同點(diǎn)在于:

1. 基于鏈表

2. 容量可選,不設(shè)置的話,就是Int的最大值

和LinkedBlockingQueue的不同點(diǎn)在于:

1. 雙端鏈表和單鏈表

2. 不存在哨兵節(jié)點(diǎn)

3. 一把鎖+兩個條件

LinkedBlockingDeque和ArrayBlockingQueue的相同點(diǎn)在于:使用一把鎖+兩個條件維持隊列的同步。

PriorityBlockingQueue

(底層是數(shù)組,出隊時隊空則阻塞;無界隊列,不存在隊滿情況,一把鎖一個Condition)

支持優(yōu)先級的無界阻塞隊列。默認(rèn)情況下元素采用自然順序升序排序,當(dāng)然我們也可以通過構(gòu)造函數(shù)來指定Comparator來對元素進(jìn)行排序。需要注意的是PriorityBlockingQueue不能保證同優(yōu)先級元素的順序。

擴(kuò)容

(基于CAS+Lock,CAS控制創(chuàng)建新的數(shù)組原子執(zhí)行,Lock控制數(shù)組替換原子執(zhí)行)

private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 &&UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}if (newArray == null) // back off if another thread is allocatingThread.yield();lock.lock();if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);} }

?

DelayQueue

(底層是PriorityQueue,無界阻塞隊列,過期元素方可移除,基于Lock)

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();

DelayQueue隊列中每個元素都有個過期時間,并且隊列是個優(yōu)先級隊列,當(dāng)從隊列獲取元素時候,只有過期元素才會出隊列。

每個元素都必須實現(xiàn)Delayed接口

public interface Delayed extends Comparable<Delayed> {/*** Returns the remaining delay associated with this object, in the* given time unit.** @param unit the time unit* @return the remaining delay; zero or negative values indicate* that the delay has already elapsed*/long getDelay(TimeUnit unit); }

getDelay方法返回對象的殘留延遲,負(fù)值表示延遲結(jié)束

元素只有在延遲用完的時候才能從DelayQueue移出。還必須實現(xiàn)Comparable接口。

一個典型場景是重試機(jī)制的實現(xiàn),比如當(dāng)調(diào)用接口失敗后,把當(dāng)前調(diào)用信息放入delay=10s的元素,然后把元素放入隊列,那么這個隊列就是一個重試隊列,一個線程通過take方法獲取需要重試的接口,take返回則接口進(jìn)行重試,失敗則再次放入隊列,同時也可以在元素加上重試次數(shù)。

成員變量

private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>();private Thread leader = null;private final Condition available = lock.newCondition();

構(gòu)造方法

public DelayQueue() {}

put

public void put(E e) {offer(e);} public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;// 通知最先等待的線程available.signal();}return true;} finally {lock.unlock();} }

take

獲取并移除隊列首元素,如果隊列沒有過期元素則等待。

第一次調(diào)用take時候由于隊列空,所以調(diào)用(2)把當(dāng)前線程放入available的條件隊列等待,當(dāng)執(zhí)行offer并且添加的元素就是隊首元素時候就會通知最先等待的線程激活,循環(huán)重新獲取隊首元素,這時候first假如不空,則調(diào)用getdelay方法看該元素海剩下多少時間就過期了,如果delay<=0則說明已經(jīng)過期,則直接出隊返回。否則看leader是否為null,不為null則說明是其他線程也在執(zhí)行take則把該線程放入條件隊列,否則是當(dāng)前線程執(zhí)行的take方法,則調(diào)用(5) await直到剩余過期時間到(這期間該線程會釋放鎖,所以其他線程可以offer添加元素,也可以take阻塞自己),剩余過期時間到后,該線程會重新競爭得到鎖,重新進(jìn)入循環(huán)。

(6)說明當(dāng)前take返回了元素,如果當(dāng)前隊列還有元素則調(diào)用singal激活條件隊列里面可能有的等待線程。leader那么為null,那么是第一次調(diào)用take獲取過期元素的線程,第一次調(diào)用的線程調(diào)用設(shè)置等待時間的await方法等待數(shù)據(jù)過期,后面調(diào)用take的線程則調(diào)用await直到signal。

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) { // 1)獲取但不移除隊首元素E first = q.peek();if (first == null) // 2)無元素,則阻塞 available.await();else {long delay = first.getDelay(NANOSECONDS); // 3)有元素,且已經(jīng)過期,則移除if (delay <= 0)return q.poll();first = null; // don't retain ref while waiting // 4)if (leader != null)available.await();else {Thread thisThread = Thread.currentThread(); // 5)leader = thisThread;try { // 繼續(xù)阻塞延遲的時間available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();} }

SynchronousQueue

(只存儲一個元素,阻塞隊列,基于CAS)

實現(xiàn)了BlockingQueue,是一個阻塞隊列。

一個只存儲一個元素的的阻塞隊列,每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入一直處于阻塞狀態(tài),吞吐量高于LinkedBlockingQueue。

SynchronousQueue內(nèi)部并沒有數(shù)據(jù)緩存空間,你不能調(diào)用peek()方法來看隊列中是否有數(shù)據(jù)元素,因為數(shù)據(jù)元素只有當(dāng)你試著取走的時候才可能存在,不取走而只想偷窺一下是不行的,當(dāng)然遍歷這個隊列的操作也是不允許的。隊列頭元素是第一個排隊要插入數(shù)據(jù)的線程,而不是要交換的數(shù)據(jù)。數(shù)據(jù)是在配對的生產(chǎn)者和消費(fèi)者線程之間直接傳遞的,并不會將數(shù)據(jù)緩沖數(shù)據(jù)到隊列中??梢赃@樣來理解:生產(chǎn)者和消費(fèi)者互相等待對方,握手,然后一起離開。

// 如果為 true,則等待線程以 FIFO 的順序競爭訪問;否則順序是未指定的。?// SynchronousQueue<Integer> sc =new SynchronousQueue<>(true);//fair -?SynchronousQueue<Integer> sc = new SynchronousQueue<>(); // 默認(rèn)不指定的話是false,不公平的?TransferQueue (特殊的BlockingQueue) 生產(chǎn)者會一直阻塞直到所添加到隊列的元素被某一個消費(fèi)者所消費(fèi)(不僅僅是添加到隊列里就完事)當(dāng)我們不想生產(chǎn)者過度生產(chǎn)消息時,TransferQueue可能非常有用,可避免發(fā)生OutOfMemory錯誤。在這樣的設(shè)計中,消費(fèi)者的消費(fèi)能力將決定生產(chǎn)者產(chǎn)生消息的速度。public interface TransferQueue<E> extends BlockingQueue<E> {/*** 立即轉(zhuǎn)交一個元素給消費(fèi)者,如果此時隊列沒有消費(fèi)者,那就false*/boolean tryTransfer(E e);/*** 轉(zhuǎn)交一個元素給消費(fèi)者,如果此時隊列沒有消費(fèi)者,那就阻塞*/void transfer(E e) throws InterruptedException;/*** 帶超時的tryTransfer*/boolean tryTransfer(E e, long timeout, TimeUnit unit)throws InterruptedException;/*** 是否有消費(fèi)者等待接收數(shù)據(jù),瞬時狀態(tài),不一定準(zhǔn)*/boolean hasWaitingConsumer();/*** 返回還有多少個等待的消費(fèi)者,跟上面那個一樣,都是一種瞬時狀態(tài),不一定準(zhǔn)*/int getWaitingConsumerCount();}

?

LinkedTransferQueue

(底層是鏈表,阻塞隊列,無界同步隊列)

LinkedTransferQueue實現(xiàn)了TransferQueue接口,這個接口繼承了BlockingQueue。之前BlockingQueue是隊列滿時再入隊會阻塞,而這個接口實現(xiàn)的功能是隊列不滿時也可以阻塞,實現(xiàn)一種有阻塞的入隊功能。

LinkedTransferQueue實際上是ConcurrentLinkedQueue、SynchronousQueue(公平模式)和LinkedBlockingQueue的超集。而且LinkedTransferQueue更好用,因為它不僅僅綜合了這幾個類的功能,同時也提供了更高效的實現(xiàn)。

Queue實現(xiàn)類之間的區(qū)別

非線程安全的:ArrayDeque、LinkedList、PriorityQueue

線程安全的:ConcurrentLinkedQueue、ConcurrentLinkedDeque、ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue

線程安全的又分為阻塞隊列和非阻塞隊列,阻塞隊列提供了put、take等會阻塞當(dāng)前線程的方法,比如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue,也有offer、poll等阻塞一段時間候返回的方法;

非阻塞隊列是使用CAS機(jī)制保證offer、poll等可以線程安全地入隊出隊,并且不需要加鎖,不會阻塞當(dāng)前線程,比如ConcurrentLinkedQueue、ConcurrentLinkedDeque。

ArrayBlockingQueue和LinkedBlockingQueue 區(qū)別

1. 隊列中鎖的實現(xiàn)不同

??? ArrayBlockingQueue實現(xiàn)的隊列中的鎖是沒有分離的,即生產(chǎn)和消費(fèi)用的是同一個鎖;

??? LinkedBlockingQueue實現(xiàn)的隊列中的鎖是分離的,即生產(chǎn)用的是putLock,消費(fèi)是takeLock

2. 底層實現(xiàn)不同

前者基于數(shù)組,后者基于鏈表

3. 隊列邊界不同

??? ArrayBlockingQueue實現(xiàn)的隊列中必須指定隊列的大小,是有界隊列

??? LinkedBlockingQueue實現(xiàn)的隊列中可以不指定隊列的大小,但是默認(rèn)是Integer.MAX_VALUE,是無界隊列

?

總結(jié)

以上是生活随笔為你收集整理的终于,我读懂了所有Java集合——queue篇的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。