日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

aqs java 简书,Java并发之AQS原理

發(fā)布時間:2023/12/15 java 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 aqs java 简书,Java并发之AQS原理 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一.總體框架

AQS是指AbstractQueuedSynchronizer。它是一個抽象類,java并發(fā)包里的ReentrantLock、CountDownLatch和Semaphroe等重要的工具類都是基于AQS來實現(xiàn)的。

總體來說,AQS維護了一個volatile的state變量代表共享資源,還有一個FIFO的等待隊列,在多線程爭奪資源被阻塞時會進入此隊列了。等待隊列是個雙向鏈表記錄則沒有獲取的執(zhí)行許可的線程。等待隊列中的結(jié)點元素是AQS自定義的static的內(nèi)部類Node。AQS支持共享和獨占兩種模式。ReentrantLock就是獨占型的,只有一個線程可以獲得到鎖并執(zhí)行。CountDownLatch和Semaphore就是共享型,允許多個線程同時執(zhí)行。

AQS是一個抽象類,并不能被直接實例化使用。它的作用是提供等待隊列的管理,包括如何入隊何時喚醒等。而具體的資源如何獲取和釋放等由具體的自定義同步器來實現(xiàn)。也就是說ReentrantLock等類自定義了資源(state)的獲取和釋放,而使用AQS的來管理阻塞隊列。不同的自定義資源獲取方式實現(xiàn)了CountDownLatch和Semaphore等類。

自定義同步方法需要實現(xiàn)的方法有:

isHeldExclusively() //返回該線程是否正在獨占資源,只有用的condition才需要去實現(xiàn)它

tryAcquire(int); //獨占方式,嘗試獲取資源,成功返回true,失敗返回false

tryRelease(int); //獨占方式,嘗試釋放資源,成功返回true,失敗返回false

tryAcquireShared(int); //共享方法,嘗試獲取資源。返回負數(shù)表示失敗,0表示成功,但沒有可用資源了,正數(shù)表示成功且有剩余資源

tryReleaseShared(int);//共享方式。嘗試釋放資源。如果釋放后運行喚醒后續(xù)結(jié)點返回true,否則返回false

這其中tryAcquire和tryRelease是一組,用于實現(xiàn)獨占資源的情況,如ReentrantLock;tryAcquireShared和tryReleaseShared是一組用于實現(xiàn)共享資源的情況,如CountDownLatch。

二.源碼分析

2.1 acquire方法源碼詳解

在AQS中一個重要的方法是acquire(int),這個方法實現(xiàn)請求資源和阻塞線程的功能。下面先貼一下它的源碼:

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

這個方法里只有一個if語句。首先執(zhí)行tryAcquire(int)方法,前面說了這個方法需要子類來自定義,這里先看一下它的代碼:

protected boolean tryAcquire(int arg) {

throw new UnsupportedOperationException();

}

可以看到在AQS中tryAcquire方法直接拋出了異常。因為具體的獲取資源細節(jié)需要子類根據(jù)自己要實現(xiàn)的功能來寫,AQS只負責(zé)阻塞隊列的管理等工作。同時注意到這個方法并不是一個抽象的方法。其實前面說的需要子類實現(xiàn)的5個方法都不是抽象的,因為子類并不一定需要實現(xiàn)所有這些方法,這提供了一定的靈活性。

2.1.1 addWaiter方法詳解

先忙接著看acquire方法。在if語句里,如果tryAcquire返回true,那么acquire就返回了,說明成功獲取到了資源。如果tryAcquire返回false,if語句的前半句判斷就成立了,需要繼續(xù)執(zhí)行&&右邊的acquireQueued方法,執(zhí)行它之前先執(zhí)行了addWaiter。先看一下addWaiter它的代碼:

private Node addWaiter(Node mode) {

Node node = new Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;

if (pred != null) {

node.prev = pred;

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

enq(node);

return node;

}

可以看到這個方法有一個Node參數(shù)。Node類便是aqs維護的FIFO隊列中的元素的類型。回顧一下acquire()方法的代碼,是將Node.EXCLUSIVE作為參數(shù)傳入了addWaiter。查看Node源碼發(fā)現(xiàn)有這么一句:

/** Marker to indicate a node is waiting in exclusive mode */

static final Node EXCLUSIVE = null;

原來這是一個null值,用來表示獨占性線程。不管如何,先繼續(xù)看addWaiter的源碼吧。

第一句代碼:Node node = new Node(Thread.currentThread,mode);新建了一個表示當(dāng)前線程的結(jié)點。剛才傳入的null作為模式傳給構(gòu)造方法。進入對象構(gòu)造方法查看:

Node(Thread thread, Node mode) { // Used by addWaiter

this.nextWaiter = mode;

this.thread = thread;

}

繼續(xù)看addWaiter的后續(xù)代碼,發(fā)現(xiàn)是獲取了當(dāng)前隊列的尾節(jié)點,并將新建結(jié)點的prev指針執(zhí)行尾節(jié)點,再使用cas嘗試替換尾節(jié)點,如果成功,那么當(dāng)前結(jié)點就成為新的尾節(jié)點,返回。

如果cas失敗或者當(dāng)前tail為null,調(diào)用eng方法處理。下面看一下eng的代碼:

private Node enq(final Node node) {

for (;;) {

Node t = tail;

if (t == null) { // Must initialize

if (compareAndSetHead(new Node()))

tail = head;

} else {

node.prev = t;

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}

熟悉AtomicInteger的朋友看到這段代碼一定會感動非常熟悉。這里就是使用了循環(huán)嘗試的方式來進行cas操作,指導(dǎo)成功為止。另外當(dāng)tail==null時,先新建head結(jié)點再進行操作,當(dāng)前這里給head變量反之也是使用了cas操作。

綜上,addWaiter()進行的操作就是安全地更新隊列的tail指針。

2.1.2 acquireQueue方法詳解

下面繼續(xù)看acquire()方法。再把代碼貼一次。

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

給acqureQueued方法傳入的第一個參數(shù)是addWaiter方法的返回值,回想一下剛才的addWaiter方法,發(fā)現(xiàn)它的返回值是新創(chuàng)建的表示當(dāng)前線程的Node結(jié)點。acquireQueued方法的另一個參數(shù)是acquire的形參arg,這個一般是獲取資源的個數(shù),像ReentrantLock的lock方法就是調(diào)用了acquire(1)。下面看一下acquireQueued方法的源碼吧:

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

這個方法的主體是一個死循環(huán),不斷測試兩件事:1.是否是頭結(jié)點的下一個節(jié)點,說明該輪到自己獲取資源了。2:是否可以休息了。判斷1成功后就用tryAcquire獲取資源,成功后設(shè)置當(dāng)前結(jié)點為頭結(jié)點,返回。如果1判斷不成功則執(zhí)行shouldParkAfterFailedAcquire方法,先貼一下代碼:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus;

if (ws == Node.SIGNAL)

/*

* This node has already set status asking a release

* to signal it, so it can safely park.

*/

return true;

if (ws > 0) {

/*

* Predecessor was cancelled. Skip over predecessors and

* indicate retry.

*/

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

} else {

/*

* waitStatus must be 0 or PROPAGATE. Indicate that we

* need a signal, but don't park yet. Caller will need to

* retry to make sure it cannot acquire before parking.

*/

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

}

return false;

}

這個方法的工作是找到當(dāng)前結(jié)點之前的一個未取消的結(jié)點,將其waitStatue改為SIGNAL(-1)。這樣在該結(jié)點釋放資源時就會喚醒當(dāng)前結(jié)點。

當(dāng)shouldParkAfterFailedAcquire返回true之后,當(dāng)前線程就可以去休息了——調(diào)用parkAndCheckInterrupt方法:

private final boolean parkAndCheckInterrupt() {

LockSupport.park(this);

return Thread.interrupted();

}

這個方法使用了LockSupport的park方法,使線程進入waiting狀態(tài)。當(dāng)其它線程調(diào)用unPark方法,或此線程被中斷后才會返回。

2.1.3小結(jié)

下面來總結(jié)一下acquire方法。

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

先嘗試獲取資源,獲取到的情況直接返回。獲取不到將線程加入隊列:首先將tail指向表示當(dāng)前線程的結(jié)點,使用CAS操作更新tail。之后執(zhí)行acquireQueued方法,如果是當(dāng)前隊列的第二個則再次嘗試獲取tryAcquire,成功后將自己設(shè)置為head(head表示已經(jīng)獲取到的資源的結(jié)點)。不能獲取資源時判斷是否可以park(),判斷依據(jù)是其prev的結(jié)點的waitState是否是signal,即是否會在釋放資源時通知它。之后當(dāng)前線程調(diào)用park進入waiting狀態(tài)。waitting結(jié)束時返回是否中斷標(biāo)志,并重置標(biāo)志。回到acquire,如果waitting期間中斷過,則調(diào)用selfInterrupt響應(yīng)中斷。

2.2 release(int)

此方法是獨占模式下釋放資源的頂層方法。

public final boolean release(int arg) {

if (tryRelease(arg)) {

Node h = head;

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

這里可以看出釋放資源成功時,獲取到head結(jié)點(因為head結(jié)點表示的線程就是當(dāng)前獲取到資源的線程),執(zhí)行unparkSuccessor()操作。這里便和shouldParkAfterFailedAcquire中‘休息’的代碼相呼應(yīng)。如果那里設(shè)置了waitStatus為signal就會使用LockSupport.unpark方法來喚醒等待的線程。

private void unparkSuccessor(Node node) {

/*

* If status is negative (i.e., possibly needing signal) try

* to clear in anticipation of signalling. It is OK if this

* fails or if status is changed by waiting thread.

*/

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

/*

* 如果后繼結(jié)點為null或等待狀態(tài)>0(當(dāng)前結(jié)點被取消),則從后往前找到正在應(yīng)該被喚醒的結(jié)點

*/

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);

}

對了,tryRelease方法也是具體的同步器來實現(xiàn)的。

2.3 其它方法

acquireShared(int)和releaseShared()方法是共享模式下獲取資源和釋放資源的方法。這里不再詳細展開了,請看參考資料里的文章。

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎

總結(jié)

以上是生活随笔為你收集整理的aqs java 简书,Java并发之AQS原理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。