aqs java 简书,Java并发之AQS原理
一.總體框架
AQS是指AbstractQueuedSynchronizer。它是一個抽象類,java并發(fā)包里的ReentrantLock、CountDownLatch和Semaphroe等重要的工具類都是基于AQS來實現(xiàn)的。
總體來說,AQS維護了一個volatile的state變量代表共享資源,還有一個FIFO的等待隊列,在多線程爭奪資源被阻塞時會進入此隊列了。等待隊列是個雙向鏈表記錄則沒有獲取的執(zhí)行許可的線程。等待隊列中的結(jié)點元素是AQS自定義的static的內(nèi)部類Node。AQS支持共享和獨占兩種模式。ReentrantLock就是獨占型的,只有一個線程可以獲得到鎖并執(zhí)行。CountDownLatch和Semaphore就是共享型,允許多個線程同時執(zhí)行。
AQS是一個抽象類,并不能被直接實例化使用。它的作用是提供等待隊列的管理,包括如何入隊何時喚醒等。而具體的資源如何獲取和釋放等由具體的自定義同步器來實現(xiàn)。也就是說ReentrantLock等類自定義了資源(state)的獲取和釋放,而使用AQS的來管理阻塞隊列。不同的自定義資源獲取方式實現(xiàn)了CountDownLatch和Semaphore等類。
自定義同步方法需要實現(xiàn)的方法有:
isHeldExclusively() //返回該線程是否正在獨占資源,只有用的condition才需要去實現(xiàn)它
tryAcquire(int); //獨占方式,嘗試獲取資源,成功返回true,失敗返回false
tryRelease(int); //獨占方式,嘗試釋放資源,成功返回true,失敗返回false
tryAcquireShared(int); //共享方法,嘗試獲取資源。返回負數(shù)表示失敗,0表示成功,但沒有可用資源了,正數(shù)表示成功且有剩余資源
tryReleaseShared(int);//共享方式。嘗試釋放資源。如果釋放后運行喚醒后續(xù)結(jié)點返回true,否則返回false
這其中tryAcquire和tryRelease是一組,用于實現(xiàn)獨占資源的情況,如ReentrantLock;tryAcquireShared和tryReleaseShared是一組用于實現(xiàn)共享資源的情況,如CountDownLatch。
二.源碼分析
2.1 acquire方法源碼詳解
在AQS中一個重要的方法是acquire(int),這個方法實現(xiàn)請求資源和阻塞線程的功能。下面先貼一下它的源碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這個方法里只有一個if語句。首先執(zhí)行tryAcquire(int)方法,前面說了這個方法需要子類來自定義,這里先看一下它的代碼:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
可以看到在AQS中tryAcquire方法直接拋出了異常。因為具體的獲取資源細節(jié)需要子類根據(jù)自己要實現(xiàn)的功能來寫,AQS只負責(zé)阻塞隊列的管理等工作。同時注意到這個方法并不是一個抽象的方法。其實前面說的需要子類實現(xiàn)的5個方法都不是抽象的,因為子類并不一定需要實現(xiàn)所有這些方法,這提供了一定的靈活性。
2.1.1 addWaiter方法詳解
先忙接著看acquire方法。在if語句里,如果tryAcquire返回true,那么acquire就返回了,說明成功獲取到了資源。如果tryAcquire返回false,if語句的前半句判斷就成立了,需要繼續(xù)執(zhí)行&&右邊的acquireQueued方法,執(zhí)行它之前先執(zhí)行了addWaiter。先看一下addWaiter它的代碼:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
可以看到這個方法有一個Node參數(shù)。Node類便是aqs維護的FIFO隊列中的元素的類型。回顧一下acquire()方法的代碼,是將Node.EXCLUSIVE作為參數(shù)傳入了addWaiter。查看Node源碼發(fā)現(xiàn)有這么一句:
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
原來這是一個null值,用來表示獨占性線程。不管如何,先繼續(xù)看addWaiter的源碼吧。
第一句代碼:Node node = new Node(Thread.currentThread,mode);新建了一個表示當(dāng)前線程的結(jié)點。剛才傳入的null作為模式傳給構(gòu)造方法。進入對象構(gòu)造方法查看:
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
繼續(xù)看addWaiter的后續(xù)代碼,發(fā)現(xiàn)是獲取了當(dāng)前隊列的尾節(jié)點,并將新建結(jié)點的prev指針執(zhí)行尾節(jié)點,再使用cas嘗試替換尾節(jié)點,如果成功,那么當(dāng)前結(jié)點就成為新的尾節(jié)點,返回。
如果cas失敗或者當(dāng)前tail為null,調(diào)用eng方法處理。下面看一下eng的代碼:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
熟悉AtomicInteger的朋友看到這段代碼一定會感動非常熟悉。這里就是使用了循環(huán)嘗試的方式來進行cas操作,指導(dǎo)成功為止。另外當(dāng)tail==null時,先新建head結(jié)點再進行操作,當(dāng)前這里給head變量反之也是使用了cas操作。
綜上,addWaiter()進行的操作就是安全地更新隊列的tail指針。
2.1.2 acquireQueue方法詳解
下面繼續(xù)看acquire()方法。再把代碼貼一次。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
給acqureQueued方法傳入的第一個參數(shù)是addWaiter方法的返回值,回想一下剛才的addWaiter方法,發(fā)現(xiàn)它的返回值是新創(chuàng)建的表示當(dāng)前線程的Node結(jié)點。acquireQueued方法的另一個參數(shù)是acquire的形參arg,這個一般是獲取資源的個數(shù),像ReentrantLock的lock方法就是調(diào)用了acquire(1)。下面看一下acquireQueued方法的源碼吧:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這個方法的主體是一個死循環(huán),不斷測試兩件事:1.是否是頭結(jié)點的下一個節(jié)點,說明該輪到自己獲取資源了。2:是否可以休息了。判斷1成功后就用tryAcquire獲取資源,成功后設(shè)置當(dāng)前結(jié)點為頭結(jié)點,返回。如果1判斷不成功則執(zhí)行shouldParkAfterFailedAcquire方法,先貼一下代碼:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
這個方法的工作是找到當(dāng)前結(jié)點之前的一個未取消的結(jié)點,將其waitStatue改為SIGNAL(-1)。這樣在該結(jié)點釋放資源時就會喚醒當(dāng)前結(jié)點。
當(dāng)shouldParkAfterFailedAcquire返回true之后,當(dāng)前線程就可以去休息了——調(diào)用parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
這個方法使用了LockSupport的park方法,使線程進入waiting狀態(tài)。當(dāng)其它線程調(diào)用unPark方法,或此線程被中斷后才會返回。
2.1.3小結(jié)
下面來總結(jié)一下acquire方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
先嘗試獲取資源,獲取到的情況直接返回。獲取不到將線程加入隊列:首先將tail指向表示當(dāng)前線程的結(jié)點,使用CAS操作更新tail。之后執(zhí)行acquireQueued方法,如果是當(dāng)前隊列的第二個則再次嘗試獲取tryAcquire,成功后將自己設(shè)置為head(head表示已經(jīng)獲取到的資源的結(jié)點)。不能獲取資源時判斷是否可以park(),判斷依據(jù)是其prev的結(jié)點的waitState是否是signal,即是否會在釋放資源時通知它。之后當(dāng)前線程調(diào)用park進入waiting狀態(tài)。waitting結(jié)束時返回是否中斷標(biāo)志,并重置標(biāo)志。回到acquire,如果waitting期間中斷過,則調(diào)用selfInterrupt響應(yīng)中斷。
2.2 release(int)
此方法是獨占模式下釋放資源的頂層方法。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
這里可以看出釋放資源成功時,獲取到head結(jié)點(因為head結(jié)點表示的線程就是當(dāng)前獲取到資源的線程),執(zhí)行unparkSuccessor()操作。這里便和shouldParkAfterFailedAcquire中‘休息’的代碼相呼應(yīng)。如果那里設(shè)置了waitStatus為signal就會使用LockSupport.unpark方法來喚醒等待的線程。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 如果后繼結(jié)點為null或等待狀態(tài)>0(當(dāng)前結(jié)點被取消),則從后往前找到正在應(yīng)該被喚醒的結(jié)點
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
對了,tryRelease方法也是具體的同步器來實現(xiàn)的。
2.3 其它方法
acquireShared(int)和releaseShared()方法是共享模式下獲取資源和釋放資源的方法。這里不再詳細展開了,請看參考資料里的文章。
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的aqs java 简书,Java并发之AQS原理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 4 条生产线受损,苹果数据线供应商 Fo
- 下一篇: java工程师去字节飞书可以,字节跳动飞