日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一)

發(fā)布時(shí)間:2024/1/17 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

AQS是AbstractQueuedSynchronizer的縮寫,AQS是Java并包里大部分同步器的基礎(chǔ)構(gòu)件,利用AQS可以很方便的創(chuàng)建鎖和同步器。它封裝了一個(gè)狀態(tài),提供了一系列的獲取和釋放操作,這些獲取和釋放操作都是基于狀態(tài)的。它的基本思想是由AQS負(fù)責(zé)管理同步器類中的狀態(tài),其他的同步器比如可重入鎖ReentrantLock, 信號(hào)量Semaphore基于各自的特點(diǎn)來調(diào)用AQS提供了基礎(chǔ)能力進(jìn)行狀態(tài)的同步。

?

在AQS的Javadoc里面提到它是CLHLock的變種,在聊聊高并發(fā)(八)實(shí)現(xiàn)幾種自旋鎖(三)?這篇文章中我們說了如何利用CLH鎖來構(gòu)件自旋鎖,回顧一下CLHLock的一些基本特點(diǎn):

1. CLHLock是一種隊(duì)列自旋鎖的實(shí)現(xiàn),提供了FIFO先來先服務(wù)的公平性

2. 利用一個(gè)原子變量AtomicReference tail的CAS操作來構(gòu)件一個(gè)虛擬的鏈?zhǔn)浇Y(jié)構(gòu)

3. 節(jié)點(diǎn)Node維護(hù)一個(gè)volatile狀態(tài),維護(hù)一個(gè)prev指針指向前一個(gè)節(jié)點(diǎn),獲取鎖時(shí)每個(gè)線程在prev節(jié)點(diǎn)的狀態(tài)上自旋

4. 當(dāng)線程釋放鎖時(shí),只需要修改自身狀態(tài)即可,后續(xù)節(jié)點(diǎn)會(huì)觀察到volatile狀態(tài)的改動(dòng)而獲取鎖

?

AQS既然是CLHLock的一種變種,那么

1. 也維護(hù)以了一個(gè)基本的隊(duì)列結(jié)構(gòu)

2. 也是提供了一個(gè)Tail指針從隊(duì)尾通過CAS操作入隊(duì)列。

3. 提供了一個(gè)volatile類型的int值來維護(hù)狀態(tài)

?

?
  • public abstract class AbstractQueuedSynchronizer

  • extends AbstractOwnableSynchronizer

  • implements java.io.Serializable {

  • ?private transient volatile Node head;

  • ?
  • ??? private transient volatile Node tail;

  • ?
  • ??? private volatile int state;

  • ?
  • ?
  • ??? protected final int getState() {

  • ??????? return state;

  • ??? }

  • ?
  • ??

  • ??? protected final void setState(int newState) {

  • ??????? state = newState;

  • ??? }

  • ?
  • ?
  • ??? protected final boolean compareAndSetState(int expect, int update) {

  • ??????? // See below for intrinsics setup to support this

  • ??????? return unsafe.compareAndSwapInt(this, stateOffset, expect, update);

  • ??? }

  • ?
  • ..................

  • }

  • ?

    與標(biāo)準(zhǔn)CLHLock實(shí)現(xiàn)不同的是,AQS不是一個(gè)自旋鎖,它提供了更加豐富的語意:

    1. 提供了獨(dú)享(exclusive)方式和共享(share)方式來獲取/釋放,比如鎖是獨(dú)占方式的,信號(hào)量semaphore是共享方式的,可以有多個(gè)線程進(jìn)入臨界區(qū)?

    2. 支持可中斷和不可中斷的獲取/釋放

    3. 支持普通的和具有時(shí)間限制的獲取/釋放

    4. 提供了自旋和阻塞的切換,可以先自旋,如果等待時(shí)間長,可以阻塞

    ?

    ?
  • /**

  • * The number of nanoseconds for which it is faster to spin

  • * rather than to use timed park. A rough estimate suffices

  • * to improve responsiveness with very short timeouts.

  • */

  • static final long spinForTimeoutThreshold = 1000L;


  • AQS定義了兩個(gè)內(nèi)部類來輔助它的實(shí)現(xiàn),一個(gè)是Node定義了隊(duì)列中的節(jié)點(diǎn),另一個(gè)是ConditionObject,是Condition接口的實(shí)現(xiàn)類,負(fù)責(zé)管理?xiàng)l件隊(duì)列。關(guān)于條件隊(duì)列更多內(nèi)容可以看這篇?聊聊高并發(fā)(十四)理解Java中的管程,條件隊(duì)列,Condition以及實(shí)現(xiàn)一個(gè)阻塞隊(duì)列

    ?

    先看下Node類,它比CLHLock中的Node有更多屬性,除了完成基本的隊(duì)列功能,還維護(hù)了是獨(dú)享還是共享的模式信息

    1. 維護(hù)了一個(gè)Node SHARED引用表示共享模式

    2. 維護(hù)了一個(gè)Node EXCLUSIVE引用表示獨(dú)占模式

    3. 維護(hù)了幾種節(jié)點(diǎn)等待的狀態(tài) waitStatus, 其中CANCELLED = 1是正數(shù),表示取消狀態(tài),SIGNAL = -1,CONDITION = -2, PROPAGATE = -3都是負(fù)數(shù),表示節(jié)點(diǎn)在條件隊(duì)列的某個(gè)狀態(tài),SIGNAL表示后續(xù)節(jié)點(diǎn)需要被喚醒

    4. 維護(hù)了Node prev引用,指向隊(duì)列中的前一個(gè)節(jié)點(diǎn),通過Tail的CAS操作來創(chuàng)建

    5. 維護(hù)了Node next引用,指向隊(duì)列中的下一個(gè)節(jié)點(diǎn),也是在通過Tail入隊(duì)列的時(shí)候設(shè)置的,這樣就維護(hù)了一個(gè)雙向隊(duì)列

    6. 維護(hù)了一個(gè)volatile的Thread引用,把一個(gè)節(jié)點(diǎn)關(guān)聯(lián)到一個(gè)線程

    7. 維護(hù)了Node nextWaiter引用,指向在條件隊(duì)列中的下一個(gè)正在等待的節(jié)點(diǎn),是給條件隊(duì)列使用的。值得注意的是條件隊(duì)列只有在獨(dú)享狀態(tài)下才使用

    ?

    ?
  • static final class Node {

  • /** Marker to indicate a node is waiting in shared mode */

  • static final Node SHARED = new Node();

  • /** Marker to indicate a node is waiting in exclusive mode */

  • static final Node EXCLUSIVE = null;

  • ?
  • /** waitStatus value to indicate thread has cancelled */

  • static final int CANCELLED = 1;

  • /** waitStatus value to indicate successor's thread needs unparking */

  • static final int SIGNAL = -1;

  • /** waitStatus value to indicate thread is waiting on condition */

  • static final int CONDITION = -2;

  • /**

  • * waitStatus value to indicate the next acquireShared should

  • * unconditionally propagate

  • */

  • static final int PROPAGATE = -3;

  • ?
  • volatile int waitStatus;

  • ?
  • volatile Node prev;

  • ?
  • volatile Node next;

  • ?
  • volatile Thread thread;

  • ?
  • Node nextWaiter;

  • ?
  • final boolean isShared() {

  • return nextWaiter == SHARED;

  • }

  • ?
  • final Node predecessor() throws NullPointerException {

  • Node p = prev;

  • if (p == null)

  • throw new NullPointerException();

  • else

  • return p;

  • }

  • ?
  • Node() { // Used to establish initial head or SHARED marker

  • }

  • ?
  • Node(Thread thread, Node mode) { // Used by addWaiter

  • this.nextWaiter = mode;

  • this.thread = thread;

  • }

  • ?
  • Node(Thread thread, int waitStatus) { // Used by Condition

  • this.waitStatus = waitStatus;

  • this.thread = thread;

  • }

  • }

  • ?

    再看一下ConditionObject,它是條件Condition接口的具體實(shí)現(xiàn),維護(hù)了一個(gè)條件隊(duì)列,條件隊(duì)列是通過Node來構(gòu)件的一個(gè)單向鏈表結(jié)構(gòu)。底層的條件操作(等待和喚醒)使用LockSupport類來實(shí)現(xiàn),在這篇中我們說了LockSupport底層使用sun.misc.Unsafe來提供條件隊(duì)列的park和unpark操作。聊聊高并發(fā)(十七)解析java.util.concurrent各個(gè)組件(一) 了解sun.misc.Unsafe類

    1. 維護(hù)了一個(gè)Node firstWaiter引用指向條件隊(duì)列的隊(duì)首節(jié)點(diǎn)

    2. 維護(hù)了一個(gè)Node lastWaiter引用指向條件隊(duì)列的隊(duì)尾節(jié)點(diǎn)

    3. 條件隊(duì)列支持節(jié)點(diǎn)的取消退出機(jī)制,CANCELLED節(jié)點(diǎn)來表示這種取消狀態(tài)

    4. 支持限時(shí)等待機(jī)制

    5. 支持可中斷和不可中斷的等待

    我們來看幾個(gè)典型的條件隊(duì)列的操作實(shí)現(xiàn)

    往條件隊(duì)列里面加入一個(gè)等待節(jié)點(diǎn),這個(gè)是await()方法的基本操作

    1. 判斷尾節(jié)點(diǎn)的狀態(tài)是不是等待某個(gè)條件的狀態(tài)(CONDITION),如果不是,就把CANCELLED節(jié)點(diǎn)從隊(duì)列中踢出,然后把自己標(biāo)記為尾節(jié)點(diǎn)

    ?

    ?
  • public class ConditionObject implements Condition, java.io.Serializable {

  • /** First node of condition queue. */

  • private transient Node firstWaiter;

  • /** Last node of condition queue. */

  • private transient Node lastWaiter;

  • ?
  • /**

  • ???????? * Adds a new waiter to wait queue.

  • ???????? * @return its new wait node

  • ???????? */

  • ??????? private Node addConditionWaiter() {

  • ??????????? Node t = lastWaiter;

  • ??????????? // If lastWaiter is cancelled, clean out.

  • ??????????? if (t != null && t.waitStatus != Node.CONDITION) {

  • ??????????????? unlinkCancelledWaiters();

  • ??????????????? t = lastWaiter;

  • ??????????? }

  • ??????????? Node node = new Node(Thread.currentThread(), Node.CONDITION);

  • ??????????? if (t == null)

  • ??????????????? firstWaiter = node;

  • ??????????? else

  • ??????????????? t.nextWaiter = node;

  • ??????????? lastWaiter = node;

  • ??????????? return node;

  • ??????? }

  • ?
  • private void unlinkCancelledWaiters() {

  • ??????????? Node t = firstWaiter;

  • ??????????? Node trail = null;

  • ??????????? while (t != null) {

  • ??????????????? Node next = t.nextWaiter;

  • ??????????????? if (t.waitStatus != Node.CONDITION) {

  • ??????????????????? t.nextWaiter = null;

  • ??????????????????? if (trail == null)

  • ??????????????????????? firstWaiter = next;

  • ??????????????????? else

  • ??????????????????????? trail.nextWaiter = next;

  • ??????????????????? if (next == null)

  • ??????????????????????? lastWaiter = trail;

  • ??????????????? }

  • ??????????????? else

  • ??????????????????? trail = t;

  • ??????????????? t = next;

  • ??????????? }

  • ??????? }

  • .................

  • }

  • 從條件隊(duì)列中喚醒一個(gè)節(jié)點(diǎn),實(shí)際上doSignal只是把一個(gè)節(jié)點(diǎn)從條件隊(duì)列中移除,然后加入到同步隊(duì)列,并設(shè)置它在同步隊(duì)列的前置節(jié)點(diǎn)的waitStatus = SIGNAL, 如果設(shè)置失敗或者取消在條件隊(duì)列等待,直接把這個(gè)節(jié)點(diǎn)的線程unpark喚醒,需要注意的是unpark操作只是把線程從等待狀態(tài)轉(zhuǎn)化為可運(yùn)行狀態(tài),并不直接獲得鎖。

    ?

    ?
  • ?public final void signal() {

  • ??????????? if (!isHeldExclusively())

  • ??????????????? throw new IllegalMonitorStateException();

  • ??????????? Node first = firstWaiter;

  • ??????????? if (first != null)

  • ??????????????? doSignal(first);

  • ??????? }

  • ?
  • ?
  • /**

  • * Removes and transfers nodes until hit non-cancelled one or

  • * null. Split out from signal in part to encourage compilers

  • * to inline the case of no waiters.

  • * @param first (non-null) the first node on condition queue

  • */

  • private void doSignal(Node first) {

  • do {

  • if ( (firstWaiter = first.nextWaiter) == null)

  • lastWaiter = null;

  • first.nextWaiter = null;

  • } while (!transferForSignal(first) &&

  • (first = firstWaiter) != null);

  • }

  • ?
  • ? ?final boolean transferForSignal(Node node) {

  • ??????? /*

  • ???????? * If cannot change waitStatus, the node has been cancelled.

  • ???????? */

  • ??????? if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

  • ??????????? return false;

  • ?
  • ??????? /*

  • ???????? * Splice onto queue and try to set waitStatus of predecessor to

  • ???????? * indicate that thread is (probably) waiting. If cancelled or

  • ???????? * attempt to set waitStatus fails, wake up to resync (in which

  • ???????? * case the waitStatus can be transiently and harmlessly wrong).

  • ???????? */

  • ??????? Node p = enq(node);

  • ??????? int ws = p.waitStatus;

  • ??????? if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

  • ??????????? LockSupport.unpark(node.thread);

  • ??????? return true;

  • ??? }


  • Java線程的幾種狀態(tài)如下

    ?

    支持中斷的等待操作,?主要做了兩個(gè)事情:新建一個(gè)Node進(jìn)入條件隊(duì)列等待被喚醒;從同步隊(duì)列中移除并釋放鎖。它會(huì)相應(yīng)線程的中斷拋出中斷異常,并且記錄中斷狀態(tài)

    ?

    ?
  • public final void await() throws InterruptedException {

  • if (Thread.interrupted())

  • throw new InterruptedException();

  • Node node = addConditionWaiter();

  • int savedState = fullyRelease(node);

  • int interruptMode = 0;

  • while (!isOnSyncQueue(node)) {

  • LockSupport.park(this);

  • if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

  • break;

  • }

  • if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

  • interruptMode = REINTERRUPT;

  • if (node.nextWaiter != null) // clean up if cancelled

  • unlinkCancelledWaiters();

  • if (interruptMode != 0)

  • reportInterruptAfterWait(interruptMode);

  • }


  • 不可中斷的等待,也是先進(jìn)入條件隊(duì)列等待,并從同步隊(duì)列出隊(duì)列,釋放鎖。但是它不相應(yīng)線程中斷狀態(tài)

    ?

    ?
  • public final void awaitUninterruptibly() {

  • Node node = addConditionWaiter();

  • int savedState = fullyRelease(node);

  • boolean interrupted = false;

  • while (!isOnSyncQueue(node)) {

  • LockSupport.park(this);

  • if (Thread.interrupted())

  • interrupted = true;

  • }

  • if (acquireQueued(node, savedState) || interrupted)

  • selfInterrupt();

  • }


  • 限時(shí)等待,也是先進(jìn)入條件隊(duì)列等待,然后釋放鎖。輪詢等待時(shí)間,當(dāng)超時(shí)后再次進(jìn)入同步隊(duì)列,等待獲得鎖。如果獲得了鎖,就返回false. 如果在等待時(shí)被喚醒,就進(jìn)入同步隊(duì)列,等待獲得鎖,如果獲得鎖就返回true

    ?

    ?
  • public final boolean await(long time, TimeUnit unit)

  • throws InterruptedException {

  • if (unit == null)

  • throw new NullPointerException();

  • long nanosTimeout = unit.toNanos(time);

  • if (Thread.interrupted())

  • throw new InterruptedException();

  • Node node = addConditionWaiter();

  • int savedState = fullyRelease(node);

  • long lastTime = System.nanoTime();

  • boolean timedout = false;

  • int interruptMode = 0;

  • while (!isOnSyncQueue(node)) {

  • if (nanosTimeout <= 0L) {

  • timedout = transferAfterCancelledWait(node);

  • break;

  • }

  • if (nanosTimeout >= spinForTimeoutThreshold)

  • LockSupport.parkNanos(this, nanosTimeout);

  • if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

  • break;

  • long now = System.nanoTime();

  • nanosTimeout -= now - lastTime;

  • lastTime = now;

  • }

  • if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

  • interruptMode = REINTERRUPT;

  • if (node.nextWaiter != null)

  • unlinkCancelledWaiters();

  • if (interruptMode != 0)

  • reportInterruptAfterWait(interruptMode);

  • return !timedout;

  • }


  • AQS使用了Unsafe直接操作內(nèi)存來對(duì)字段進(jìn)行CAS操作和設(shè)置值。

    ?

    ?
  • private static final Unsafe unsafe = Unsafe.getUnsafe();

  • private static final long stateOffset;

  • private static final long headOffset;

  • private static final long tailOffset;

  • private static final long waitStatusOffset;

  • private static final long nextOffset;

  • ?
  • static {

  • try {

  • stateOffset = unsafe.objectFieldOffset

  • (AbstractQueuedSynchronizer.class.getDeclaredField("state"));

  • headOffset = unsafe.objectFieldOffset

  • (AbstractQueuedSynchronizer.class.getDeclaredField("head"));

  • tailOffset = unsafe.objectFieldOffset

  • (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));

  • waitStatusOffset = unsafe.objectFieldOffset

  • (Node.class.getDeclaredField("waitStatus"));

  • nextOffset = unsafe.objectFieldOffset

  • (Node.class.getDeclaredField("next"));

  • ?
  • } catch (Exception ex) { throw new Error(ex); }

  • }

  • 總結(jié)

    以上是生活随笔為你收集整理的聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。