多线程:AQS源码分析
AQS 源碼分析
?
概述
Java的內(nèi)置鎖一直都是備受爭(zhēng)議的,在JDK 1.6之前,synchronized這個(gè)重量級(jí)鎖其性能一直都是較為低下,雖然在1.6后,進(jìn)行大量的鎖優(yōu)化策略,但是與Lock相比synchronized還是存在一些缺陷的:雖然synchronized提供了便捷性的隱式獲取鎖釋放鎖機(jī)制(基于JVM機(jī)制),但是它卻缺少了獲取鎖與釋放鎖的可操作性,可中斷、超時(shí)獲取鎖,且它為獨(dú)占式在高并發(fā)場(chǎng)景下性能大打折扣。
AQS,AbstractQueuedSynchronizer,即隊(duì)列同步器。它是構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC并發(fā)包的作者(Doug Lea)期望它能夠成為實(shí)現(xiàn)大部分同步需求的基礎(chǔ)。它是JUC并發(fā)包中的核心基礎(chǔ)組件。
AQS解決了子類實(shí)現(xiàn)同步器時(shí)涉及當(dāng)?shù)拇罅考?xì)節(jié)問(wèn)題,例如獲取同步狀態(tài)、FIFO同步隊(duì)列。基于AQS來(lái)構(gòu)建同步器可以帶來(lái)很多好處。它不僅能夠極大地減少實(shí)現(xiàn)工作,而且也不必處理在多個(gè)位置上發(fā)生的競(jìng)爭(zhēng)問(wèn)題。
AQS的主要使用方式是繼承,子類通過(guò)繼承同步器并實(shí)現(xiàn)它的抽象方法來(lái)管理同步狀態(tài)。
AQS使用一個(gè)int類型的成員變量state來(lái)表示同步狀態(tài),當(dāng)state>0時(shí)表示已經(jīng)獲取了鎖,當(dāng)state = 0時(shí)表示釋放了鎖。它提供了三個(gè)方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來(lái)對(duì)同步狀態(tài)state進(jìn)行操作,當(dāng)然AQS可以確保對(duì)state的操作是安全的。
AQS通過(guò)內(nèi)置的FIFO同步隊(duì)列來(lái)完成資源獲取線程的排隊(duì)工作,如果當(dāng)前線程獲取同步狀態(tài)失敗(鎖)時(shí),AQS則會(huì)將當(dāng)前線程以及等待狀態(tài)等信息構(gòu)造成一個(gè)節(jié)點(diǎn)(Node)并將其加入同步隊(duì)列,同時(shí)會(huì)阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時(shí),則會(huì)把節(jié)點(diǎn)中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。
AQS可以實(shí)現(xiàn)獨(dú)占鎖和共享鎖,RenntrantLock實(shí)現(xiàn)的是獨(dú)占鎖,ReentrantReadWriteLock實(shí)現(xiàn)的是獨(dú)占鎖和共享鎖,CountDownLatch實(shí)現(xiàn)的是共享鎖。
下面我們通過(guò)源碼來(lái)分析下AQS的實(shí)現(xiàn)原理
AbstractQueuedSynchronizer類結(jié)構(gòu)
public?abstract?class AbstractQueuedSynchronizer
????extends AbstractOwnableSynchronizer
????implements java.io.Serializable {
????protected AbstractQueuedSynchronizer() { }
????//同步器隊(duì)列頭結(jié)點(diǎn)
????private?transient?volatile?Node head;
????//同步器隊(duì)列尾結(jié)點(diǎn)
????private?transient?volatile?Node tail;
????//同步狀態(tài)(打的那個(gè)state為0時(shí),無(wú)鎖,當(dāng)state>0時(shí)說(shuō)明有鎖。)
????private?volatile?int?state;
????//獲取鎖狀態(tài)
????protected final int getState() {
????????return?state;
????}
????//設(shè)置鎖狀態(tài)
????protected final void setState(int newState) {
????????state = newState;
????}
????......
通過(guò)AQS的類結(jié)構(gòu)我們可以看到它內(nèi)部有一個(gè)隊(duì)列和一個(gè)state的int變量。
隊(duì)列:通過(guò)一個(gè)雙向鏈表實(shí)現(xiàn)的隊(duì)列來(lái)存儲(chǔ)等待獲取鎖的線程。
state:鎖的狀態(tài)。
head、tail和state 都是volatile類型的變量,volatile可以保證多線程的內(nèi)存可見性。
同步隊(duì)列的基本結(jié)構(gòu)如下:
?
同步隊(duì)列
同步器隊(duì)列Node元素的類結(jié)構(gòu)如下:
static?final?class Node {
????static?final?Node SHARED = new?Node();
????static?final?Node EXCLUSIVE = null;
????//表示當(dāng)前的線程被取消;
????static?final?int?CANCELLED = ?1;
????//表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線程需要運(yùn)行,也就是unpark;
????static?final?int?SIGNAL ???= -1;
????//表示當(dāng)前節(jié)點(diǎn)在等待condition,也就是在condition隊(duì)列中;
????static?final?int?CONDITION = -2;
????//表示當(dāng)前場(chǎng)景下后續(xù)的acquireShared能夠得以執(zhí)行;
????static?final?int?PROPAGATE = -3;
????//表示節(jié)點(diǎn)的狀態(tài)。默認(rèn)為0,表示當(dāng)前節(jié)點(diǎn)在sync隊(duì)列中,等待著獲取鎖。
????//其它幾個(gè)狀態(tài)為:CANCELLED、SIGNAL、CONDITION、PROPAGATE
????volatile?int?waitStatus;
????//前驅(qū)節(jié)點(diǎn)
????volatile?Node prev;
????//后繼節(jié)點(diǎn)
????volatile?Node next;
????//獲取鎖的線程
????volatile?Thread thread;
????//存儲(chǔ)condition隊(duì)列中的后繼節(jié)點(diǎn)。
????Node nextWaiter;
????......
}
從Node結(jié)構(gòu)prev和next節(jié)點(diǎn)可以看出它是一個(gè)雙向鏈表,waitStatus存儲(chǔ)了當(dāng)前線程的狀態(tài)信息
waitStatus
下面我們通過(guò)以下五個(gè)方面來(lái)介紹AQS是怎么實(shí)現(xiàn)的鎖的獲取和釋放的
5.獨(dú)占超時(shí)獲得鎖
1.獨(dú)占式獲得鎖
acquire方法代碼如下:
public final void acquire(int arg) {
????????//嘗試獲得鎖,獲取不到則加入到隊(duì)列中等待獲取
????????if?(!tryAcquire(arg) &&
????????????acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
????????????selfInterrupt();
????}
addWaiter方法代碼如下:
private Node addWaiter(Node mode) {
????Node node = new?Node(Thread.currentThread(), mode);
????// Try the fast path of enq; backup to full enq on failure
????Node pred = tail;
????if?(pred != null) {
????????node.prev = pred;
????????//將該節(jié)點(diǎn)添加到隊(duì)列尾部
????????if?(compareAndSetTail(pred, node)) {
????????????pred.next = node;
????????????return?node;
????????}
????}
????//如果前驅(qū)節(jié)點(diǎn)為null,則進(jìn)入enq方法通過(guò)自旋方式入隊(duì)列
????enq(node);
????return?node;
}
將構(gòu)造的同步節(jié)點(diǎn)加入到同步隊(duì)列中
enq方法代碼如下:
????private Node enq(final Node node) {
????????for?(;;) {
????????????Node t = tail;
????????????if?(t == null) { // Must initialize
????????????????//如果隊(duì)列為空,則通過(guò)CAS把當(dāng)前Node設(shè)置成頭節(jié)點(diǎn)
????????????????if?(compareAndSetHead(new?Node()))
????????????????????tail = head;
????????????} else?{
????????????????node.prev = t;
????????????????//如果隊(duì)列不為空,則向隊(duì)列尾部添加Node
????????????????if?(compareAndSetTail(t, node)) {
????????????????????t.next = node;
????????????????????return?t;
????????????????}
????????????}
????????}
????}
該方法使用CAS自旋的方式來(lái)保證向隊(duì)列中添加Node(同步節(jié)點(diǎn)簡(jiǎn)寫Node)
acquireQueued方法代碼如下:
final boolean acquireQueued(final Node node, int arg) { ?
????boolean?failed = true; ?
????try?{ ?
????????boolean?interrupted = false; ?
????????for?(;;) { ?
????????????//找到當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
????????????final?Node p = node.predecessor(); ?
????????????//檢測(cè)p是否為頭節(jié)點(diǎn),如果是,再次調(diào)用tryAcquire方法
????????????if?(p == head && tryAcquire(arg)) { ?
????????????????//如果p節(jié)點(diǎn)是頭節(jié)點(diǎn)且tryAcquire方法返回true。那么將當(dāng)前節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)。
????????????????setHead(node); ?
????????????????p.next = null; // help GC ?
????????????????failed = false; ?
????????????????return?interrupted; ?
????????????} ?
????????????//如果p節(jié)點(diǎn)不是頭節(jié)點(diǎn),或者tryAcquire返回false,說(shuō)明請(qǐng)求失敗。 ?
????????????//那么首先需要判斷請(qǐng)求失敗后node節(jié)點(diǎn)是否應(yīng)該被阻塞,如果應(yīng)該 ?
????????????//被阻塞,那么阻塞node節(jié)點(diǎn),并檢測(cè)中斷狀態(tài)。 ?
????????????if?(shouldParkAfterFailedAcquire(p, node) && ?
????????????????parkAndCheckInterrupt()) ?
????????????????//如果有中斷,設(shè)置中斷狀態(tài)。 ?
????????????????interrupted = true; ?
????????} ?
????} finally?{ ?
????????if?(failed) //最后檢測(cè)一下如果請(qǐng)求失敗(異常退出),取消請(qǐng)求。 ?
????????????cancelAcquire(node); ?
????} ?
}
在acquireQueued方法中,當(dāng)前線程通過(guò)自旋的方式來(lái)嘗試獲取同步狀態(tài),
通過(guò)上面的代碼我們可以發(fā)現(xiàn)AQS內(nèi)部的同步隊(duì)列是FIFO的方式存取的。節(jié)點(diǎn)自旋獲取同步狀態(tài)的行為如下圖所示
?
節(jié)點(diǎn)自旋獲取同步狀態(tài)
shouldParkAfterFailedAcquire方法代碼如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
????????//獲得前驅(qū)節(jié)點(diǎn)狀態(tài)
????????int?ws = pred.waitStatus;
????????if?(ws == Node.SIGNAL)
???????????//如果前驅(qū)節(jié)點(diǎn)狀態(tài)為SIGNAL,當(dāng)前線程則可以阻塞。
???????????return?true;
????????if?(ws > 0) {
????????????do?{
????????????????//判斷如果前驅(qū)節(jié)點(diǎn)狀態(tài)為CANCELLED,那就一直往前找,直到找到最近一個(gè)正常等待的狀態(tài)
????????????????node.prev = pred = pred.prev;
????????????} while?(pred.waitStatus > 0);
????????????//并將當(dāng)前Node排在它的后邊。
????????????pred.next = node;
????????} else?{
????????????//如果前驅(qū)節(jié)點(diǎn)正常,則修改前驅(qū)節(jié)點(diǎn)狀態(tài)為SIGNAL
????????????compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
????????}
????????return?false;
????}
節(jié)點(diǎn)的狀態(tài)如下表:
| 狀態(tài) | 值 | 說(shuō)明 |
| CANCELLED | 1 | 等待超時(shí)或者中斷,需要從同步隊(duì)列中取消 |
| SIGNAL | -1 | 后繼節(jié)點(diǎn)出于等待狀態(tài),當(dāng)前節(jié)點(diǎn)釋放鎖后將會(huì)喚醒后繼節(jié)點(diǎn) |
| CONDITION | -2 | 節(jié)點(diǎn)在等待隊(duì)列中,節(jié)點(diǎn)線程等待在Condition上,其它線程對(duì)Condition調(diào)用signal()方法后,該節(jié)點(diǎn)將會(huì)從等待同步隊(duì)列中移到同步隊(duì)列中,然后等待獲取鎖。 |
| PROPAGATE | -3 | 表示下一次共享式同步狀態(tài)獲取將會(huì)無(wú)條件地傳播下去 |
| INITIAL | 0 | 初始狀態(tài) |
parkAndCheckInterrupt方法代碼如下:
private final boolean parkAndCheckInterrupt() {
????//阻塞當(dāng)前線程
????LockSupport.park(this);
????//判斷是否中斷來(lái)喚醒的
????return?Thread.interrupted();
}
2. 獨(dú)占式釋放鎖
release方法代碼如下:
????public final boolean release(int arg) {
????????//嘗試釋放鎖
????????if?(tryRelease(arg)) {
????????????Node h = head;
????????????if?(h != null?&& h.waitStatus != 0)
????????????????//喚醒后繼節(jié)點(diǎn)
????????????????unparkSuccessor(h);
????????????return?true;
????????}
????????return?false;
????}
tryRelease(int arg) 方法應(yīng)該由實(shí)現(xiàn)AQS的子類來(lái)實(shí)現(xiàn)具體的邏輯。
3. 共享式獲得鎖
acquireShared方法代碼如下:
public final void acquireShared(int arg) {
????//嘗試獲取的鎖,如果獲取失敗執(zhí)行doAcquireShared方法。
????if?(tryAcquireShared(arg) < 0)
????????doAcquireShared(arg);
}
tryAcquireShared()嘗試獲取鎖,如果獲取失敗則通過(guò)doAcquireShared()進(jìn)入等待隊(duì)列,直到獲取到資源為止才返回。
這里tryAcquireShared()需要自定義同步器去實(shí)現(xiàn)。
AQS中規(guī)定:負(fù)值代表獲取失敗,非負(fù)數(shù)標(biāo)識(shí)獲取成功。
doAcquireShared方法代碼如下:
private void doAcquireShared(int arg) {
????//構(gòu)建共享Node
????final?Node node = addWaiter(Node.SHARED);
????boolean?failed = true;
????try?{
????????boolean?interrupted = false;
????????for?(;;) {
????????????//獲取前驅(qū)節(jié)點(diǎn)
????????????final?Node p = node.predecessor();
????????????//如果是頭節(jié)點(diǎn)進(jìn)行嘗試獲得鎖
????????????if?(p == head) {
????????????????//如果返回值大于等于0,則說(shuō)明獲得鎖
????????????????int?r = tryAcquireShared(arg);
????????????????if?(r >= 0) {
????????????????????//當(dāng)前節(jié)點(diǎn)設(shè)置為隊(duì)列頭,并
????????????????????setHeadAndPropagate(node, r);
????????????????????p.next = null; // help GC
????????????????????if?(interrupted)
????????????????????????selfInterrupt();
????????????????????failed = false;
????????????????????return;
????????????????}
????????????}
????????????if?(shouldParkAfterFailedAcquire(p, node) &&
????????????????parkAndCheckInterrupt())
????????????????interrupted = true;
????????}
????} finally?{
????????if?(failed)
????????????cancelAcquire(node);
????}
}
在acquireQueued方法中,當(dāng)前線程也通過(guò)自旋的方式來(lái)嘗試獲取同步狀態(tài),同獨(dú)享式獲得鎖一樣
setHeadAndPropagate方法代碼如下:
private void setHeadAndPropagate(Node node, int propagate) {
????????Node h = head; // Record old head for check below
????????setHead(node);
????????//如果propagate >0,說(shuō)明共享鎖還有可以進(jìn)行獲得鎖,繼續(xù)喚醒下一個(gè)節(jié)點(diǎn)
????????if?(propagate > 0?|| h == null?|| h.waitStatus < 0?||
????????????(h = head) == null?|| h.waitStatus < 0) {
????????????Node s = node.next;
????????????if?(s == null?|| s.isShared())
????????????????doReleaseShared();
????????}
????}
設(shè)置當(dāng)前節(jié)點(diǎn)為頭結(jié)點(diǎn),并調(diào)用了doReleaseShared()方法,acquireShared方法最終調(diào)用了release方法,得看下為什么。原因其實(shí)也很簡(jiǎn)單,shared模式下是允許多個(gè)線程持有一把鎖的,其中tryAcquire的返回值標(biāo)志了是否允許其他線程繼續(xù)進(jìn)入。如果允許的話,需要喚醒隊(duì)列中等待的線程。其中doReleaseShared方法的邏輯很簡(jiǎn)單,就是喚醒后繼線程。
因此acquireShared的主要邏輯就是嘗試加鎖,如果允許其他線程繼續(xù)加鎖,那么喚醒后繼線程,如果失敗,那么入隊(duì)阻塞等待。
4. 共享式釋放鎖
releaseShared方法代碼如下:
public final boolean releaseShared(int arg) {
????if?(tryReleaseShared(arg)) {
????????doReleaseShared();
????????return?true;
????}
????return?false;
}
tryReleaseShared(int arg) 方法應(yīng)該由實(shí)現(xiàn)AQS的子類來(lái)實(shí)現(xiàn)具體的邏輯。
doReleaseShared方法代碼如下:
private void doReleaseShared() {
????for?(;;) {
????????// 獲取隊(duì)列的頭節(jié)點(diǎn)
????????Node h = head;
????????// 如果頭節(jié)點(diǎn)不為null,并且頭節(jié)點(diǎn)不等于tail節(jié)點(diǎn)。
????????if?(h != null?&& h != tail) {
????????????// 獲取頭節(jié)點(diǎn)對(duì)應(yīng)的線程的狀態(tài)
????????????int?ws = h.waitStatus;
????????????// 如果頭節(jié)點(diǎn)對(duì)應(yīng)的線程是SIGNAL狀態(tài),則意味著“頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)所對(duì)應(yīng)的線程”需要被unpark喚醒。
????????????if?(ws == Node.SIGNAL) {
????????????????// 設(shè)置“頭節(jié)點(diǎn)對(duì)應(yīng)的線程狀態(tài)”為空狀態(tài)。失敗的話,則繼續(xù)循環(huán)。
????????????????if?(!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
????????????????????continue;
????????????????// 喚醒“頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)所對(duì)應(yīng)的線程”。
????????????????unparkSuccessor(h);
????????????}
????????????// 如果頭節(jié)點(diǎn)對(duì)應(yīng)的線程是空狀態(tài),則設(shè)置“尾節(jié)點(diǎn)對(duì)應(yīng)的線程所擁有的共享鎖”為其它線程獲取鎖的空狀態(tài)。
????????????else?if?(ws == 0?&&
?????????????????????!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
????????????????continue; ???????????????// loop on failed CAS
????????}
????????// 如果頭節(jié)點(diǎn)發(fā)生變化,則繼續(xù)循環(huán)。否則,退出循環(huán)。
????????if?(h == head) ??????????????????// loop if head changed
????????????break;
????}
}
該方法主要是喚醒后繼節(jié)點(diǎn)。對(duì)于能夠支持多個(gè)線程同時(shí)訪問(wèn)的并發(fā)組件(比如Semaphore),它和獨(dú)占式主要區(qū)別在于tryReleaseShared(int arg)方法必須確保同步狀態(tài)(或者資源數(shù))線程安全釋放,一般是通過(guò)循環(huán)和CAS來(lái)保證的,因?yàn)獒尫磐綘顟B(tài)的操作會(huì)同時(shí)來(lái)自多個(gè)線程。
5. 獨(dú)占超時(shí)獲得鎖
doAcquireNanos方法代碼如下:
private boolean doAcquireNanos(int arg, long nanosTimeout)
????????throws InterruptedException {
????if?(nanosTimeout <= 0L)
????????return?false;
????//計(jì)算出超時(shí)時(shí)間點(diǎn)
????final?long?deadline = System.nanoTime() + nanosTimeout;
????final?Node node = addWaiter(Node.EXCLUSIVE);
????boolean?failed = true;
????try?{
????????for?(;;) {
????????????final?Node p = node.predecessor();
????????????if?(p == head && tryAcquire(arg)) {
????????????????setHead(node);
????????????????p.next = null; // help GC
????????????????failed = false;
????????????????return?true;
????????????}
????????????//計(jì)算剩余超時(shí)時(shí)間,超時(shí)時(shí)間點(diǎn)deadline減去當(dāng)前時(shí)間點(diǎn)System.nanoTime()得到還應(yīng)該睡眠的時(shí)間
????????????nanosTimeout = deadline - System.nanoTime();
????????????//如果超時(shí),返回false,獲取鎖失敗
????????????if?(nanosTimeout <= 0L)
????????????????return?false;
????????????//判斷是否需要阻塞當(dāng)前線程
????????????//如果需要,在判斷當(dāng)前剩余納秒數(shù)是否大于1000
????????????if?(shouldParkAfterFailedAcquire(p, node) &&
????????????????nanosTimeout > spinForTimeoutThreshold)
????????????????//阻塞 nanosTimeout納秒數(shù)
????????????????LockSupport.parkNanos(this, nanosTimeout);
????????????if?(Thread.interrupted())
????????????????throw?new?InterruptedException();
????????}
????} finally?{
????????if?(failed)
????????????cancelAcquire(node);
????}
}
該方法在自旋過(guò)程中,當(dāng)節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)時(shí)嘗試獲取同步狀態(tài),如果獲取成功則從該方法返回,這個(gè)過(guò)程和獨(dú)占式同步獲取的過(guò)程類似,但是在同步狀態(tài)獲取失敗的處理上有所不同。如果當(dāng)前線程獲取同步狀態(tài)失敗,則首先重新計(jì)算超時(shí)間隔nanosTimeout,則判斷是否超時(shí)(nanosTimeout小于等于0表示已經(jīng)超時(shí)),如果沒有超時(shí),則使當(dāng)前線程等待nanosTimeout納秒(當(dāng)已到設(shè)置的超時(shí)時(shí)間,該線程會(huì)從LockSupport.parkNanos(Object blocker,long nanos)方法返回)。
如果nanosTimeout小于等于spinForTimeoutThreshold(1000納秒)時(shí),將不會(huì)使該線程進(jìn)行
超時(shí)等待,而是進(jìn)入快速的自旋過(guò)程。原因在于,非常短的超時(shí)等待無(wú)法做到十分精確,如果
這時(shí)再進(jìn)行超時(shí)等待,相反會(huì)讓nanosTimeout的超時(shí)從整體上表現(xiàn)得反而不精確。因此,在超
時(shí)非常短的場(chǎng)景下,同步器會(huì)進(jìn)入無(wú)條件的快速自旋。
總結(jié)
以上是生活随笔為你收集整理的多线程:AQS源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Java面向对象基础整理
- 下一篇: 多线程:AQS的一些心得