Java并发编程—AQS原理分析
目錄
一、AQS原理簡述
二、自定義獨(dú)占鎖及共享鎖
三、鎖的可重入性
四、鎖的公平性
五、驚群效應(yīng)
AQS全稱AbstractQueuedSynchronizer,它是實(shí)現(xiàn)?JCU包中幾乎所有的鎖、多線程并發(fā)以及線程同步器等重要組件的基石, 其核心思想是基于volatile int state這樣的一個(gè)屬性同時(shí)配合Unsafe工具對(duì)其原子性的操作來實(shí)現(xiàn)對(duì)當(dāng)前鎖的狀態(tài)進(jìn)行修改 。
一、AQS原理簡述
AQS內(nèi)部維護(hù)著一個(gè)FIFO的CLH隊(duì)列(無鎖隊(duì)列:Concurrent Lock-free FIFO),該隊(duì)列的基本結(jié)構(gòu)如下:
1.1、Node節(jié)點(diǎn)
AQS中使用Node來表示CLH隊(duì)列的每個(gè)節(jié)點(diǎn),源碼如下:
static final class Node {//表示共享模式(共享鎖)static final Node SHARED = new Node();//表示獨(dú)占模式(獨(dú)占鎖)static final Node EXCLUSIVE = null;//表示線程已取消static final int CANCELLED = 1;//表示當(dāng)前結(jié)點(diǎn)的后繼節(jié)點(diǎn)需要被喚醒static final int SIGNAL = -1;//線程(處在Condition休眠狀態(tài))在等待Condition喚醒static final int CONDITION = -2;//表示鎖的下一次獲取可以無條件傳播,在共享模式頭結(jié)點(diǎn)有可能處于這種狀態(tài)static final int PROPAGATE = -3;//線程等待狀態(tài)volatile int waitStatus;//當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)volatile Node prev;//當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)volatile Node next;//當(dāng)前節(jié)點(diǎn)所代表的的線程volatile Thread thread;//可以理解為當(dāng)前是獨(dú)占模式還是共享模式Node nextWaiter;//如果節(jié)點(diǎn)在共享模式下等待,則返回true。final boolean isShared() {return nextWaiter == SHARED;}//獲取前一個(gè)節(jié)點(diǎn)final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}...}1.2、入隊(duì)
如果當(dāng)前線程通過CAS獲取鎖失敗,AQS會(huì)將該線程以及等待狀態(tài)等信息打包成一個(gè)Node節(jié)點(diǎn),并將其加入同步隊(duì)列的尾部,同時(shí)將當(dāng)前線程掛起。總體流程圖如下。
獲取鎖失敗并添加節(jié)點(diǎn)到同步隊(duì)列尾部的操作1.3、出隊(duì)
當(dāng)釋放鎖時(shí),執(zhí)行出隊(duì)操作及喚醒后繼節(jié)點(diǎn)。總體流程圖如下。
釋放鎖時(shí)的移除節(jié)點(diǎn)操作?
1.4、同步狀態(tài)管理
或許對(duì)圖中的同步器有所疑惑。它到底是什么?其實(shí)很簡單,它就是給首尾兩個(gè)節(jié)點(diǎn)加上volatile同步域,如下。
private transient volatile Node head; private transient volatile Node tail; private volatile int state;上面三個(gè)變量是AQS中非常重要的三個(gè)變量,前面兩個(gè)變量好理解,下面就來說一下state變量,該變量是一個(gè)計(jì)數(shù)器,在獨(dú)占鎖情況下,獲取鎖后,state的值就會(huì)為1,釋放鎖時(shí)就設(shè)置為0(這種鎖屬于不可重入鎖);在共享鎖情況下,每一個(gè)線程獲取到鎖,就會(huì)state++,釋放鎖時(shí)就state–;在可重入鎖情況下(獲取的鎖都是同一把鎖),每獲取一次鎖會(huì)state++,釋放鎖時(shí)state–。
protected final int getState() {return state; } protected final void setState(int newState) {state = newState; } //使用CAS protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update); }二、自定義獨(dú)占鎖及共享鎖
通過上一節(jié)的講解,想必對(duì)AQS有了一定的了解,下面就通過AQS來實(shí)現(xiàn)一個(gè)獨(dú)占鎖及共享鎖。AQS非常強(qiáng)大,只需要重寫tryAcquire、tryRelease這兩個(gè)方法就可以實(shí)現(xiàn)一個(gè)獨(dú)占鎖。源碼如下:
public class SingleLock implements Lock {//自定義的獨(dú)占鎖static class Sync extends AbstractQueuedSynchronizer {//獨(dú)占鎖@Overrideprotected boolean tryAcquire(int arg) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//獨(dú)占鎖@Overrideprotected boolean tryRelease(int arg) {setExclusiveOwnerThread(null);setState(0);return true;}//判斷是是否是獨(dú)占鎖。@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}Condition newCondition() {return new ConditionObject();}}private Sync sync;public SingleLock() {sync = new Sync();}//加鎖@Overridepublic void lock() {sync.acquire(1);}//獲取可中斷鎖@Overridepublic void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}//獲取鎖,可能失敗@Overridepublic boolean tryLock() {return sync.tryAcquire(1);}//在time時(shí)間內(nèi)不能獲取鎖則失敗@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}//釋放鎖@Overridepublic void unlock() {sync.release(1);}//Condition來實(shí)現(xiàn)阻塞喚醒機(jī)制@Overridepublic Condition newCondition() {return sync.newCondition();} }很簡單的代碼就實(shí)現(xiàn)了一個(gè)獨(dú)占鎖,SingleLock擁有ReentrantLock的大部分功能,并且用法一模一樣。是不是很簡單…JUC包中提供的閉鎖(CountDownLatch)及信號(hào)量(Semaphore)就是典型的共享鎖的實(shí)現(xiàn)。共享鎖的實(shí)現(xiàn)也很簡單,需要重寫tryAcquireShared、tryReleaseShared這兩個(gè)方法。下面就來實(shí)現(xiàn)一個(gè)共享鎖。代碼如下:
public class ShareLock implements Lock {static class Sync extends AbstractQueuedSynchronizer {private int count;Sync(int count) {this.count = count;}@Overrideprotected int tryAcquireShared(int arg) {for (; ; ) {int current = getState();int newCount = current - arg;if (newCount < 0 || compareAndSetState(current, newCount)) {return newCount;}}}@Overrideprotected boolean tryReleaseShared(int arg) {for (; ; ) {int current = getState();int newCount = current + arg;if (compareAndSetState(current, newCount)) {return true;}}}Condition newCondition() {return new ConditionObject();}}private int count;private Sync sync;public ShareLock(int count) {this.count = count;sync = new Sync(count);}@Overridepublic void lock() {sync.acquireShared(1);}@Overridepublic void lockInterruptibly() throws InterruptedException {sync.acquireSharedInterruptibly(1);}@Overridepublic boolean tryLock() {return sync.tryAcquireShared(1) >= 0;}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(time));}@Overridepublic void unlock() {sync.releaseShared(1);}@Overridepublic Condition newCondition() {return sync.newCondition();} }ShareLock允許count個(gè)線程同時(shí)獲取鎖,它的實(shí)現(xiàn)也很簡單吧。通過上面這兩個(gè)例子,我們就可以按照自己需求來實(shí)現(xiàn)不同的鎖,但JUC包中提供的類基本上能滿足絕大部分需求了。
三、鎖的可重入性
SingleLock是我們自己實(shí)現(xiàn)的一種獨(dú)占鎖,但如果把它用在遞歸中,就會(huì)產(chǎn)生死鎖。因?yàn)镾ingleLock不具備可重入性。那么該如何實(shí)現(xiàn)可重入性尼?來看ReentrantLock的實(shí)現(xiàn)。
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//可重入性的實(shí)現(xiàn),如果當(dāng)前線程已拿到鎖else if (current == getExclusiveOwnerThread()) {//狀態(tài)加1int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");//重新設(shè)置狀態(tài)setState(nextc);return true;}return false;}可以發(fā)現(xiàn)可重入性的實(shí)現(xiàn)還是蠻簡單的,首先判斷當(dāng)前線程是不是已經(jīng)拿到鎖,如果已經(jīng)拿到鎖就將state的值加1。可重入性這一點(diǎn)非常重要,否則會(huì)產(chǎn)生不必要的死鎖問題,Synchronize也具備可重入性。
四、鎖的公平性
SingleLock屬于一個(gè)非公平鎖,那么如何實(shí)現(xiàn)公平鎖尼?其實(shí)這更簡單,只需要加個(gè)判斷即可。來看ReentrantLock的公平鎖的實(shí)現(xiàn)。
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {//如果當(dāng)前線程之前還有節(jié)點(diǎn)則hasQueuedPredecessors返回true,就不會(huì)去競(jìng)爭鎖if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}hasQueuedPredecessors就是判斷鎖是否公平的關(guān)鍵,如果在當(dāng)前線程之前還有排隊(duì)的線程就返回true,這時(shí)候當(dāng)前線程就不會(huì)去競(jìng)爭鎖。從而保證了鎖的公平性。
五、驚群效應(yīng)
在使用wait/notify/notifyAll時(shí),喚醒線程都是使用notifyAll來喚醒線程,因?yàn)閚otify無法喚醒指定線程,從而可能導(dǎo)致死鎖。但使用notifyAll也有一個(gè)問題,那就是當(dāng)大量線程來獲取鎖時(shí),就會(huì)產(chǎn)生驚群效應(yīng),大量的競(jìng)爭必然造成資源的劇增和浪費(fèi),因此終究只能有一個(gè)線程競(jìng)爭成功,其他線程還是要老老實(shí)實(shí)的回去等待。而AQS的FIFO的等待隊(duì)列給解決在鎖競(jìng)爭方面的驚群效應(yīng)問題提供了一個(gè)思路:保持一個(gè)FIFO隊(duì)列,隊(duì)列每個(gè)節(jié)點(diǎn)只關(guān)心其前一個(gè)節(jié)點(diǎn)的狀態(tài),線程喚醒也只喚醒隊(duì)頭等待線程。
【參考資料】
深入學(xué)習(xí)java同步器AQS
淺談Java并發(fā)編程系列(九)—— AQS結(jié)構(gòu)及原理分析
Java并發(fā)編程之AQS
扒一扒ReentrantLock以及AQS實(shí)現(xiàn)
總結(jié)
以上是生活随笔為你收集整理的Java并发编程—AQS原理分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java 并发编程—有锁互斥机制及AQS
- 下一篇: Java并发编程—锁的基本概念