日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

AQS的原理及应用

發布時間:2024/1/23 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 AQS的原理及应用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

Java中的大部分同步類(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronizer(簡稱為AQS)實現的。AQS是一種提供了原子式管理同步狀態、阻塞和喚醒線程功能以及隊列模型的簡單框架。本文會從應用層逐漸深入到原理層,并通過ReentrantLock的基本特性和ReentrantLock與AQS的關聯,來深入解讀AQS相關獨占鎖的知識點,同時采取問答的模式來幫助大家理解AQS。由于篇幅原因,本篇文章主要闡述AQS中獨占鎖的邏輯和Sync Queue,不講述包含共享鎖和Condition Queue的部分(本篇文章核心為AQS原理剖析,只是簡單介紹了ReentrantLock,感興趣同學可以閱讀一下ReentrantLock的源碼)。

下面列出本篇文章的大綱和思路,以便于大家更好地理解:

1 ReentrantLock

1.1 ReentrantLock特性概覽

ReentrantLock意思為可重入鎖,指的是一個線程能夠對一個臨界資源重復加鎖。為了幫助大家更好地理解ReentrantLock的特性,我們先將ReentrantLock跟常用的Synchronized進行比較,其特性如下(藍色部分為本篇文章主要剖析的點):

下面通過偽代碼,進行更加直觀的比較:

//?**************************Synchronized的使用方式************************** //?1.用于代碼塊 synchronized?(this)?{} //?2.用于對象 synchronized?(object)?{} //?3.用于方法 public?synchronized?void?test?()?{} //?4.可重入 for?(int?i?=?0;?i?<?100;?i++)?{synchronized?(this)?{} } //?**************************ReentrantLock的使用方式************************** public?void?test?()?throw?Exception?{//?1.初始化選擇公平鎖、非公平鎖ReentrantLock?lock?=?new?ReentrantLock(true);//?2.可用于代碼塊lock.lock();try?{try?{//?3.支持多種加鎖方式,比較靈活;?具有可重入特性if(lock.tryLock(100,?TimeUnit.MILLISECONDS)){?}}?finally?{//?4.手動釋放鎖lock.unlock()}}?finally?{lock.unlock();} }

1.2 ReentrantLock與AQS的關聯

通過上文我們已經了解,ReentrantLock支持公平鎖和非公平鎖(關于公平鎖和非公平鎖的原理分析,可參考《不可不說的Java“鎖”事》),并且ReentrantLock的底層就是由AQS來實現的。那么ReentrantLock是如何通過公平鎖和非公平鎖與AQS關聯起來呢?我們著重從這兩者的加鎖過程來理解一下它們與AQS之間的關系(加鎖過程中與AQS的關聯比較明顯,解鎖流程后續會介紹)。

非公平鎖源碼中的加鎖流程如下:

//?java.util.concurrent.locks.ReentrantLock#NonfairSync//?非公平鎖 static?final?class?NonfairSync?extends?Sync?{...final?void?lock()?{if?(compareAndSetState(0,?1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}... }

這塊代碼的含義為:

  • 若通過CAS設置變量State(同步狀態)成功,也就是獲取鎖成功,則將當前線程設置為獨占線程。

  • 若通過CAS設置變量State(同步狀態)失敗,也就是獲取鎖失敗,則進入Acquire方法進行后續處理。

第一步很好理解,但第二步獲取鎖失敗后,后續的處理策略是怎么樣的呢?這塊可能會有以下思考:

  • 某個線程獲取鎖失敗的后續流程是什么呢?有以下兩種可能:

    • 將當前線程獲鎖結果設置為失敗,獲取鎖流程結束。這種設計會極大降低系統的并發度,并不滿足我們實際的需求。所以就需要下面這種流程,也就是AQS框架的處理流程。

    • 存在某種排隊等候機制,線程繼續等待,仍然保留獲取鎖的可能,獲取鎖流程仍在繼續。

  • 對于問題1的第二種情況,既然說到了排隊等候機制,那么就一定會有某種隊列形成,這樣的隊列是什么數據結構呢?

  • 處于排隊等候機制中的線程,什么時候可以有機會獲取鎖呢?

  • 如果處于排隊等候機制中的線程一直無法獲取鎖,還是需要一直等待嗎,還是有別的策略來解決這一問題?

帶著非公平鎖的這些問題,再看下公平鎖源碼中獲鎖的方式:

//?java.util.concurrent.locks.ReentrantLock#FairSyncstatic?final?class?FairSync?extends?Sync?{...??final?void?lock()?{acquire(1);}... }

看到這塊代碼,我們可能會存在這種疑問:Lock函數通過Acquire方法進行加鎖,但是具體是如何加鎖的呢?

結合公平鎖和非公平鎖的加鎖流程,雖然流程上有一定的不同,但是都調用了Acquire方法,而Acquire方法是FairSync和UnfairSync的父類AQS中的核心方法。

對于上邊提到的問題,其實在ReentrantLock類源碼中都無法解答,而這些問題的答案,都是位于Acquire方法所在的類AbstractQueuedSynchronizer中,也就是本文的核心——AQS。下面我們會對AQS以及ReentrantLock和AQS的關聯做詳細介紹(相關問題答案會在2.3.5小節中解答)。

2 AQS

首先,我們通過下面的架構圖來整體了解一下AQS框架:

  • 上圖中有顏色的為Method,無顏色的為Attribution。

  • 總的來說,AQS框架共分為五層,自上而下由淺入深,從AQS對外暴露的API到底層基礎數據。

  • 當有自定義同步器接入時,只需重寫第一層所需要的部分方法即可,不需要關注底層具體的實現流程。當自定義同步器進行加鎖或者解鎖操作時,先經過第一層的API進入AQS內部方法,然后經過第二層進行鎖的獲取,接著對于獲取鎖失敗的流程,進入第三層和第四層的等待隊列處理,而這些處理方式均依賴于第五層的基礎數據提供層。

下面我們會從整體到細節,從流程到方法逐一剖析AQS框架,主要分析過程如下:

2.1 原理概覽

AQS核心思想是,如果被請求的共享資源空閑,那么就將當前請求資源的線程設置為有效的工作線程,將共享資源設置為鎖定狀態;如果共享資源被占用,就需要一定的阻塞等待喚醒機制來保證鎖分配。這個機制主要用的是CLH隊列的變體實現的,將暫時獲取不到鎖的線程加入到隊列中。

CLH:Craig、Landin and Hagersten隊列,是單向鏈表,AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO),AQS是通過將每條請求共享資源的線程封裝成一個節點來實現鎖的分配。

主要原理圖如下:

AQS使用一個Volatile的int類型的成員變量來表示同步狀態,通過內置的FIFO隊列來完成資源獲取的排隊工作,通過CAS完成對State值的修改。

2.1.1 AQS數據結構

先來看下AQS中最基本的數據結構——Node,Node即為上面CLH變體隊列中的節點。

解釋一下幾個方法和屬性值的含義:

方法和屬性值

含義

waitStatus

當前節點在隊列中的狀態

prev

前驅指針

next

后繼指針

thread

表示處于該節點的線程

nextWaiter

指向下一個處于CONDITION狀態的節點(由于本篇文章不講述Condition Queue隊列,這個指針不多介紹)

predecessor

返回前驅節點,沒有的話拋出npe

線程兩種鎖的模式:

模式

含義

SHARED

表示線程以共享的模式等待鎖

EXCLUSIVE

表示線程正在以獨占的方式等待鎖

waitStatus有下面幾個枚舉值:

枚舉

含義

CANCELLED

為1,表示線程獲取鎖的請求已經取消了

SIGNAL

為-1,表示線程已經準備好了,就等資源釋放了

CONDITION

為-2,表示節點在等待隊列中,節點線程等待喚醒

PROPAGATE

為-3,當前線程處在SHARED情況下,該字段才會使用

0

當一個Node被初始化的時候的默認值

2.1.2 同步狀態State

在了解數據結構后,接下來了解一下AQS的同步狀態——State。AQS中維護了一個名為state的字段,意為同步狀態,是由Volatile修飾的,用于展示當前臨界資源的獲鎖情況。

//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?volatile?int?state;

下面提供了幾個訪問這個字段的方法:

方法名

描述

protected final int getState()

獲取State的值

protected final void setState(int newState)

設置State的值

protected final boolean compareAndSetState(int expect, int update)

使用CAS方式更新State

這幾個方法都是Final修飾的,說明子類中無法重寫它們。我們可以通過修改State字段表示的同步狀態來實現多線程的獨占模式和共享模式(加鎖過程)。

對于我們自定義的同步工具,需要自定義獲取同步狀態和釋放狀態的方式,也就是AQS架構圖中的第一層:API層。

2.2 AQS重要方法與ReentrantLock的關聯

從架構圖中可以得知,AQS提供了大量用于自定義同步器實現的Protected方法。自定義同步器實現的相關方法也只是為了通過修改State字段來實現多線程的獨占模式或者共享模式。自定義同步器需要實現以下方法(ReentrantLock需要實現的方法如下,并不是全部):

方法名

描述

protected boolean isHeldExclusively()

該線程是否正在獨占資源。只有用到Condition才需要去實現它。

protected boolean tryAcquire(int arg)

獨占方式。arg為獲取鎖的次數,嘗試獲取資源,成功則返回True,失敗則返回False。

protected boolean tryRelease(int arg)

獨占方式。arg為釋放鎖的次數,嘗試釋放資源,成功則返回True,失敗則返回False。

protected int tryAcquireShared(int arg)

共享方式。arg為獲取鎖的次數,嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。

protected boolean tryReleaseShared(int arg)

共享方式。arg為釋放鎖的次數,嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回True,否則返回False。

一般來說,自定義同步器要么是獨占方式,要么是共享方式,它們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。AQS也支持自定義同步器同時實現獨占和共享兩種方式,如ReentrantReadWriteLock。ReentrantLock是獨占鎖,所以實現了tryAcquire-tryRelease。

以非公平鎖為例,這里主要闡述一下非公平鎖與AQS之間方法的關聯之處,具體每一處核心方法的作用會在文章后面詳細進行闡述。

為了幫助大家理解ReentrantLock和AQS之間方法的交互過程,以非公平鎖為例,我們將加鎖和解鎖的交互流程單獨拎出來強調一下,以便于對后續內容的理解。

加鎖:

  • 通過ReentrantLock的加鎖方法Lock進行加鎖操作。

  • 會調用到內部類Sync的Lock方法,由于Sync#lock是抽象方法,根據ReentrantLock初始化選擇的公平鎖和非公平鎖,執行相關內部類的Lock方法,本質上都會執行AQS的Acquire方法。

  • AQS的Acquire方法會執行tryAcquire方法,但是由于tryAcquire需要自定義同步器實現,因此執行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通過公平鎖和非公平鎖內部類實現的tryAcquire方法,因此會根據鎖類型不同,執行不同的tryAcquire。

  • tryAcquire是獲取鎖邏輯,獲取失敗后,會執行框架AQS的后續邏輯,跟ReentrantLock自定義同步器無關。

?

解鎖:

  • 通過ReentrantLock的解鎖方法Unlock進行解鎖。

  • Unlock會調用內部類Sync的Release方法,該方法繼承于AQS。

  • Release中會調用tryRelease方法,tryRelease需要自定義同步器實現,tryRelease只在ReentrantLock中的Sync實現,因此可以看出,釋放鎖的過程,并不區分是否為公平鎖。

  • 釋放成功后,所有處理由AQS框架完成,與自定義同步器無關。

通過上面的描述,大概可以總結出ReentrantLock加鎖解鎖時API層核心方法的映射關系。

2.3 通過ReentrantLock理解AQS

ReentrantLock中公平鎖和非公平鎖在底層是相同的,這里以非公平鎖為例進行分析。

在非公平鎖中,有一段這樣的代碼:

//?java.util.concurrent.locks.ReentrantLockstatic?final?class?NonfairSync?extends?Sync?{...final?void?lock()?{if?(compareAndSetState(0,?1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}... }

看一下這個Acquire是怎么寫的:

//?java.util.concurrent.locks.AbstractQueuedSynchronizerpublic?final?void?acquire(int?arg)?{if?(!tryAcquire(arg)?&&?acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))selfInterrupt(); }

再看一下tryAcquire方法:

//?java.util.concurrent.locks.AbstractQueuedSynchronizerprotected?boolean?tryAcquire(int?arg)?{throw?new?UnsupportedOperationException(); }

可以看出,這里只是AQS的簡單實現,具體獲取鎖的實現方法是由各自的公平鎖和非公平鎖單獨實現的(以ReentrantLock為例)。如果該方法返回了True,則說明當前線程獲取鎖成功,就不用往后執行了;如果獲取失敗,就需要加入到等待隊列中。下面會詳細解釋線程是何時以及怎樣被加入進等待隊列中的。

2.3.1 線程加入等待隊列

2.3.1.1 加入隊列的時機

當執行Acquire(1)時,會通過tryAcquire獲取鎖。在這種情況下,如果獲取鎖失敗,就會調用addWaiter加入到等待隊列中去。

2.3.1.2 如何加入隊列

獲取鎖失敗后,會執行addWaiter(Node.EXCLUSIVE)加入等待隊列,具體實現方法如下:

//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?Node?addWaiter(Node?mode)?{Node?node?=?new?Node(Thread.currentThread(),?mode);//?Try?the?fast?path?of?enq;?backup?to?full?enq?on?failureNode?pred?=?tail;if?(pred?!=?null)?{node.prev?=?pred;if?(compareAndSetTail(pred,?node))?{pred.next?=?node;return?node;}}enq(node);return?node; } private?final?boolean?compareAndSetTail(Node?expect,?Node?update)?{return?unsafe.compareAndSwapObject(this,?tailOffset,?expect,?update); }

主要的流程如下:

(1)通過當前的線程和鎖模式新建一個節點。

(2)Pred指針指向尾節點Tail。

(3)將New中Node的Prev指針指向Pred。

(4)通過compareAndSetTail方法,完成尾節點的設置。這個方法主要是對tailOffset和Expect進行比較,如果tailOffset的Node和Expect的Node地址是相同的,那么設置Tail的值為Update的值。

//?java.util.concurrent.locks.AbstractQueuedSynchronizerstatic?{try?{stateOffset?=?unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset?=?unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset?=?unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset?=?unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset?=?unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));}?catch?(Exception?ex)?{?throw?new?Error(ex);?} }

從AQS的靜態代碼塊可以看出,都是獲取一個對象的屬性相對于該對象在內存當中的偏移量,這樣我們就可以根據這個偏移量在對象內存當中找到這個屬性。tailOffset指的是tail對應的偏移量,所以這個時候會將new出來的Node置為當前隊列的尾節點。同時,由于是雙向鏈表,也需要將前一個節點指向尾節點。

(5)?如果Pred指針是Null(說明等待隊列中沒有元素),或者當前Pred指針和Tail指向的位置不同(說明被別的線程已經修改),就需要看一下Enq的方法。

//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?Node?enq(final?Node?node)?{for?(;;)?{Node?t?=?tail;if?(t?==?null)?{?//?Must?initializeif?(compareAndSetHead(new?Node()))tail?=?head;}?else?{node.prev?=?t;if?(compareAndSetTail(t,?node))?{t.next?=?node;return?t;}}} }

如果沒有被初始化,需要進行初始化一個頭結點出來。但請注意,初始化的頭結點并不是當前線程節點,而是調用了無參構造函數的節點。如果經歷了初始化或者并發導致隊列中有元素,則與之前的方法相同。其實,addWaiter就是一個在雙端鏈表添加尾節點的操作,需要注意的是,雙端鏈表的頭結點是一個無參構造函數的頭結點。

總結一下,線程獲取鎖的時候,過程大體如下:

a. 當沒有線程獲取到鎖時,線程1獲取鎖成功。

b. 線程2申請鎖,但是鎖被線程1占有。

c. 如果再有線程要獲取鎖,依次在隊列中往后排隊即可。

回到上邊的代碼,hasQueuedPredecessors是公平鎖加鎖時判斷等待隊列中是否存在有效節點的方法。如果返回False,說明當前線程可以爭取共享資源;如果返回True,說明隊列中存在有效節點,當前線程必須加入到等待隊列中。

//?java.util.concurrent.locks.ReentrantLockpublic?final?boolean?hasQueuedPredecessors()?{//?The?correctness?of?this?depends?on?head?being?initialized//?before?tail?and?on?head.next?being?accurate?if?the?current//?thread?is?first?in?queue.Node?t?=?tail;?//?Read?fields?in?reverse?initialization?orderNode?h?=?head;Node?s;return?h?!=?t?&&?((s?=?h.next)?==?null?||?s.thread?!=?Thread.currentThread()); }

看到這里,我們理解一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());為什么要判斷的頭結點的下一個節點?第一個節點儲存的數據是什么?

雙向鏈表中,第一個節點為虛節點,其實并不存儲任何信息,只是占位。真正的第一個有數據的節點,是在第二個節點開始的。當h != t時:

?

a. 如果(s = h.next) == null,等待隊列正在有線程進行初始化,但只是進行到了Tail指向Head,沒有將Head指向Tail,此時隊列中有元素,需要返回True(這塊具體見下邊代碼分析)。
b. 如果(s = h.next) != null,說明此時隊列中至少有一個有效節點。如果此時s.thread == Thread.currentThread(),說明等待隊列的第一個有效節點中的線程與當前線程相同,那么當前線程是可以獲取資源的;如果s.thread != Thread.currentThread(),說明等待隊列的第一個有效節點線程與當前線程不同,當前線程必須加入進等待隊列。

?

//?java.util.concurrent.locks.AbstractQueuedSynchronizer#enqif?(t?==?null)?{?//?Must?initializeif?(compareAndSetHead(new?Node()))tail?=?head; }?else?{node.prev?=?t;if?(compareAndSetTail(t,?node))?{t.next?=?node;return?t;} }

?

節點入隊不是原子操作,所以會出現短暫的head != tail,此時Tail指向最后一個節點,而且Tail指向Head。如果Head沒有指向Tail(可見5、6、7行),這種情況下也需要將相關線程加入隊列中。所以這塊代碼是為了解決極端情況下的并發問題。

2.3.1.3 等待隊列中線程出隊列時機

回到最初的源碼:

//?java.util.concurrent.locks.AbstractQueuedSynchronizerpublic?final?void?acquire(int?arg)?{if?(!tryAcquire(arg)?&&?acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))selfInterrupt(); }

上文解釋了addWaiter方法,這個方法其實就是把對應的線程以Node的數據結構形式加入到雙端隊列里,返回的是一個包含該線程的Node。而這個Node會作為參數,進入到acquireQueued方法中。acquireQueued方法可以對排隊中的線程進行“獲鎖”操作。

總的來說,一個線程獲取鎖失敗了,被放入等待隊列,acquireQueued會把放入隊列中的線程不斷去獲取鎖,直到獲取成功或者不再需要獲取(中斷)。

下面我們從“何時出隊列?”和“如何出隊列?”兩個方向來分析一下acquireQueued源碼:

//?java.util.concurrent.locks.AbstractQueuedSynchronizerfinal?boolean?acquireQueued(final?Node?node,?int?arg)?{//?標記是否成功拿到資源boolean?failed?=?true;try?{//?標記等待過程中是否中斷過boolean?interrupted?=?false;//?開始自旋,要么獲取鎖,要么中斷for?(;;)?{//?獲取當前節點的前驅節點final?Node?p?=?node.predecessor();//?如果p是頭結點,說明當前節點在真實數據隊列的首部,就嘗試獲取鎖(別忘了頭結點是虛節點)if?(p?==?head?&&?tryAcquire(arg))?{//?獲取鎖成功,頭指針移動到當前nodesetHead(node);p.next?=?null;?//?help?GCfailed?=?false;return?interrupted;}//?說明p為頭節點且當前沒有獲取到鎖(可能是非公平鎖被搶占了)或者是p不為頭結點,這個時候就要判斷當前node是否要被阻塞(被阻塞條件:前驅節點的waitStatus為-1),防止無限循環浪費資源。具體兩個方法下面細細分析if?(shouldParkAfterFailedAcquire(p,?node)?&&?parkAndCheckInterrupt())interrupted?=?true;}}?finally?{if?(failed)cancelAcquire(node);} }

注:setHead方法是把當前節點置為虛節點,但并沒有修改waitStatus,因為它是一直需要用的數據。

//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?void?setHead(Node?node)?{head?=?node;node.thread?=?null;node.prev?=?null; }

?

//?java.util.concurrent.locks.AbstractQueuedSynchronizer//?靠前驅節點判斷當前線程是否應該被阻塞 private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{//?獲取頭結點的節點狀態int?ws?=?pred.waitStatus;//?說明頭結點處于喚醒狀態if?(ws?==?Node.SIGNAL)return?true;?//?通過枚舉值我們知道waitStatus>0是取消狀態if?(ws?>?0)?{do?{//?循環向前查找取消節點,把取消節點從隊列中剔除node.prev?=?pred?=?pred.prev;}?while?(pred.waitStatus?>?0);pred.next?=?node;}?else?{//?設置前任節點等待狀態為SIGNALcompareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);}return?false; }

parkAndCheckInterrupt主要用于掛起當前線程,阻塞調用棧,返回當前線程的中斷狀態。

//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?final?boolean?parkAndCheckInterrupt()?{LockSupport.park(this);return?Thread.interrupted(); }

上述方法的流程圖如下:

從上圖可以看出,跳出當前循環的條件是當“前置節點是頭結點,且當前線程獲取鎖成功”。為了防止因死循環導致CPU資源被浪費,我們會判斷前置節點的狀態來決定是否要將當前線程掛起,具體掛起流程用流程圖表示如下(shouldParkAfterFailedAcquire流程):

從隊列中釋放節點的疑慮打消了,那么又有新問題了:

  • shouldParkAfterFailedAcquire中取消節點是怎么生成的呢?什么時候會把一個節點的waitStatus設置為-1?

  • 是在什么時間釋放節點通知到被掛起的線程呢?

2.3.2 CANCELLED狀態節點生成

acquireQueued方法中的Finally代碼:

//?java.util.concurrent.locks.AbstractQueuedSynchronizerfinal?boolean?acquireQueued(final?Node?node,?int?arg)?{boolean?failed?=?true;try?{...for?(;;)?{final?Node?p?=?node.predecessor();if?(p?==?head?&&?tryAcquire(arg))?{...failed?=?false;...}...}?finally?{if?(failed)cancelAcquire(node);} }

通過cancelAcquire方法,將Node的狀態標記為CANCELLED。接下來,我們逐行來分析這個方法的原理:

//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?void?cancelAcquire(Node?node)?{//?將無效節點過濾if?(node?==?null)return;//?設置該節點不關聯任何線程,也就是虛節點node.thread?=?null;Node?pred?=?node.prev;//?通過前驅節點,跳過取消狀態的nodewhile?(pred.waitStatus?>?0)node.prev?=?pred?=?pred.prev;//?獲取過濾后的前驅節點的后繼節點Node?predNext?=?pred.next;//?把當前node的狀態設置為CANCELLEDnode.waitStatus?=?Node.CANCELLED;//?如果當前節點是尾節點,將從后往前的第一個非取消狀態的節點設置為尾節點//?更新失敗的話,則進入else,如果更新成功,將tail的后繼節點設置為nullif?(node?==?tail?&&?compareAndSetTail(node,?pred))?{compareAndSetNext(pred,?predNext,?null);}?else?{int?ws;//?如果當前節點不是head的后繼節點,1:判斷當前節點前驅節點的是否為SIGNAL,2:如果不是,則把前驅節點設置為SINGAL看是否成功//?如果1和2中有一個為true,再判斷當前節點的線程是否為null//?如果上述條件都滿足,把當前節點的前驅節點的后繼指針指向當前節點的后繼節點if?(pred?!=?head?&&?((ws?=?pred.waitStatus)?==?Node.SIGNAL?||?(ws?<=?0?&&?compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL)))?&&?pred.thread?!=?null)?{Node?next?=?node.next;if?(next?!=?null?&&?next.waitStatus?<=?0)compareAndSetNext(pred,?predNext,?next);}?else?{//?如果當前節點是head的后繼節點,或者上述條件不滿足,那就喚醒當前節點的后繼節點unparkSuccessor(node);}node.next?=?node;?//?help?GC} }

當前的流程:

  • 獲取當前節點的前驅節點,如果前驅節點的狀態是CANCELLED,那就一直往前遍歷,找到第一個waitStatus <= 0的節點,將找到的Pred節點和當前Node關聯,將當前Node設置為CANCELLED。

  • 根據當前節點的位置,考慮以下三種情況:

  • 當前節點是尾節點。

  • 當前節點是Head的后繼節點。

  • 當前節點不是Head的后繼節點,也不是尾節點。

  • 根據上述第二條,我們來分析每一種情況的流程。

    當前節點是尾節點。

    當前節點是Head的后繼節點。

    當前節點不是Head的后繼節點,也不是尾節點。

    通過上面的流程,我們對于CANCELLED節點狀態的產生和變化已經有了大致的了解,但是為什么所有的變化都是對Next指針進行了操作,而沒有對Prev指針進行操作呢?什么情況下會對Prev指針進行操作?

    (1)執行cancelAcquire的時候,當前節點的前置節點可能已經從隊列中出去了(已經執行過Try代碼塊中的shouldParkAfterFailedAcquire方法了),如果此時修改Prev指針,有可能會導致Prev指向另一個已經移除隊列的Node,因此這塊變化Prev指針不安全。


    (2)shouldParkAfterFailedAcquire方法中,會執行下面的代碼,其實就是在處理Prev指針。shouldParkAfterFailedAcquire是獲取鎖失敗的情況下才會執行,進入該方法后,說明共享資源已被獲取,當前節點之前的節點都不會出現變化,因此這個時候變更Prev指針比較安全。

    ?

    do?{node.prev?=?pred?=?pred.prev; }?while?(pred.waitStatus?>?0);

    2.3.3 如何解鎖

    我們已經剖析了加鎖過程中的基本流程,接下來再對解鎖的基本流程進行分析。由于ReentrantLock在解鎖的時候,并不區分公平鎖和非公平鎖,所以我們直接看解鎖的源碼:

    //?java.util.concurrent.locks.ReentrantLockpublic?void?unlock()?{sync.release(1); }

    可以看到,本質釋放鎖的地方,是通過框架來完成的。

    //?java.util.concurrent.locks.AbstractQueuedSynchronizerpublic?final?boolean?release(int?arg)?{if?(tryRelease(arg))?{Node?h?=?head;if?(h?!=?null?&&?h.waitStatus?!=?0)unparkSuccessor(h);return?true;}return?false; }

    在ReentrantLock里面的公平鎖和非公平鎖的父類Sync定義了可重入鎖的釋放鎖機制。

    //?java.util.concurrent.locks.ReentrantLock.Sync//?方法返回當前鎖是不是沒有被線程持有 protected?final?boolean?tryRelease(int?releases)?{//?減少可重入次數int?c?=?getState()?-?releases;//?當前線程不是持有鎖的線程,拋出異常if?(Thread.currentThread()?!=?getExclusiveOwnerThread())throw?new?IllegalMonitorStateException();boolean?free?=?false;//?如果持有線程全部釋放,將當前獨占鎖所有線程設置為null,并更新stateif?(c?==?0)?{free?=?true;setExclusiveOwnerThread(null);}setState(c);return?free; }

    我們來解釋下述源碼:

    //?java.util.concurrent.locks.AbstractQueuedSynchronizerpublic?final?boolean?release(int?arg)?{//?上邊自定義的tryRelease如果返回true,說明該鎖沒有被任何線程持有if?(tryRelease(arg))?{//?獲取頭結點Node?h?=?head;//?頭結點不為空并且頭結點的waitStatus不是初始化節點情況,解除線程掛起狀態if?(h?!=?null?&&?h.waitStatus?!=?0)unparkSuccessor(h);return?true;}return?false; }

    這里的判斷條件為什么是h != null && h.waitStatus != 0?

    (1)h == null ? Head還沒初始化。初始情況下,head == null,第一個節點入隊,Head會被初始化一個虛擬節點。所以說,這里如果還沒來得及入隊,就會出現head == null 的情況。
    (2)h != null && waitStatus == 0 ? 表明后繼節點對應的線程仍在運行中,不需要喚醒。
    (3)h != null && waitStatus < 0 ?表明后繼節點可能被阻塞了,需要喚醒。

    在看一下unparkSuccessor方法:

    //?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?void?unparkSuccessor(Node?node)?{//?獲取頭結點waitStatusint?ws?=?node.waitStatus;if?(ws?<?0)compareAndSetWaitStatus(node,?ws,?0);//?獲取當前節點的下一個節點Node?s?=?node.next;//?如果下個節點是null或者下個節點被cancelled,就找到隊列最開始的非cancelled的節點if?(s?==?null?||?s.waitStatus?>?0)?{s?=?null;//?就從尾部節點開始找,到隊首,找到隊列第一個waitStatus<0的節點。for?(Node?t?=?tail;?t?!=?null?&&?t?!=?node;?t?=?t.prev)if?(t.waitStatus?<=?0)s?=?t;}//?如果當前節點的下個節點不為空,而且狀態<=0,就把當前節點unparkif?(s?!=?null)LockSupport.unpark(s.thread); }

    為什么要從后往前找第一個非Cancelled的節點呢?原因如下。

    之前的addWaiter方法:

    //?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?Node?addWaiter(Node?mode)?{Node?node?=?new?Node(Thread.currentThread(),?mode);//?Try?the?fast?path?of?enq;?backup?to?full?enq?on?failureNode?pred?=?tail;if?(pred?!=?null)?{node.prev?=?pred;if?(compareAndSetTail(pred,?node))?{pred.next?=?node;return?node;}}enq(node);return?node; }

    我們從這里可以看到,節點入隊并不是原子操作,也就是說,node.prev = pred; ?compareAndSetTail(pred, node) 這兩個地方可以看作Tail入隊的原子操作,但是此時pred.next = node;還沒執行,如果這個時候執行了unparkSuccessor方法,就沒辦法從前往后找了,所以需要從后往前找。還有一點原因,在產生CANCELLED狀態節點的時候,先斷開的是Next指針,Prev指針并未斷開,因此也是必須要從后往前遍歷才能夠遍歷完全部的Node。

    綜上所述,如果是從前往后找,由于極端情況下入隊的非原子操作和CANCELLED節點產生過程中斷開Next指針的操作,可能會導致無法遍歷所有的節點。所以,喚醒對應的線程后,對應的線程就會繼續往下執行。繼續執行acquireQueued方法以后,中斷如何處理?

    2.3.4 中斷恢復后的執行流程

    喚醒后,會執行return Thread.interrupted();,這個函數返回的是當前執行線程的中斷狀態,并清除。

    //?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?final?boolean?parkAndCheckInterrupt()?{LockSupport.park(this);return?Thread.interrupted(); }

    再回到acquireQueued代碼,當parkAndCheckInterrupt返回True或者False的時候,interrupted的值不同,但都會執行下次循環。如果這個時候獲取鎖成功,就會把當前interrupted返回。

    //?java.util.concurrent.locks.AbstractQueuedSynchronizerfinal?boolean?acquireQueued(final?Node?node,?int?arg)?{boolean?failed?=?true;try?{boolean?interrupted?=?false;for?(;;)?{final?Node?p?=?node.predecessor();if?(p?==?head?&&?tryAcquire(arg))?{setHead(node);p.next?=?null;?//?help?GCfailed?=?false;return?interrupted;}if?(shouldParkAfterFailedAcquire(p,?node)?&&?parkAndCheckInterrupt())interrupted?=?true;}}?finally?{if?(failed)cancelAcquire(node);} }

    如果acquireQueued為True,就會執行selfInterrupt方法。

    //?java.util.concurrent.locks.AbstractQueuedSynchronizerstatic?void?selfInterrupt()?{Thread.currentThread().interrupt(); }

    該方法其實是為了中斷線程。但為什么獲取了鎖以后還要中斷線程呢?這部分屬于Java提供的協作式中斷知識內容,感興趣同學可以查閱一下。這里簡單介紹一下:

    (1)?當中斷線程被喚醒時,并不知道被喚醒的原因,可能是當前線程在等待中被中斷,也可能是釋放了鎖以后被喚醒。因此我們通過Thread.interrupted()方法檢查中斷標記(該方法返回了當前線程的中斷狀態,并將當前線程的中斷標識設置為False),并記錄下來,如果發現該線程被中斷過,就再中斷一次。

    (2)?線程在等待資源的過程中被喚醒,喚醒后還是會不斷地去嘗試獲取鎖,直到搶到鎖為止。也就是說,在整個流程中,并不響應中斷,只是記錄中斷記錄。最后搶到鎖返回了,那么如果被中斷過的話,就需要補充一次中斷。

    這里的處理方式主要是運用線程池中基本運作單元Worder中的runWorker,通過Thread.interrupted()進行額外的判斷處理,感興趣的同學可以看下ThreadPoolExecutor源碼。

    2.3.5 小結

    我們在1.3小節中提出了一些問題,現在來回答一下。

    Q:某個線程獲取鎖失敗的后續流程是什么呢?
    A:存在某種排隊等候機制,線程繼續等待,仍然保留獲取鎖的可能,獲取鎖流程仍在繼續。


    Q:既然說到了排隊等候機制,那么就一定會有某種隊列形成,這樣的隊列是什么數據結構呢?
    A:是CLH變體的FIFO雙端隊列。


    Q:處于排隊等候機制中的線程,什么時候可以有機會獲取鎖呢?
    A:可以詳細看下2.3.1.3小節。


    Q:如果處于排隊等候機制中的線程一直無法獲取鎖,需要一直等待么?還是有別的策略來解決這一問題?
    A:線程所在節點的狀態會變成取消狀態,取消狀態的節點會從隊列中釋放,具體可見2.3.2小節。


    Q:Lock函數通過Acquire方法進行加鎖,但是具體是如何加鎖的呢?
    A:AQS的Acquire會調用tryAcquire方法,tryAcquire由各個自定義同步器實現,通過tryAcquire完成加鎖過程。

    3 AQS應用

    3.1 ReentrantLock的可重入應用

    ReentrantLock的可重入性是AQS很好的應用之一,在了解完上述知識點以后,我們很容易得知ReentrantLock實現可重入的方法。在ReentrantLock里面,不管是公平鎖還是非公平鎖,都有一段邏輯。

    公平鎖:

    //?java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquireif?(c?==?0)?{if?(!hasQueuedPredecessors()?&&?compareAndSetState(0,?acquires))?{setExclusiveOwnerThread(current);return?true;} } else?if?(current?==?getExclusiveOwnerThread())?{int?nextc?=?c?+?acquires;if?(nextc?<?0)throw?new?Error("Maximum?lock?count?exceeded");setState(nextc);return?true; }

    非公平鎖:

    //?java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquireif?(c?==?0)?{if?(compareAndSetState(0,?acquires)){setExclusiveOwnerThread(current);return?true;} } else?if?(current?==?getExclusiveOwnerThread())?{int?nextc?=?c?+?acquires;if?(nextc?<?0)?//?overflowthrow?new?Error("Maximum?lock?count?exceeded");setState(nextc);return?true; }

    從上面這兩段都可以看到,有一個同步狀態State來控制整體可重入的情況。State是Volatile修飾的,用于保證一定的可見性和有序性。

    //?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?volatile?int?state;

    接下來看State這個字段主要的過程:

    (1) State初始化的時候為0,表示沒有任何線程持有鎖。

    (2)?當有線程持有該鎖時,值就會在原來的基礎上+1,同一個線程多次獲得鎖是,就會多次+1,這里就是可重入的概念。

    (3)?解鎖也是對這個字段-1,一直到0,此線程對鎖釋放。

    3.2 JUC中的應用場景

    除了上邊ReentrantLock的可重入性的應用,AQS作為并發編程的框架,為很多其他同步工具提供了良好的解決方案。下面列出了JUC中的幾種同步工具,大體介紹一下AQS的應用場景:

    同步工具

    同步工具與AQS的關聯

    ReentrantLock

    使用AQS保存鎖重復持有的次數。當一個線程獲取鎖時,ReentrantLock記錄當前獲得鎖的線程標識,用于檢測是否重復獲取,以及錯誤線程試圖解鎖操作時異常情況的處理。

    ReentrantReadWriteLock

    使用AQS同步狀態中的16位保存寫鎖持有的次數,剩下的16位用于保存讀鎖的持有次數。

    Semaphore

    使用AQS同步狀態來保存信號量的當前計數。tryRelease會增加計數,acquireShared會減少計數。

    CountDownLatch

    使用AQS同步狀態來表示計數。計數為0時,所有的Acquire操作(CountDownLatch的await方法)才可以通過。

    ThreadPoolExecutor

    Worker利用AQS同步狀態實現對獨占線程變量的設置(tryAcquire和tryRelease)。

    3.3 自定義同步工具

    了解AQS基本原理以后,按照上面所說的AQS知識點,自己實現一個同步工具。

    public?class?LeeLock??{private?static?class?Sync?extends?AbstractQueuedSynchronizer?{@Overrideprotected?boolean?tryAcquire?(int?arg)?{return?compareAndSetState(0,?1);}@Overrideprotected?boolean?tryRelease?(int?arg)?{setState(0);return?true;}@Overrideprotected?boolean?isHeldExclusively?()?{return?getState()?==?1;}}private?Sync?sync?=?new?Sync();public?void?lock?()?{sync.acquire(1);}public?void?unlock?()?{sync.release(1);} }

    通過我們自己定義的Lock完成一定的同步功能。

    public?class?LeeMain?{static?int?count?=?0;static?LeeLock?leeLock?=?new?LeeLock();public?static?void?main?(String[]?args)?throws?InterruptedException?{Runnable?runnable?=?new?Runnable()?{@Overridepublic?void?run?()?{try?{leeLock.lock();for?(int?i?=?0;?i?<?10000;?i++)?{count++;}}?catch?(Exception?e)?{e.printStackTrace();}?finally?{leeLock.unlock();}}};Thread?thread1?=?new?Thread(runnable);Thread?thread2?=?new?Thread(runnable);thread1.start();thread2.start();thread1.join();thread2.join();System.out.println(count);} }

    上述代碼每次運行結果都會是20000。通過簡單的幾行代碼就能實現同步功能,這就是AQS的強大之處。

    總結

    我們日常開發中使用并發的場景太多,但是對并發內部的基本框架原理了解的人卻不多。由于篇幅原因,本文僅介紹了可重入鎖ReentrantLock的原理和AQS原理,希望能夠成為大家了解AQS和ReentrantLock等同步器的“敲門磚”。

    參考資料

    • Lea D. The java. util. concurrent synchronizer framework[J]. Science of Computer Programming, 2005, 58(3): 293-309.

    • 《Java并發編程實戰》

    • 不可不說的Java“鎖”事

    總結

    以上是生活随笔為你收集整理的AQS的原理及应用的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。