java阻塞队列小结
【README】
1,本文介紹了java的7個(gè)阻塞隊(duì)列;
2,阻塞隊(duì)列的作用
- 做緩沖作用,如緩沖kafka消息,而不是直接發(fā)送給kafka,減少kafka集群的壓力;
【1】阻塞隊(duì)列 BlockingQueue 概述
1,隊(duì)列是一種數(shù)據(jù)結(jié)構(gòu),先進(jìn)先出;
2,阻塞隊(duì)列的意思是:
- 當(dāng)阻塞隊(duì)列為空時(shí),線程1從隊(duì)列取出元素會(huì)阻塞;直到線程2向隊(duì)列添加了新值;
- 但阻塞隊(duì)列滿時(shí),線程1向隊(duì)列添加元素會(huì)阻塞;直到線程2從隊(duì)列取走了值(頭部元素);
3,阻塞隊(duì)列封裝了 線程阻塞,線程喚醒的底層方法,不需要程序員編寫這些代碼,減少了開發(fā)量和代碼簡潔;
【2】阻塞隊(duì)列介紹
【2.1】ArrayBlockingQueue 數(shù)組阻塞隊(duì)列
0)類描述:
底層使用可重入鎖 ReentrantLock 實(shí)現(xiàn),每個(gè)操作前都會(huì)加鎖,操作完成后解鎖;所以僅允許一個(gè)線程串行操作阻塞隊(duì)列中的任意一個(gè)方法;(線程1操作a方法,線程1退出前,線程2不能操作b方法,注意是b方法)
基于數(shù)組的有界阻塞隊(duì)列;
在創(chuàng)建該隊(duì)列時(shí),給定隊(duì)列容量 capacity,一旦創(chuàng)建,capacity無法修改;
此類支持 排序等待生產(chǎn)者和消費(fèi)者線程的可選公平策略。默認(rèn)情況下,不保證此順序。但是,在公平性設(shè)置為 true 的情況下構(gòu)造的隊(duì)列以 FIFO 順序授予線程訪問權(quán)限。公平通常會(huì)降低吞吐量,但會(huì)減少可變性并避免饑餓。
此類及其迭代器實(shí)現(xiàn)了 Collection 和 Iterator 接口的所有可選方法。
1)類定義
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {2)構(gòu)造器
public ArrayBlockingQueue(int capacity) {this(capacity, false); } // 帶有公平性 public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException(); // 基于數(shù)組實(shí)現(xiàn)的阻塞隊(duì)列 this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition(); } // 給定初始隊(duì)列元素 public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {this(capacity, fair);final ReentrantLock lock = this.lock;lock.lock(); // Lock only for visibility, not mutual exclusiontry {int i = 0;try {for (E e : c) {checkNotNull(e);items[i++] = e;}} catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException();}count = i;putIndex = (i == capacity) ? 0 : i;} finally {lock.unlock();} }3)元素操作方法
4)其他方法
【2.2】LinkedBlockingQueue 鏈表阻塞隊(duì)列
0)類描述
基于鏈表的可選有界隊(duì)列, 可選的意思是,給定容量則有界,否則無界;
基于鏈接節(jié)點(diǎn)的可選有界阻塞隊(duì)列(基于鏈表的可選有界隊(duì)列)。 此隊(duì)列對元素 FIFO(先進(jìn)先出)進(jìn)行排序。 隊(duì)列的頭部是在隊(duì)列中停留時(shí)間最長的那個(gè)元素。 隊(duì)列的尾部是在隊(duì)列中停留時(shí)間最短的那個(gè)元素。 新元素插入隊(duì)列尾部,隊(duì)列檢索操作獲取隊(duì)列頭部元素。
鏈接隊(duì)列通常比基于數(shù)組的隊(duì)列具有更高的吞吐量,但在大多數(shù)并發(fā)應(yīng)用程序中的可預(yù)測性較差。
可選的容量綁定構(gòu)造函數(shù)參數(shù)是一種 防止隊(duì)列過度擴(kuò)展的方法。 如果未指定,容量等于 Integer.MAX_VALUE。 鏈接節(jié)點(diǎn)在每次插入時(shí)動(dòng)態(tài)創(chuàng)建,除非這會(huì)使隊(duì)列超過容量。
此類及其迭代器實(shí)現(xiàn)了 Collection 和 Iterator 接口的所有可選方法。
此類是 Java 集合框架的成員。
1)類定義
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {2)構(gòu)造器
2.1)鏈表節(jié)點(diǎn)-靜態(tài)內(nèi)部類
static class Node<E> {E item;/*** One of:* - the real successor Node* - this Node, meaning the successor is head.next* - null, meaning there is no successor (this is the last node)*/Node<E> next;Node(E x) { item = x; } } // 兩把鎖 /** 元素獲取鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 元素非空條件 */ private final Condition notEmpty = takeLock.newCondition(); /** 元素插入鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** 元素非滿條件 */ private final Condition notFull = putLock.newCondition();2.2)構(gòu)造器
// 構(gòu)造器 public LinkedBlockingQueue() {this(Integer.MAX_VALUE); }// 給定隊(duì)列大小 public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null); // 頭節(jié)點(diǎn) }// 創(chuàng)建大小為 Integer.Max_value 的,用給定容器初始化元素的 隊(duì)列 public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;putLock.lock(); // Never contended, but necessary for visibilitytry {int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {putLock.unlock();} }3)元素操作方法
同 ArrayBlockingQueue的元素操作方法,包括
【2.3】PriorityBlockingQueue 優(yōu)先級(jí)阻塞隊(duì)列 (底層使用數(shù)組)
0)類描述
它是一個(gè)基于數(shù)組的無界阻塞隊(duì)列,自動(dòng)擴(kuò)容保證無界;插入元素用不阻塞,獲取元素阻塞(干貨);
一個(gè)無界阻塞隊(duì)列,它使用與 PriorityQueue 類相同的排序規(guī)則并提供阻塞檢索操作。
雖然這個(gè)隊(duì)列在邏輯上是無界的,但由于資源耗盡(導(dǎo)致 OutOfMemoryError),嘗試添加可能會(huì)失敗。此類不允許空元素。依賴于自然排序的優(yōu)先級(jí)隊(duì)列也不允許插入不可比較的對象(這樣做會(huì)導(dǎo)致 ClassCastException)。
此類及其迭代器實(shí)現(xiàn)了 Collection 和 Iterator 接口的所有可選方法。方法 iterator() 中提供的 Iterator 不能保證以任何特定順序遍歷 PriorityBlockingQueue 的元素。如果您需要有序遍歷,請考慮使用 Arrays.sort(pq.toArray())。此外,drainTo 方法可用于按優(yōu)先級(jí)順序刪除部分或全部元素,并將它們放置在另一個(gè)集合中。
對此類的操作不保證具有相同優(yōu)先級(jí)的元素的順序。如果您需要強(qiáng)制排序,您可以定義自定義類或比較器,它們使用輔助鍵來打破主要優(yōu)先級(jí)值之間的聯(lián)系。例如,這是一個(gè)將先進(jìn)先出打破平局應(yīng)用于可比元素的類。要使用它,您需要插入一個(gè)新的 FIFOEntry(anEntry) 而不是普通的條目對象。
1)類定義
public class PriorityBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {2)構(gòu)造器
// 創(chuàng)建優(yōu)先級(jí)阻塞度列,默認(rèn)大小 Integer.max_value, 比較器為null,默認(rèn)為字典序 public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null); } // private static final int DEFAULT_INITIAL_CAPACITY = 11; 默認(rèn)大小11 // 創(chuàng)建優(yōu)先級(jí)阻塞度列,給定大小 , 比較器為null,默認(rèn)為字典序 public PriorityBlockingQueue(int initialCapacity) {this(initialCapacity, null); }// 創(chuàng)建優(yōu)先級(jí)阻塞度列,給定大小 ,和比較器 public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator) {if (initialCapacity < 1)throw new IllegalArgumentException();this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.comparator = comparator;this.queue = new Object[initialCapacity]; } // 創(chuàng)建優(yōu)先級(jí)阻塞隊(duì)列,包含給定的集合c;若集合是 SortedSet 或 PriorityQueue,則創(chuàng)建后的隊(duì)列的元素順序不變,否則采用字典序 public PriorityBlockingQueue(Collection<? extends E> c) {this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();boolean heapify = true; // true if not known to be in heap orderboolean screen = true; // true if must screen for nullsif (c instanceof SortedSet<?>) {SortedSet<? extends E> ss = (SortedSet<? extends E>) c;this.comparator = (Comparator<? super E>) ss.comparator();heapify = false;}else if (c instanceof PriorityBlockingQueue<?>) {PriorityBlockingQueue<? extends E> pq =(PriorityBlockingQueue<? extends E>) c;this.comparator = (Comparator<? super E>) pq.comparator();screen = false;if (pq.getClass() == PriorityBlockingQueue.class) // exact matchheapify = false;}Object[] a = c.toArray();int n = a.length;// If c.toArray incorrectly doesn't return Object[], copy it.if (a.getClass() != Object[].class)a = Arrays.copyOf(a, n, Object[].class);if (screen && (n == 1 || this.comparator != null)) {for (int i = 0; i < n; ++i)if (a[i] == null)throw new NullPointerException();}this.queue = a;this.size = n;if (heapify)heapify(); }3)元素操作方法
【2.4】DelayQueue - 延遲隊(duì)列
0)類描述
Delayed元素的無界阻塞隊(duì)列,每個(gè)元素只能在其延遲到期時(shí)被取用;
其底層使用了優(yōu)先級(jí)隊(duì)列來實(shí)現(xiàn);
Delayed 元素的無界阻塞隊(duì)列,其中每個(gè)元素只能在其延遲到期時(shí)被取用。
隊(duì)列的頭部是過去延遲過期最遠(yuǎn)的那個(gè) Delayed 元素。 如果沒有延遲到期,則沒有 頭部且 poll() 方法返回 null。
當(dāng)元素的 getDelay(TimeUnit.NANOOSECONDS) 方法返回值小于或等于零的值時(shí),就會(huì)發(fā)生過期。
盡管無法使用 take 或 poll()方法 取走(刪除)未過期的元素,但它們?nèi)员灰暈槠胀ㄔ亍?例如,size 方法返回過期和未過期元素的總個(gè)數(shù)。 此隊(duì)列不允許空元素。
此類及其迭代器實(shí)現(xiàn)了 Collection 和 Iterator 接口的所有可選方法。
方法 iterator() 中提供的 Iterator 不保證以任何特定順序遍歷 DelayQueue 的元素。
1)類定義
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {// 延遲隊(duì)列底層使用 優(yōu)先級(jí)隊(duì)列 private final PriorityQueue<E> q = new PriorityQueue<E>();....... }- 1.1)Delayed 接口定義?
一個(gè)混合風(fēng)格的接口,用于標(biāo)記在給定延遲后應(yīng)采取行動(dòng)的對象。
此接口的實(shí)現(xiàn)必須定義一個(gè) compareTo 方法,以提供與其 getDelay 方法一致的排序。
- getDelay() 方法描述:
返回關(guān)聯(lián)對象剩余的延遲時(shí)間;單位為unit;
2)構(gòu)造器
// 創(chuàng)建一個(gè)空的延遲隊(duì)列 public DelayQueue() {}// 創(chuàng)建延遲隊(duì)列,初始包含給定集合c public DelayQueue(Collection<? extends E> c) {this.addAll(c);}3)元素操作方法
【2.5】SynchronousQueue 同步隊(duì)列
0)類描述
一個(gè)阻塞隊(duì)列,其中每個(gè)插入操作都必須等待另一個(gè)線程執(zhí)行相應(yīng)的移除操作,反之亦然。同步隊(duì)列沒有任何內(nèi)部容量。
- 您無法查看同步隊(duì)列,因?yàn)樵貎H在您嘗試刪除它時(shí)才存在;
- 你不能插入一個(gè)元素(使用任何方法),除非另一個(gè)線程試圖刪除它;
- 你不能迭代,因?yàn)闆]有什么可以迭代的。
隊(duì)列頭部是第一個(gè)排隊(duì)的插入線程試圖添加到隊(duì)列的元素;
如果沒有這樣的排隊(duì)線程,則沒有元素可用于刪除,poll() 將返回 null。對于其他集合方法(例如包含),SynchronousQueue 充當(dāng)空集合。此隊(duì)列不允許空元素。
同步隊(duì)列類似于 CSP 和 Ada 中使用的集合通道。
它們非常適合切換設(shè)計(jì),在這種設(shè)計(jì)中,在一個(gè)線程中運(yùn)行的對象必須與在另一個(gè)線程中運(yùn)行的對象同步,以便將某些信息、事件或任務(wù)交給它。
該類支持可選的公平策略,用于對等待的生產(chǎn)者線程和消費(fèi)者線程進(jìn)行排序。默認(rèn)情況下,不保證此順序。但是,公平性設(shè)置為 true 的隊(duì)列會(huì)以 FIFO 順序授予線程訪問權(quán)限(公平性,誰最先等待,誰最先操作)。
此類及其迭代器實(shí)現(xiàn)了 Collection 和 Iterator 接口的所有可選方法。
1)類定義
public class SynchronousQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {2)構(gòu)造器
public SynchronousQueue() {this(false); }// 若公平性fair設(shè)置為true,則等待線程根據(jù) FIFO 順序競爭訪問 public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }3)方法描述
【2.6】LinkedTransferQueue 鏈表傳輸隊(duì)列
0)類描述
基于鏈接節(jié)點(diǎn)的無界 TransferQueue。
該隊(duì)列根據(jù)任何給定的生產(chǎn)者對元素 FIFO(先進(jìn)先出)進(jìn)行排序。隊(duì)列的頭部是某個(gè)生產(chǎn)者在隊(duì)列中停留時(shí)間最長的元素。隊(duì)列的尾部是某個(gè)生產(chǎn)者在隊(duì)列中停留時(shí)間最短的那個(gè)元素。
請注意,與大多數(shù)集合不同,size 方法不是恒定時(shí)間操作。
?由于這些隊(duì)列的異步特性,確定當(dāng)前元素的數(shù)量需要遍歷所有元素,因此如果在遍歷期間修改此集合,則可能會(huì)報(bào)告不準(zhǔn)確的結(jié)果。
此外,批量操作 addAll、removeAll、retainAll、containsAll、equals 和 toArray 不能保證以原子方式執(zhí)行。例如,與 addAll 操作同時(shí)運(yùn)行的迭代器可能只查看一些添加的元素。
此類及其迭代器實(shí)現(xiàn)了 Collection 和 Iterator 接口的所有可選方法。
與其他并發(fā)集合一樣,線程中的操作在將對象放入LinkedTransferQueue之前發(fā)生——在另一個(gè)線程中的LinkedTransferQueue中訪問或刪除該元素之后發(fā)生操作。
1)類定義
public class LinkedTransferQueue<E> extends AbstractQueue<E>implements TransferQueue<E>, java.io.Serializable {2)構(gòu)造器
// 創(chuàng)建空鏈表傳輸隊(duì)列 public LinkedTransferQueue() { }// 創(chuàng)建包含給定集合c的鏈表傳輸隊(duì)列 public LinkedTransferQueue(Collection<? extends E> c) {this();addAll(c); }3)元素操作方法
/** Possible values for "how" argument in xfer method.*/ private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer【2.7】LinkedBlockingDeque 鏈表阻塞雙端隊(duì)列(注意是雙端)
0)類描述
基于鏈表的可選有界阻塞雙端隊(duì)列。
可選的容量提供了防止隊(duì)列過度擴(kuò)展的方法。
如果未指定,容量等于 Integer.MAX_VALUE(默認(rèn))。 鏈接節(jié)點(diǎn)在每次插入時(shí)動(dòng)態(tài)創(chuàng)建,除非這會(huì)使雙端隊(duì)列超出容量。
大多數(shù)操作在恒定時(shí)間內(nèi)運(yùn)行(忽略阻塞所花費(fèi)的時(shí)間)。 但 remove、removeFirstOccurrence、removeLastOccurrence、contains、iterator.remove() 和批量操作,所有這些都在線性時(shí)間內(nèi)運(yùn)行。
此類及其迭代器實(shí)現(xiàn)了 Collection 和 Iterator 接口的所有可選方法。
1)類定義
public class LinkedBlockingDeque<E>extends AbstractQueue<E>implements BlockingDeque<E>, java.io.Serializable {2)構(gòu)造器
// 默認(rèn)容量的構(gòu)造器 public LinkedBlockingDeque() {this(Integer.MAX_VALUE);}// 給定容量的構(gòu)造器 public LinkedBlockingDeque(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;} // 創(chuàng)建一個(gè)包含 指定容器中所有元素 的雙端隊(duì)列 public LinkedBlockingDeque(Collection<? extends E> c) {this(Integer.MAX_VALUE);final ReentrantLock lock = this.lock;lock.lock(); // Never contended, but necessary for visibilitytry {for (E e : c) {if (e == null)throw new NullPointerException();if (!linkLast(new Node<E>(e)))throw new IllegalStateException("Deque full");}} finally {lock.unlock();}}底層使用了 Node節(jié)點(diǎn),包括 上一個(gè) 下一個(gè)指針;
static final class Node<E> { E item;// 節(jié)點(diǎn)值 // 上一個(gè)節(jié)點(diǎn)Node<E> prev;// 下一個(gè)節(jié)點(diǎn) Node<E> next;// 節(jié)點(diǎn)構(gòu)造器 Node(E x) {item = x;}}3)元素操作方法
總結(jié)
以上是生活随笔為你收集整理的java阻塞队列小结的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ps的加深工具快捷键(ps加深简单工具)
- 下一篇: 用枚举enum实现单例