java并发编程之美-阅读记录7
java并發包中的并發隊列
7.1ConcurrentLinkedQueue
線程安全的無界非阻塞隊列(非阻塞隊列使用CAS非阻塞算法實現),其底層數組使用單向列表實現,對于出隊和入隊操作使用CAS非阻塞來實現線程安全的。
1、結構:
ConcurrentLinkedQueue內部的對列使用的是單向鏈表實現,并且有兩個用volatile修改的節點頭結點head和tail尾節點
private transient volatile Node<E> head;private transient volatile Node<E> tail;// 默認的無常構造使頭節點和尾節點都指向一個值為null的哨兵節點public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);}// 同時還提供了一個有參構造,將指定集合中的數據插入到鏈表中public ConcurrentLinkedQueue(Collection<? extends E> c) {Node<E> h = null, t = null;for (E e : c) {checkNotNull(e);Node<E> newNode = new Node<E>(e);if (h == null)h = t = newNode;else {t.lazySetNext(newNode);t = newNode;}}if (h == null)h = t = new Node<E>(null);head = h;tail = t;}// 內部類Node,使用Unsafe類來保證CAS操作的原子性private static class Node<E> {// 元素值volatile E item;// 下一個節點volatile Node<E> next;Node(E item) {UNSAFE.putObject(this, itemOffset, item);}boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);}boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long itemOffset;private static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = Node.class;itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}}}
2、offer方法
offer操作是在隊列的尾部添加一個元素,如果提供的元素為null,則會拋出一個異常
public boolean offer(E e) {// 校驗提供的元素e,如果為null時則會拋出異常checkNotNull(e);// 使用原素e來構建一個新的節點final Node<E> newNode = new Node<E>(e);for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;if (q == null) {// p is last node q為null則表明p是最后一個節點 ,下一步使用cas操作,將新建的節點賦給pif (p.casNext(null, newNode)) {// Successful CAS is the linearization point// for e to become an element of this queue,// and for newNode to become "live".if (p != t) // hop two nodes at a time 設置成功后,重置尾節點casTail(t, newNode); // Failure is OK.return true;}// Lost CAS race to another thread; re-read next}else if (p == q)// 由于多線程操作,將head節點的next設置為自己,因此會出現p == q的情況// We have fallen off list. If tail is unchanged, it// will also be off-list, in which case we need to// jump to head, from which all live nodes are always// reachable. Else the new tail is a better bet.// 如果之前的尾節點不是當前的尾節點(最新的尾節點),則將t重設為最新的尾節點(因為tail是內存可見的,其他線程操作后,當前線程是可以看得見的),p = (t != (t = tail)) ? t : head;else// Check for tail updates after two hops. 循環找尾節點p = (p != t && t != (t = tail)) ? t : q;}}// add方法內部也是走的offer方法public boolean add(E e) {return offer(e);}3、poll操作
獲取在隊列頭部的節點,并移除,如果隊列為空,則返回null
public E poll() { // goto標記 和循環充的continue restartFromHead關聯,即重新走循環語句restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {// 獲取當前節點E item = p.item;// CAS操作將當前head節點設置為nullif (item != null && p.casItem(item, null)) {// Successful CAS is the linearization point// for item to be removed from this queue.// 設置成功后(判斷是否成功,根據p和h,未修改之前時,p==h,修改成功后,p!=h),成功后,充值head節點if (p != h) // hop two nodes at a timeupdateHead(h, ((q = p.next) != null) ? q : p);return item;}// p.next == null 表明鏈表為空else if ((q = p.next) == null) {updateHead(h, p);return null;}else if (p == q)continue restartFromHead;elsep = q;}}}7.2LinkedBlockingQueue阻塞隊列
1、類圖:
有上圖可以看出LinkedBlockingQueue也是使用的單向鏈表實現的,也有兩個Node,分別用來代表首節點和尾節點,一個AtomicInteger類型的count表示隊列的元素個數,另外還有兩個ReentrantLock類型的實例,分別控制入隊和出隊的原子性,以及兩個Condition類型條件變量
transient Node<E> head;private transient Node<E> last;/** 出隊鎖*/private final ReentrantLock takeLock = new ReentrantLock();/** 非空條件變量 */private final Condition notEmpty = takeLock.newCondition();/** 入隊鎖*/private final ReentrantLock putLock = new ReentrantLock();/** 非滿條件變量*/private final Condition notFull = putLock.newCondition();2、offer方法
向隊尾插入一個元素,如果隊列有空閑則插入成功,返回true,如果丟列已滿,則返回false,注意,該方法是非阻塞的(put方法是阻塞的)
public boolean offer(E e) {// 如果參數e為null時,則會拋出異常if (e == null) throw new NullPointerException();// 獲取容量count,AtomicInteger類型對象final AtomicInteger count = this.count;// 判斷當前容量是否已滿,滿的話返回false,不能入隊if (count.get() == capacity)return false;int c = -1;// 將參數e構建為節點對象Node<E> node = new Node<E>(e);// 獲取入隊鎖final ReentrantLock putLock = this.putLock;putLock.lock();try {if (count.get() < capacity) {//入隊enqueue(node);// 節點數量++c = count.getAndIncrement();// 釋放非滿信號,可以繼續入隊if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0;}// 如對,鏈表的最后一個節點private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();// assert last.next == null;last = last.next = node;}3、put方法,基本和offer方法類似,只是在容量已滿是,會阻塞當前線程,而不是直接返回false
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// Note: convention in all put/take/etc is to preset local var// holding count negative to indicate failure unless set.int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {// 如果已滿,則阻塞線程,等待相應喚醒,喚醒之后會繼續判斷是否已滿(防止偽共享出現)while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)// 發出喚醒其他線程可以入隊的信號notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();}4、poll出隊(非阻塞)、peek出隊(非阻塞)、take出隊(阻塞)
poll:出隊會刪除出隊元素
peek:出隊不會刪除
? ? ? ? ? ?take:出隊刪除,并且是阻塞的
?
7.3ArrayBlockingQueue有界阻塞隊列
1、類圖
有類圖可以看出ArrayBlockingQueue中有一個Object類型的數組,用來存放隊列元素,putindex、takeIndex分別代表入隊和出隊索引,count代表隊列元素個數,從定義可知,這些變量都沒有使用volatile修改,因為相關的操作都是在鎖內的,而鎖又可以滿足可見性和原子性,另外有兩個條件變量notEmpty和notFull來控制入隊和出隊的同步。
/** 隊列數組 */final Object[] items;/** 出隊索引 */int takeIndex;/** 入隊索引 */int putIndex;/** 隊列元素個數 */int count;/** Concurrency control uses the classic two-condition algorithm* found in any textbook.*//** 一個獨占鎖 */final ReentrantLock lock;/** 條件變量 */private final Condition notEmpty;/** 條件變量 */private final Condition notFull;
2、offer操作,向隊尾插入一個元素,該方法是不阻塞的
offer操作和put操作類似,只不過put是阻塞的入隊操作
public boolean offer(E e) {// 校驗元素非空checkNotNull(e);final ReentrantLock lock = this.lock;// 加鎖lock.lock();try {// 已滿則返回falseif (count == items.length)return false;else {enqueue(e);return true;}} finally {lock.unlock();}} private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;// 向入隊索引處插入新元素final Object[] items = this.items;items[putIndex] = x;// 插入之后,入隊索引自增if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}3、poll操作和take操作
同樣的,一個非阻塞操作,一個阻塞操作
public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)// 阻塞當前線程notEmpty.await();return dequeue();} finally {lock.unlock();}}ArrayBlockingQueue通過使用全局獨占鎖實現了同時只有一個線程進行入隊和出隊操作,這個鎖的粒度比較大,和在方法上添加synchronized關鍵字類似
7.4PriorityBlockingQueue帶優先級的無界隊列
該隊列每次返回的都是優先級最高或者最低的元素,其內部是使用的二叉樹堆實現的,所以每次遍歷隊列不保證有序。默認使用的是compareTo方法提供的比較規則。
1、類圖
有類圖可以看出,PriorityBlockingQueue內部有一個Object類型的數組queue,用來存放隊列元素的,size用來表示元素個數,allocationSpinLock是一個自旋鎖,使用CAS操作來保證同時只有一個線程可以擴容隊列,狀態為0或1,0表示沒有進行擴容,1表示正在進行擴容。comparator比較器,區分元素優先級的,lock獨占鎖用來控制同一時間只有一個線程可以進行入隊和出隊操作。notEmpte條件變量用來實現take方法的阻塞模式。(這里沒有notFull條件變量,put方法是非阻塞的,因為PriorityBlockingQueue是一個無界隊列)
2、offer操作和add操作
// add方法內部也是調用的offer方法 public boolean add(E e) {return offer(e);}public boolean offer(E e) {if (e == null)throw new NullPointerException();// 獲取獨占鎖final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;// 判斷您是否需要進行擴容while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {// 獲取當前的比較器,默認比較器為null(如果構建隊列的時候沒有提供比較器,則為null)Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}// 擴容private void tryGrow(Object[] array, int oldCap) {// 在擴容開始,先釋放鎖,是為了性能考慮,擴容是需要時間的,如果在擴容的同事占用鎖,會降低并發性,所以為了提供并發性,使用CAS操作來保證只有一個線程可以進行擴容,讓其他線程可以入隊和出隊lock.unlock(); // must release and then re-acquire main lock// 擴容之后的數組 ,具體對象等計算出新的大小后會賦值Object[] newArray = null;// allocationSpinLock為0,表示沒有線程進行擴容,使用CAS操作設置該變量為1,則表示有線程正在進行擴容, 也就是鎖CAS操作成功則進行擴容if (allocationSpinLock == 0 &&UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {// 當容量小的時候,擴容增速塊,大64后,擴容為50%int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));// 如果擴容后,大于Maxinteger-8,則設置默認最大容量if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}// 新建指定大小的列表數組if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {// 同時將allocationSpinLock重新設置為0,表名沒有正在進行擴容的線程allocationSpinLock = 0;}}// 這一個判斷是當第一個線程CAS成功之后,第二個線程也進入擴容節點,則讓第二線程讓出cpu,讓第一線程盡快執行完擴容if (newArray == null) // back off if another thread is allocatingThread.yield();lock.lock();// 擴容成功之后,將舊數組中的數據復制到新數組中if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}// 默認比較器的入隊操作,也就是建堆算法private static <T> void siftUpComparable(int k, T x, Object[] array) {// 新增的元素都是Compareable的子類Comparable<? super T> key = (Comparable<? super T>) x;// k為之前隊列個數,如果原來隊列元素大于0,則需要判斷當前新增元素的位置,否則,直接入隊while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0)break;array[k] = e;k = parent;}array[k] = key;}private static <T> void siftUpUsingComparator(int k, T x, Object[] array,Comparator<? super T> cmp) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (cmp.compare(x, (T) e) >= 0)break;array[k] = e;k = parent;}array[k] = x;}?重點看建堆方法(即元素入隊):
假設隊列初始化大小2,默認比價器,以下為int類型的元素入隊? 分別是offer(2) / offer(4)? /? ?offer(6)? /? ?offer(1)? 4次入隊?
1、當調用offer(2)時,在獲取獨占鎖后,判斷時候當前是否需要擴容,如果正在進行擴容,則自旋等待擴容完畢,沒有則進入建堆方法(即下邊的siftUpComparable(),該方法三個參數,第一:當前隊列元素的數量,第二:入隊的元素對象,第三:列表底層數組),
該方法內,會先進性判斷k(當前列表內已有的元素數量),如果當前元素數量不大于0(即還沒有元素),則直接將array[0] 設置為當前入隊元素,否則進入while循環進行建堆,當本次調用offer(2)時,為第一次添加元素,則直接將array[0]設置為2。則當前元素數量n=1,當前隊列大小size=1,容量cap=length=2,size+1
? ?----->>>? ? ?
? 2、當第二次調用offer方法是,即調用offer(4)時,同樣先進性判斷是否需要擴容,沒有則進入siftUpComparable方法,此時參數k=1,進入while循環,循環內計算得到parent=0,e=2,key=4(key就是當前要入隊的元素),因為key>e,退出循環,執行array[k] = key代碼,即將當前入隊的元素放置到下表為1的位置,size+1即如下圖
3、第三次調用offer 方法,即調用offer(6)時,同樣判斷是否需要擴容,因為當前n=size=2 >= cap則需要進行擴容,進入擴容方法(這一塊看上邊代碼,最終會將原數組內的元素復制到新的數組中),擴容后繼續調用siftUpComparable方法,此時參數k=size=2,x=6,array為新的數組(長度為2+(2+2),即cap = cap+ (cap+2)這個實在容量較小的情況下,否則將容量擴大50%),key=6,此時k大于0,進入循環,計算的parent=0,e=2,因為key>e,則退出循環,將array[2]設置為6,size+1即下圖
4、第四次調用offer方法,即調用offer(1),同樣判斷是否需要擴容,此時不需要擴容,則進入siftUpComparable方法,此時參數k=3,x=1,array=[2,4,6],key=1,此時k>0,進入循環,計算的parent=1,e=array[1]=4,此時key<e,則將元素4復制到k下標出,即
array[3]=4,此時數組為【2,4,6,4】,k重新設置為1,繼續循環(因為k仍大于0),第二次循環,parent=0,e=array[0]=2,key=1,此時key<e,則將array[1]設置為2,k=0,此時數組為【2,2,6,4】,此時k=0,終止循環,最終將array[0]設置為1,此時數組為【1,2,6,4】
?
1 // 默認比較器的情況 2 private static <T> void siftUpComparable(int k, T x, Object[] array) {3 Comparable<? super T> key = (Comparable<? super T>) x;4 while (k > 0) {5 int parent = (k - 1) >>> 1;6 Object e = array[parent];7 if (key.compareTo((T) e) >= 0)8 break;9 array[k] = e; 10 k = parent; 11 } 12 array[k] = key; 13 }?
7.5DelayQueue無界阻塞延遲隊列
該隊列屬于無界阻塞的延遲隊列,隊列中的每一個元素都有個過期時間,當從隊列獲取元素時,只有過期的元素才會出隊,隊列的頭元素是最快要過期的元素。
1、類圖:
用類圖可以看出,延遲隊列內部使用PriorityQueue存放數據,使用ReentrantLock實現線程同步。另外,隊列里邊的元素都要實現Delayed接口,由于每個元素都有一個過期時間,所以要實現獲知當前元素還有多久時間過期的接口,由于內部是有優先級隊列來實現,所以要實現元素之間相互比較的接口Delayed接口
?
總結
以上是生活随笔為你收集整理的java并发编程之美-阅读记录7的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java并发编程之美-阅读记录6
- 下一篇: leetcode-67-二进制求和