日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

aqs java 简书,Java AQS源码解读

發(fā)布時間:2023/12/4 java 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 aqs java 简书,Java AQS源码解读 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1、先聊點別的

說實話,關(guān)于AQS的設計理念、實現(xiàn)、使用,我有打算寫過一篇技術(shù)文章,但是在寫完初稿后,發(fā)現(xiàn)掌握的還是模模糊糊的,模棱兩可。

痛定思痛,腳踏實地重新再來一遍。這次以 Java 8源碼為基礎進行解讀。

2、AQS簡介

在java.util.concurrent.locks包下,有兩個這樣的類:

AbstractQueuedSynchronizer

AbstractQueuedLongSynchronizer

這兩個類的唯一區(qū)別就是:

AbstractQueuedSynchronizer內(nèi)部維護的state變量是int類型

AbstractQueuedLongSynchronizer內(nèi)部維護的state變量是long類型

我們常說的AQS其實泛指的就是這兩個類,即抽象隊列同步器。

抽象隊列同步器AbstractQueuedSynchronizer (以下都簡稱AQS),是用來構(gòu)建鎖或者其他同步組件的骨架類,減少了各功能組件實現(xiàn)的代碼量,也解決了在實現(xiàn)同步器時涉及的大量細節(jié)問題,例如等待線程采用FIFO隊列操作的順序。在不同的同步器中還可以定義一些靈活的標準來判斷某個線程是應該通過還是等待。

AQS采用模板方法模式,在內(nèi)部維護了n多的模板的方法的基礎上,子類只需要實現(xiàn)特定的幾個方法(不是抽象方法!不是抽象方法!不是抽象方法!),就可以實現(xiàn)子類自己的需求。

基于AQS實現(xiàn)的組件,諸如:

ReentrantLock 可重入鎖(支持公平和非公平的方式獲取鎖)

Semaphore 計數(shù)信號量

ReentrantReadWriteLock 讀寫鎖

...

AQS是Doug Lea的大作之一,在維基百科查關(guān)于他的資料時,偶然發(fā)現(xiàn)老爺子喜歡紅色或淡粉色襯衫?

3、AQS設計思路

AQS內(nèi)部維護了一個int成員變量來表示同步狀態(tài),通過內(nèi)置的FIFO(first-in-first-out)同步隊列來控制獲取共享資源的線程。

我們可以猜測出,AQS其實主要做了這么幾件事情:

同步狀態(tài)(state)的維護管理

等待隊列的維護管理

線程的阻塞與喚醒

ps: 當然了,其內(nèi)部還維護了一個ConditionObject 內(nèi)部類,主要用作線程的協(xié)作與通信,我們暫時先不講這個帥哥。

通過AQS內(nèi)部維護的int型的state,可以用于表示任意狀態(tài)!

ReentrantLock用它來表示鎖的持有者線程已經(jīng)重復獲取該鎖的次數(shù),而對于非鎖的持有者線程來說,如果state大于0,意味著無法獲取該鎖,將該線程包裝為Node,加入到同步等待隊列里。

Semaphore用它來表示剩余的許可數(shù)量,當許可數(shù)量為0時,對未獲取到許可但正在努力嘗試獲取許可的線程來說,會進入同步等待隊列,阻塞,直到一些線程釋放掉持有的許可(state+1),然后爭用釋放掉的許可。

FutureTask用它來表示任務的狀態(tài)(未開始、運行中、完成、取消)。

ReentrantReadWriteLock在使用時,稍微有些不同,int型state用二進制表示是32位,前16位(高位)表示為讀鎖,后面的16位(低位)表示為寫鎖。

CountDownLatch使用state表示計數(shù)次數(shù),state大于0,表示需要加入到同步等待隊列并阻塞,直到state等于0,才會逐一喚醒等待隊列里的線程。

3.1 偽代碼之獲取鎖:

boolean acquire() throws InterruptedException {

while(當前狀態(tài)不允許獲取操作) {

if(需要阻塞獲取請求) {

如果當前線程不在隊列中,則將其插入隊列

阻塞當前線程

}

else

返回失敗

}

可能更新同步器的狀態(tài)

如果線程位于隊列中,則將其移出隊列

返回成功

}

3.2 偽代碼之釋放鎖:

void release() {

更新同步器的狀態(tài)

if (新的狀態(tài)允許某個被阻塞的線程獲取成功)

解除隊列中一個或多個線程的阻塞狀態(tài)

}

大概就是闡述這么個思路。

3.3 提供的方法

3.3.1 共通方法

以下三個方法,均為protected final修飾,每個繼承AQS的類都可以調(diào)用這三個方法。

protected final int getState() 獲取同步狀態(tài)

protected final void setState(int newState) 設置同步狀態(tài)

protected final boolean compareAndSetState(int expect, int update) 如果當前狀態(tài)值等于預期值,原子性地將同步狀態(tài)設置為給定的更新值,并返回true;否則返回false

3.3.2 子類需要實現(xiàn)的方法

以下五個方法,在AQS內(nèi)部并未實現(xiàn),而是交由子類去實現(xiàn),然后AQS再調(diào)用子類的實現(xiàn)方法,完成邏輯處理。

protected boolean tryAcquire(int) 嘗試以獨占模式獲取操作,應查詢對象的狀態(tài)是否允許以獨占模式獲取它,如果允許則獲取它。

protected boolean tryRelease(int) 嘗試釋放同步狀態(tài)

protected int tryAcquireShared(int) 共享的方式嘗試獲取操作

protected boolean tryReleaseShared(int) 共享的方式嘗試釋放

protected boolean isHeldExclusively() 調(diào)用此方法的線程,是否是獨占鎖的持有者

子類無須實現(xiàn)上述的所有方法,可以選擇其中一部分進行覆寫,但是要保持實現(xiàn)邏輯完整,不能穿插實現(xiàn)。根據(jù)實現(xiàn)方式不同,分為獨占鎖策略實現(xiàn)和共享鎖策略實現(xiàn)。

這也是為什么上述方法沒有定義為抽象方法的原因。如果定義為抽象方法,子類必須實現(xiàn)所有的五個方法,哪怕你壓根就用不到。

獨占鎖:

ReentrantLock

ReentrantReadWriteLock.WriteLock

實現(xiàn)策略:

tryAcquire(int)

tryRelease(int)

isHeldExclusively()

共享鎖:

CountDownLatch

ReentrantReadWriteLock.ReadLock

Semaphore

實現(xiàn)策略:

tryAcquireShared(int)

tryReleaseShared(int)

AQS還有很多內(nèi)部模板方法,就不一一舉例了,之后的源碼解讀,會展示一部分,并會配上騷氣的注釋。

4、AQS內(nèi)部屬性

4.1 CLH隊列

AQS通過內(nèi)置的FIFO(first-in-first-out)同步隊列來控制獲取共享資源的線程。CLH隊列是FIFO的雙端雙向隊列,AQS的同步機制就是依靠這個CLH隊列完成的。隊列的每個節(jié)點,都有前驅(qū)節(jié)點指針和后繼節(jié)點指針。

頭結(jié)點并不在阻塞隊列內(nèi)!

AQS-Node.jpg

Node源碼:

static final class Node {

// 共享模式下等待標記

static final Node SHARED = new Node();

// 獨占模式下等待標記

static final Node EXCLUSIVE = null;

// 表示當前的線程被取消

static final int CANCELLED = 1;

// 表示當前節(jié)點的后繼節(jié)點包含的線程需要運行,也就是unpark

static final int SIGNAL = -1;

// 表示當前節(jié)點在等待condition,也就是在condition隊列中

static final int CONDITION = -2;

// 表示當前場景下后續(xù)的acquireShared能夠得以執(zhí)行

static final int PROPAGATE = -3;

/**

* CANCELLED = 1 // 當前線程因為超時或者中斷被取消。這是一個終結(jié)態(tài),也就是狀態(tài)到此為止。

* SIGNAL = -1 // 表示當前線程的后繼線程被阻塞或即將被阻塞,當前線程釋放鎖或者取消后需要喚醒后繼線程。這個狀態(tài)一般都是后繼節(jié)點設置前驅(qū)節(jié)點的

* CONDITION = -2 // 表示當前線程在Condition隊列中

* PROPAGATE = -3 // 用于將喚醒后繼線程傳遞下去,這個狀態(tài)的引入是為了完善和增強共享鎖的喚醒機制

* 0 // 表示無狀態(tài)或者終結(jié)狀態(tài)!

*/

volatile int waitStatus;

// 前驅(qū)節(jié)點

volatile Node prev;

// 后繼節(jié)點

volatile Node next;

// 當前節(jié)點的線程,初始化使用,在使用后失效

volatile Thread thread;

// 存儲condition隊列中的后繼節(jié)點

Node nextWaiter;

// 如果該節(jié)點處于共享模式下等待,返回true

final boolean isShared() {

return nextWaiter == SHARED;

}

// 返回當前節(jié)點的前驅(qū)節(jié)點,如果為空,直接拋出空指針異常

final Node predecessor() throws NullPointerException {

Node p = prev;

if (p == null)

throw new NullPointerException();

else

return p;

}

Node() { // Used to establish initial head or SHARED marker

}

// 指定線程和模式的構(gòu)造方法

Node(Thread thread, Node mode) { // Used by addWaiter

// SHARED和EXCLUSIVE 用于表示當前節(jié)點是共享還是獨占

this.nextWaiter = mode;

this.thread = thread;

}

// 指定線程和節(jié)點狀態(tài)的構(gòu)造方法

Node(Thread thread, int waitStatus) { // Used by Condition

this.waitStatus = waitStatus;

this.thread = thread;

}

}

4.2 volatile state

最為重要的屬性,這個整數(shù)可以用于表示任意狀態(tài)!在上面有說過。

4.2 volatile head & volatile tail

head 頭結(jié)點,但是這個頭節(jié)點只是個虛節(jié)點,只是邏輯上代表持有鎖的線程節(jié)點,且head節(jié)點是不存儲thread線程信息和前驅(qū)節(jié)點信息的。

tail 尾節(jié)點,每個新節(jié)點都會進入隊尾。不存儲后繼節(jié)點信息。

這兩個屬性是延遲初始化的,在第一次且第一個線程持有鎖時,第二個線程因為獲取失敗,進入同步隊列時會對head和tail進行初始化,也就是說在所有線程都能獲取到鎖時,其內(nèi)部的head和tail都為null,一旦head 和 tail被初始化后,即使后來沒有線程持有鎖,其內(nèi)部的head 和 tail 依然保留最后一個持有鎖的線程節(jié)點!(head 和 tail都指向一個內(nèi)存地址)

當一個線程獲取鎖失敗而被加入到同步隊列時,會用CAS來設置尾節(jié)點tail為當前線程對應的Node節(jié)點。

AQS內(nèi)部的cas操作,都是依賴Unsafe類的,自Java9之后的版本,Unsafe類被移除,取而代之的是VarHandle類。

這兩個屬性均為volatile所修飾(保證了變量具有有序性和可見性)

4.3 spinForTimeoutThreshold

自旋超時閥值,在doAcquireSharedNanos()等方法中有使用到。

如果用戶定義的等待時間超過這個閥值,那么線程將阻塞,在阻塞期間如果能夠等到喚醒的機會并tryAcquireShared成功,則返回true,否則返回false,超時也返回false。

如果用戶定義的等待時間小于等于這個閥值,則會無限循環(huán),線程不阻塞,直到有線程釋放同步狀態(tài)或者超時,然后返回對應的結(jié)果。

4.4 exclusiveOwnerThread

這是AQS通過繼承AbstractOwnableSynchronizer類,獲得的屬性,表示獨占模式下的同步器持有者。

5、AQS具體實現(xiàn)

5.1 獨占鎖實現(xiàn)思路

5.1.1 獲取鎖 ReentrantLock.lock()

/**

* 獲取獨占鎖,忽略中斷。

* 首先嘗試獲取鎖,如果成功,則返回true;否則會把當前線程包裝成Node插入到隊尾,在隊列中會檢測是否為head的直接后繼,并嘗試獲取鎖,

* 如果獲取失敗,則會通過LockSupport阻塞當前線程,直至被釋放鎖的線程喚醒或者被中斷,隨后再次嘗試獲取鎖,如此反復。被喚醒后繼續(xù)之前的代碼執(zhí)行

*/

public final void acquire(int arg) {

if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

---------------------------------------------------------------------------------------

其中tryAcquire()方法需要由子類實現(xiàn),ReentrantLock通過覆寫這個方法實現(xiàn)了公平鎖和非公平鎖

---------------------------------------------------------------------------------------

/**

* 在同步等待隊列中插入節(jié)點

*/

private Node addWaiter(Node mode) {

Node node = new Node(Thread.currentThread(), mode);

Node pred = tail;

// 判斷尾節(jié)點是否為null

if (pred != null) {

node.prev = pred;

// 通過CAS在隊尾插入當前節(jié)點

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

// tail節(jié)點為null,則將新節(jié)點插入隊尾,必要時進行初始化

enq(node);

return node;

}

/**

* 通過無限循環(huán)和CAS操作在隊列中插入一個節(jié)點成功后返回。

* 將節(jié)點插入隊列,必要時進行初始化

*/

private Node enq(final Node node) {

for (;;) {

Node t = tail;

// 初始化head和tail

if (t == null) {

if (compareAndSetHead(new Node()))

tail = head;

} else {

node.prev = t;

/*

CAS設置tail為node

表面上看是把老tail的next連接到node。

如果同步隊列head節(jié)點和tail節(jié)點剛剛被這個線程初始化,實際上也把head的next也連接到了node,而老tail節(jié)點被node取締。

反之則是,把老tail的next連接到node,head并沒有與node產(chǎn)生連接,這樣就形成了鏈表 head old_tail tail

*/

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}

/**

* 在隊列中的節(jié)點通過此方法獲取鎖,忽略中斷。

* 這個方法很重要,如果上述沒有獲取到鎖,將線程包裝成Node節(jié)點加入到同步隊列的尾節(jié)點,然后看代碼里的注釋

*/

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();

/*

* 檢測當前節(jié)點前驅(qū)是否head,這是試獲取鎖。

* 如果是的話,則調(diào)用tryAcquire嘗試獲取鎖,

* 成功,則將head置為當前節(jié)點。原h(huán)ead節(jié)點的next被置為null等待GC垃圾回收

*/

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

/*

* 如果未成功獲取鎖則根據(jù)前驅(qū)節(jié)點判斷是否要阻塞。

* 如果阻塞過程中被中斷,則置interrupted標志位為true。

* shouldParkAfterFailedAcquire方法在前驅(qū)狀態(tài)不為SIGNAL的情況下都會循環(huán)重試獲取鎖。

* 如果shouldParkAfterFailedAcquire返回true,則會將當前線程阻塞并檢查是否被中斷

*/

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

/**

* 根據(jù)前驅(qū)節(jié)點中的waitStatus來判斷是否需要阻塞當前線程。

*/

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus;

if (ws == Node.SIGNAL)

/*

* 前驅(qū)節(jié)點設置為SIGNAL狀態(tài),在釋放鎖的時候會喚醒后繼節(jié)點,

* 所以后繼節(jié)點(也就是當前節(jié)點)現(xiàn)在可以阻塞自己。

*/

return true;

if (ws > 0) {

/*

* 前驅(qū)節(jié)點狀態(tài)為取消,向前遍歷,更新當前節(jié)點的前驅(qū)為往前第一個非取消節(jié)點。

* 當前線程會之后會再次回到循環(huán)并嘗試獲取鎖。

*/

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

} else {

/**

* 等待狀態(tài)為0或者PROPAGATE(-3),設置前驅(qū)的等待狀態(tài)為SIGNAL,

* 并且之后會回到循環(huán)再次重試獲取鎖。

*/

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

}

return false;

}

/**

* 該方法實現(xiàn)某個node取消獲取鎖。

*/

private void cancelAcquire(Node node) {

if (node == null)

return;

node.thread = null;

// 遍歷并更新節(jié)點前驅(qū),把node的prev指向前部第一個非取消節(jié)點。

Node pred = node.prev;

while (pred.waitStatus > 0)

node.prev = pred = pred.prev;

// 記錄pred節(jié)點的后繼為predNext,后續(xù)CAS會用到。

Node predNext = pred.next;

// 直接把當前節(jié)點的等待狀態(tài)置為取消,后繼節(jié)點調(diào)用cancelAcquire方法時,也可以跨過該節(jié)點

node.waitStatus = Node.CANCELLED;

// 如果當前節(jié)點是尾節(jié)點,則將尾節(jié)點置為當前節(jié)點的前驅(qū)節(jié)點

if (node == tail && compareAndSetTail(node, pred)) {

compareAndSetNext(pred, predNext, null);

} else {

// 如果node還有后繼節(jié)點,這種情況要做的是把pred和后繼非取消節(jié)點拼起來。

int ws;

if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {

Node next = node.next;

/*

* 如果node的后繼節(jié)點next非取消狀態(tài)的話,則用CAS嘗試把pred的后繼置為node的后繼節(jié)點

* 這里if條件為false或者CAS失敗都沒關(guān)系,這說明可能有多個線程在取消,總歸會有一個能成功的。

*/

if (next != null && next.waitStatus <= 0)

compareAndSetNext(pred, predNext, next);

} else {

unparkSuccessor(node);

}

/*

* 在GC層面,和設置為null具有相同的效果

*/

node.next = node;

}

}

獲取獨占鎖的執(zhí)行過程大致如下:

假設當前鎖已經(jīng)被線程A持有,且持有鎖的時間足夠長(方便我們講解,也防止抬杠),線程B、C獲取鎖失敗。

線程B:

1、將線程B包裝成Node節(jié)點(簡稱BN),加入到同步等待隊列,此時BN的waitStatus=0

2、將tail節(jié)點設置為BN,且與head節(jié)點相連,形成鏈表

3、head節(jié)點是個虛擬節(jié)點,也就是持有鎖的線程(但并不包含有線程信息),tail節(jié)點就是BN

4、線程B進入"無限循環(huán)",判斷前驅(qū)節(jié)點是否為頭節(jié)點(true)并再次嘗試獲取鎖(false,獲取鎖失敗)

5、線程B將進入shouldParkAfterFailedAcquire方法,在方法內(nèi)部,將BN的前驅(qū)節(jié)點(也就是頭結(jié)點)的waitStatus設置為 -1,此方法返回false

6、因為是無限循環(huán),所以線程B再次進入shouldParkAfterFailedAcquire方法,由于BN的前驅(qū)節(jié)點(也就是頭結(jié)點)的waitStatus為 -1,所以直接返回true

7、調(diào)用parkAndCheckInterrupt,當前線程B被阻塞,等待喚醒。

線程C:

1、將線程C包裝成Node節(jié)點(簡稱CN),加入到同步等待隊列,此時CN的waitStatus=0

2、將tail節(jié)點設置為CN,且與原tail節(jié)點(BN節(jié)點)相連

3、線程C進入"無限循環(huán)",判斷前驅(qū)節(jié)點是否為頭節(jié)點(false)

4、線程C將進入shouldParkAfterFailedAcquire方法,在方法內(nèi)部,將CN的前驅(qū)節(jié)點(也就是BN結(jié)點)的waitStatus設置為 -1,此方法返回false

5、因為是無限循環(huán),所以線程C再次進入shouldParkAfterFailedAcquire方法,由于CN的前驅(qū)節(jié)點(也就是BN結(jié)點)的waitStatus為 -1,所以直接返回true

6、調(diào)用parkAndCheckInterrupt,線程C被阻塞,等待喚醒。

最終的隊列如下:

+------+ +------+ +------+

| |

| head | | BN | | tail |

| AN | ---> | | ---> | (CN) |

+------+ +------+ +------+

5.1.2 釋放鎖 ReentrantLock.unlock()

對于釋放獨占鎖,會調(diào)用tryRelaes(int)方法,該方法由子類實現(xiàn),在完全釋放掉鎖后,釋放掉鎖的線程會將后繼線程喚醒,后繼線程進行鎖爭用(非公平鎖)

public final boolean release(int arg) {

if (tryRelease(arg)) {

Node h = head;

// 頭結(jié)點不為null且后繼節(jié)點是需要被喚醒的

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

釋放獨占鎖的執(zhí)行過程大致如下(假設有后繼節(jié)點需要喚醒):

將head節(jié)點的waitStatus設置為0

喚醒后繼節(jié)點

后繼節(jié)點線程被喚醒后,會將后繼節(jié)點設置為head,并對后繼節(jié)點內(nèi)的prev和thread屬性設置為null

對原h(huán)ead節(jié)點的next指針設置為null,等待GC回收原h(huán)ead節(jié)點。

+------+ +------+ +------+

| old |

| head | | head | | tail |

| AN | -X-> | BN | ---> | (CN) |

+------+ +------+ +------+

如上所示,AN節(jié)點(原h(huán)ead節(jié)點)等待被GC垃圾回收。

5.2 共享鎖實現(xiàn)思路

5.2.1 獲取鎖

與獲取獨占鎖不同,關(guān)鍵在于,共享鎖可以被多個線程持有。

如果需要AQS實現(xiàn)共享鎖,在實現(xiàn)tryAcquireShared()方法時:

返回負數(shù),表示獲取失敗

返回0,表示獲取成功,但是后繼爭用線程不會成功

返回正數(shù),表示獲取成功,表示后繼爭用線程也可能成功

public final void acquireShared(int arg) {

if (tryAcquireShared(arg) < 0)

doAcquireShared(arg);

}

private void doAcquireShared(int arg) {

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

// 一旦共享獲取成功,設置新的頭結(jié)點,并且喚醒后繼線程

if (r >= 0) {

setHeadAndPropagate(node, r);

p.next = null; // help GC

if (interrupted)

selfInterrupt();

failed = false;

return;

}

}

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

/**

* 這個函數(shù)做的事情有兩件:

* 1. 在獲取共享鎖成功后,設置head節(jié)點

* 2. 根據(jù)調(diào)用tryAcquireShared返回的狀態(tài)以及節(jié)點本身的等待狀態(tài)來判斷是否要需要喚醒后繼線程

*/

private void setHeadAndPropagate(Node node, int propagate) {

// 把當前的head封閉在方法棧上,用以下面的條件檢查

Node h = head;

setHead(node);

/*

* propagate是tryAcquireShared的返回值,這是決定是否傳播喚醒的依據(jù)之一

* h.waitStatus為SIGNAL或者PROPAGATE時也根據(jù)node的下一個節(jié)點共享來決定是否傳播喚醒

*/

if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {

Node s = node.next;

if (s == null || s.isShared())

doReleaseShared();

}

}

/**

* 這是共享鎖中的核心喚醒函數(shù),主要做的事情就是喚醒下一個線程或者設置傳播狀態(tài)。

* 后繼線程被喚醒后,會嘗試獲取共享鎖,如果成功之后,則又會調(diào)用setHeadAndPropagate,將喚醒傳播下去。

* 這個函數(shù)的作用是保障在acquire和release存在競爭的情況下,保證隊列中處于等待狀態(tài)的節(jié)點能夠有辦法被喚醒。

*/

private void doReleaseShared() {

/*

* 以下的循環(huán)做的事情就是,在隊列存在后繼線程的情況下,喚醒后繼線程;

* 或者由于多線程同時釋放共享鎖由于處在中間過程,讀到head節(jié)點等待狀態(tài)為0的情況下,

* 雖然不能unparkSuccessor,但為了保證喚醒能夠正確穩(wěn)固傳遞下去,設置節(jié)點狀態(tài)為PROPAGATE。

* 這樣的話獲取鎖的線程在執(zhí)行setHeadAndPropagate時可以讀到PROPAGATE,從而由獲取鎖的線程去釋放后繼等待線程。

*/

for (;;) {

Node h = head;

// 如果隊列中存在后繼線程。

if (h != null && h != tail) {

int ws = h.waitStatus;

if (ws == Node.SIGNAL) {

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue;

unparkSuccessor(h);

}

// 如果h節(jié)點的狀態(tài)為0,需要設置為PROPAGATE用以保證喚醒的傳播。

else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue;

}

// 檢查h是否仍然是head,如果不是的話需要再進行循環(huán)。

if (h == head)

break;

}

}

5.2.1 釋放鎖

釋放共享鎖與獲取共享鎖的代碼都使用了doReleaseShared(int)

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

// doReleaseShared的實現(xiàn)上面獲取共享鎖已經(jīng)介紹

doReleaseShared();

return true;

}

return false;

}

我覺得大家應該都能看懂,還是簡單說一下吧(手動狗頭~):

同步等待隊列中,在喚醒因為獲取共享鎖失敗而阻塞的后繼節(jié)點線程后,后繼節(jié)點線程會依次喚醒其后繼節(jié)點!依次類推。

再換種說法?

這種情況有可能是:寫鎖導致獲取讀鎖的一些線程阻塞,而寫鎖釋放后,會喚醒后繼節(jié)點線程,如果該后繼節(jié)點,恰好是因為獲取讀鎖失敗而阻塞的線程,那么該后繼節(jié)點線程會喚醒其后繼節(jié)點...直到全部獲取讀鎖成功,或者某一節(jié)點獲取寫鎖成功。

6、拓展

6.1 不得不說的PROPAGATE

在共享鎖獲取與釋放的操作中,我覺得有個特別的重要的waitStatus狀態(tài)值,要和大家說一說,就是PROPAGATE,這個屬性值的意思是,用于將喚醒后繼線程傳遞下去,這個狀態(tài)的引入是為了完善和增強共享鎖的喚醒機制。

之前翻閱了很多關(guān)于AQS的文章,講到這個狀態(tài)值的少之又少,哪怕是《Java并發(fā)編程實戰(zhàn)》這本書,也是沒有提及,最終我看到有一位博客園的作者非常詳實的闡述了這個PEOPAGATE狀態(tài),也是給了我很大的啟發(fā)。

沒錯,我第一次看AQS的源碼的時候,甚至直接把這個PROPAGATE狀態(tài)值忽略掉了。事實上,不僅僅閱讀源碼的人,容易把這個PROPAGATE狀態(tài)值忽略掉,哪怕是Doug Lea老爺子本人,在開發(fā)時也沒有意識到,如果沒有這個狀態(tài)值會導致什么樣的后果,直到上面鏈接的bug出現(xiàn)后,老爺子才加上了這個狀態(tài),徹底修復了這個bug。

復現(xiàn)該bug的代碼:

import java.util.concurrent.Semaphore;

public class TestSemaphore {

private static Semaphore sem = new Semaphore(0);

private static class Thread1 extends Thread {

@Override

public void run() {

sem.acquireUninterruptibly();

}

}

private static class Thread2 extends Thread {

@Override

public void run() {

sem.release();

}

}

public static void main(String[] args) throws InterruptedException {

for (int i = 0; i < 10000000; i++) {

Thread t1 = new Thread1();

Thread t2 = new Thread1();

Thread t3 = new Thread2();

Thread t4 = new Thread2();

t1.start();

t2.start();

t3.start();

t4.start();

t1.join();

t2.join();

t3.join();

t4.join();

System.out.println(i);

}

}

}

程序執(zhí)行時,會偶發(fā)線程hang住。

我們再來看看之前的setHeadAndPropagate方法是什么樣的。

private void setHeadAndPropagate(Node node, int propagate) {

setHead(node);

if (propagate > 0 && node.waitStatus != 0) {

Node s = node.next;

if (s == null || s.isShared())

unparkSuccessor(node);

}

}

然后Semaphore.release()調(diào)用的是AQS的releaseShared,看看當時的releaseShared長什么樣:

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

Node h = head;

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

再看看當時的Node:

static final class Node {

// 忽略掉無關(guān)的代碼,只展示waitStatus的狀態(tài)值

static final int CANCELLED = 1;

static final int SIGNAL = -1;

static final int CONDITION = -2;

}

setHeadAndPropagate方法和releaseShared方法,設計的也是很簡單。

當時源碼里,Node的waitStatus是沒有PROPAGATE=-3這個狀態(tài)值的。

為了方便大家對照,我把當時unparkSuccessor方法的源碼,也一并展示出來:

private void unparkSuccessor(Node node) {

// 將node的waitStatus設置為0

compareAndSetWaitStatus(node, Node.SIGNAL, 0);

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);

}

接下來,我們慢慢聊~

ps: 說真的,現(xiàn)在老板的位置離我的位置不遠,雖然我的工作已經(jīng)提前很多天完成了,但,還是有點慌~,冒著風險還要繼續(xù)寫!

在AQS獲取共享鎖的操作中,進入同步等待的線程(被阻塞掉),有兩種途徑可以被喚醒:

其他線程釋放信號量后,調(diào)用unparkSuccessor(releaseShared方法中)

其他線程獲取共享鎖成功后,會通過傳播機制來喚醒后繼節(jié)點(也就是在setHeadAndPropagate方法中)。

bug重現(xiàn)的例子,很簡單,就是在循環(huán)中重復不斷的實例化4個線程,前兩個線程獲取信號量,兩個線程釋放信號量,主線程等待4個線程全都執(zhí)行完畢再執(zhí)行打印。

在后兩個線程沒有進行釋放信號量的操作時,AQS內(nèi)部的同步等待隊列是下面這種情況:

+------+ +------+ +------+

| |

| head | | t1 | | t2 |

| | ---> | | ---> | |

+------+ +------+ +------+

1、t3釋放信號量,調(diào)用releaseShared,喚醒后繼節(jié)點里的線程t1,同時,head的waitStatus變?yōu)?

2、t1被喚醒,調(diào)用Semaphore.NonfairSync的tryAcquireShared方法,返回0

3、t4釋放信號量,調(diào)用releaseShared,在releaseShared方法中讀到的head還是原h(huán)ead,但是此時head的waitStatus已經(jīng)變?yōu)?,所以不會調(diào)用unparkSuccessor方法

4、t1被喚醒了,由于在步驟2里,調(diào)用Semaphore.NonfairSync的tryAcquireShared方法,返回的是0,所以它也不會調(diào)用unparkSuccessor方法

至此,兩種途徑全部被封死,沒有任何線程去喚醒t2了,線程被hang住...

ps:Doug Lea 黑人問號臉,哈哈~

老爺子為了修復這個bug,做出了如下改進:

1、增加一個waitStatus的狀態(tài),即PROPAGATE

2、在releaseShared方法中抽取提煉出了doReleaseShared()(上面有展示)在doReleaseShared方法中,如果head節(jié)點的狀態(tài)為0,需要設置為PROPAGATE用以保證喚醒的傳播。

3、在setHeadAndPropagate方法中也多了一些判斷,其中就有head節(jié)點的waitStatus如果小于0,就喚醒后繼節(jié)點(PROPAGATE = -3)。

通過改進之后的代碼,我們再來復盤一下:

1、t3釋放信號量,調(diào)用releaseShared,喚醒后繼節(jié)點里的線程t1,同時,head的waitStatus變?yōu)?

2、t1被喚醒,調(diào)用Semaphore.NonfairSync的tryAcquireShared方法,返回0

3、此步驟和2和同一時刻發(fā)生,t4釋放信號量,調(diào)用releaseShared,在doReleaseShared方法中讀到的head還是原h(huán)ead,但是此時head的waitStatus已經(jīng)變?yōu)?,將head的waitStatus設置為PROPAGATE(-3)

4、t1被喚醒了,調(diào)用setHeadAndPropagate方法,將t1設置為head,符合條件判斷,進入分支語句,調(diào)用doReleaseShared方法,繼而喚醒t2節(jié)點線程。

6.2 unparkSuccessor的一點思考

private void unparkSuccessor(Node node) {

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

/*

* 通常情況下,要喚醒的線程都是當前節(jié)點的后繼線程

* 但是,如果當前節(jié)點的后繼節(jié)點被取消了,則從隊列尾部向前遍歷,直到找到未被取消的后繼節(jié)點

*/

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);

}

unparkSuccessor方法中,如果當前節(jié)點的后繼節(jié)點被取消了,則從隊列尾部向前遍歷,直到找到未被取消的后繼節(jié)點。

這個問題,大家也可以自己思考一下,為什么要從tail節(jié)點開始向前遍歷?

假設,CLH隊列如下圖所示:

+------+ +------+ +------+

| |

| head | | t1 | | tail |

| | ---> | | ---> | |

+------+ +------+ +------+

t1.waitStatus = 1 且 tail.waitStatus = 1

head嘗試喚醒后繼節(jié)點t1,發(fā)現(xiàn)t1是被取消狀態(tài),遂找出t1的后繼節(jié)點tail,發(fā)現(xiàn)tail也是被取消狀態(tài),但是tail.next == null。

與此同時,有個新節(jié)點加入到隊列尾部,但是還沒有將原tail.next指向新節(jié)點。

也就是說,tail.next 如果恰好處在步驟1和步驟2中間的話,遍歷就會中斷。

摘錄addWaiter部分代碼:

node.prev = pred;

// 通過CAS在隊尾插入當前節(jié)點

if (compareAndSetTail(pred, node)) { // 步驟1

pred.next = node; // 步驟2

return node;

}

6.3 acquireQueued 方法里,為什么還要再tryAcquire?

以獨占模式來說,對于這個問題,我是這么想的:

時刻1:線程B嘗試獲取鎖,但是,由于鎖被線程A持有,所以,線程B準備調(diào)用addWaiter,將自己入到隊列(但還沒有和head節(jié)點產(chǎn)生指針連接)

時刻1:同一時刻,線程A嘗試釋放鎖,進入release方法,調(diào)用子類的tryRelease(),將代表鎖持有次數(shù)的state置為0(代表鎖沒有被任何線程持有),進入unparkSuccessor方法,發(fā)現(xiàn)并沒有后繼節(jié)點(因為新節(jié)點還未入隊),所以不會喚醒任何線程,到這里,線程A釋放鎖操作完成。

時刻2:線程B調(diào)用addWaiter方法完畢,已經(jīng)入隊,并和head節(jié)點產(chǎn)生指針連接

時刻3:線程B調(diào)用acquireQueued方法(如下方代碼展示),如果在這個方法里面不調(diào)用tryAcquire,就會發(fā)生這樣的情況:明明可以獲取鎖,但是線程卻被休眠了,進而導致整個同步隊列不可用

所以,再次調(diào)用tryAcquire是為了防止新節(jié)點還未入隊,但是頭結(jié)點已經(jīng)釋放了鎖,導致整個同步隊列癱瘓的情況發(fā)生。

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();

// 防止新節(jié)點還未入隊,但是頭結(jié)點已經(jīng)釋放了鎖,導致整個同步隊列中斷癱瘓的情況發(fā)生

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

結(jié)束

通過閱讀AQS的源碼,對于我們學習和掌握基于AQS實現(xiàn)的組件,是有很大幫助的。

尤其是它的設計理念和思想,更是我們學習的重點!

總結(jié)

以上是生活随笔為你收集整理的aqs java 简书,Java AQS源码解读的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。