聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一)
AQS是AbstractQueuedSynchronizer的縮寫,AQS是Java并包里大部分同步器的基礎(chǔ)構(gòu)件,利用AQS可以很方便的創(chuàng)建鎖和同步器。它封裝了一個(gè)狀態(tài),提供了一系列的獲取和釋放操作,這些獲取和釋放操作都是基于狀態(tài)的。它的基本思想是由AQS負(fù)責(zé)管理同步器類中的狀態(tài),其他的同步器比如可重入鎖ReentrantLock, 信號(hào)量Semaphore基于各自的特點(diǎn)來調(diào)用AQS提供了基礎(chǔ)能力進(jìn)行狀態(tài)的同步。
?
在AQS的Javadoc里面提到它是CLHLock的變種,在聊聊高并發(fā)(八)實(shí)現(xiàn)幾種自旋鎖(三)?這篇文章中我們說了如何利用CLH鎖來構(gòu)件自旋鎖,回顧一下CLHLock的一些基本特點(diǎn):
1. CLHLock是一種隊(duì)列自旋鎖的實(shí)現(xiàn),提供了FIFO先來先服務(wù)的公平性
2. 利用一個(gè)原子變量AtomicReference tail的CAS操作來構(gòu)件一個(gè)虛擬的鏈?zhǔn)浇Y(jié)構(gòu)
3. 節(jié)點(diǎn)Node維護(hù)一個(gè)volatile狀態(tài),維護(hù)一個(gè)prev指針指向前一個(gè)節(jié)點(diǎn),獲取鎖時(shí)每個(gè)線程在prev節(jié)點(diǎn)的狀態(tài)上自旋
4. 當(dāng)線程釋放鎖時(shí),只需要修改自身狀態(tài)即可,后續(xù)節(jié)點(diǎn)會(huì)觀察到volatile狀態(tài)的改動(dòng)而獲取鎖
?
AQS既然是CLHLock的一種變種,那么
1. 也維護(hù)以了一個(gè)基本的隊(duì)列結(jié)構(gòu)
2. 也是提供了一個(gè)Tail指針從隊(duì)尾通過CAS操作入隊(duì)列。
3. 提供了一個(gè)volatile類型的int值來維護(hù)狀態(tài)
?
?public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
?private transient volatile Node head;
??? private transient volatile Node tail;
??? private volatile int state;
??? protected final int getState() {
??????? return state;
??? }
??
??? protected final void setState(int newState) {
??????? state = newState;
??? }
??? protected final boolean compareAndSetState(int expect, int update) {
??????? // See below for intrinsics setup to support this
??????? return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
??? }
..................
}
?
與標(biāo)準(zhǔn)CLHLock實(shí)現(xiàn)不同的是,AQS不是一個(gè)自旋鎖,它提供了更加豐富的語意:
1. 提供了獨(dú)享(exclusive)方式和共享(share)方式來獲取/釋放,比如鎖是獨(dú)占方式的,信號(hào)量semaphore是共享方式的,可以有多個(gè)線程進(jìn)入臨界區(qū)?
2. 支持可中斷和不可中斷的獲取/釋放
3. 支持普通的和具有時(shí)間限制的獲取/釋放
4. 提供了自旋和阻塞的切換,可以先自旋,如果等待時(shí)間長,可以阻塞
?
?/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/
static final long spinForTimeoutThreshold = 1000L;
AQS定義了兩個(gè)內(nèi)部類來輔助它的實(shí)現(xiàn),一個(gè)是Node定義了隊(duì)列中的節(jié)點(diǎn),另一個(gè)是ConditionObject,是Condition接口的實(shí)現(xiàn)類,負(fù)責(zé)管理?xiàng)l件隊(duì)列。關(guān)于條件隊(duì)列更多內(nèi)容可以看這篇?聊聊高并發(fā)(十四)理解Java中的管程,條件隊(duì)列,Condition以及實(shí)現(xiàn)一個(gè)阻塞隊(duì)列
?
先看下Node類,它比CLHLock中的Node有更多屬性,除了完成基本的隊(duì)列功能,還維護(hù)了是獨(dú)享還是共享的模式信息
1. 維護(hù)了一個(gè)Node SHARED引用表示共享模式
2. 維護(hù)了一個(gè)Node EXCLUSIVE引用表示獨(dú)占模式
3. 維護(hù)了幾種節(jié)點(diǎn)等待的狀態(tài) waitStatus, 其中CANCELLED = 1是正數(shù),表示取消狀態(tài),SIGNAL = -1,CONDITION = -2, PROPAGATE = -3都是負(fù)數(shù),表示節(jié)點(diǎn)在條件隊(duì)列的某個(gè)狀態(tài),SIGNAL表示后續(xù)節(jié)點(diǎn)需要被喚醒
4. 維護(hù)了Node prev引用,指向隊(duì)列中的前一個(gè)節(jié)點(diǎn),通過Tail的CAS操作來創(chuàng)建
5. 維護(hù)了Node next引用,指向隊(duì)列中的下一個(gè)節(jié)點(diǎn),也是在通過Tail入隊(duì)列的時(shí)候設(shè)置的,這樣就維護(hù)了一個(gè)雙向隊(duì)列
6. 維護(hù)了一個(gè)volatile的Thread引用,把一個(gè)節(jié)點(diǎn)關(guān)聯(lián)到一個(gè)線程
7. 維護(hù)了Node nextWaiter引用,指向在條件隊(duì)列中的下一個(gè)正在等待的節(jié)點(diǎn),是給條件隊(duì)列使用的。值得注意的是條件隊(duì)列只有在獨(dú)享狀態(tài)下才使用
?
?static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
?
再看一下ConditionObject,它是條件Condition接口的具體實(shí)現(xiàn),維護(hù)了一個(gè)條件隊(duì)列,條件隊(duì)列是通過Node來構(gòu)件的一個(gè)單向鏈表結(jié)構(gòu)。底層的條件操作(等待和喚醒)使用LockSupport類來實(shí)現(xiàn),在這篇中我們說了LockSupport底層使用sun.misc.Unsafe來提供條件隊(duì)列的park和unpark操作。聊聊高并發(fā)(十七)解析java.util.concurrent各個(gè)組件(一) 了解sun.misc.Unsafe類
1. 維護(hù)了一個(gè)Node firstWaiter引用指向條件隊(duì)列的隊(duì)首節(jié)點(diǎn)
2. 維護(hù)了一個(gè)Node lastWaiter引用指向條件隊(duì)列的隊(duì)尾節(jié)點(diǎn)
3. 條件隊(duì)列支持節(jié)點(diǎn)的取消退出機(jī)制,CANCELLED節(jié)點(diǎn)來表示這種取消狀態(tài)
4. 支持限時(shí)等待機(jī)制
5. 支持可中斷和不可中斷的等待
我們來看幾個(gè)典型的條件隊(duì)列的操作實(shí)現(xiàn)
往條件隊(duì)列里面加入一個(gè)等待節(jié)點(diǎn),這個(gè)是await()方法的基本操作
1. 判斷尾節(jié)點(diǎn)的狀態(tài)是不是等待某個(gè)條件的狀態(tài)(CONDITION),如果不是,就把CANCELLED節(jié)點(diǎn)從隊(duì)列中踢出,然后把自己標(biāo)記為尾節(jié)點(diǎn)
?
?public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
???????? * Adds a new waiter to wait queue.
???????? * @return its new wait node
???????? */
??????? private Node addConditionWaiter() {
??????????? Node t = lastWaiter;
??????????? // If lastWaiter is cancelled, clean out.
??????????? if (t != null && t.waitStatus != Node.CONDITION) {
??????????????? unlinkCancelledWaiters();
??????????????? t = lastWaiter;
??????????? }
??????????? Node node = new Node(Thread.currentThread(), Node.CONDITION);
??????????? if (t == null)
??????????????? firstWaiter = node;
??????????? else
??????????????? t.nextWaiter = node;
??????????? lastWaiter = node;
??????????? return node;
??????? }
private void unlinkCancelledWaiters() {
??????????? Node t = firstWaiter;
??????????? Node trail = null;
??????????? while (t != null) {
??????????????? Node next = t.nextWaiter;
??????????????? if (t.waitStatus != Node.CONDITION) {
??????????????????? t.nextWaiter = null;
??????????????????? if (trail == null)
??????????????????????? firstWaiter = next;
??????????????????? else
??????????????????????? trail.nextWaiter = next;
??????????????????? if (next == null)
??????????????????????? lastWaiter = trail;
??????????????? }
??????????????? else
??????????????????? trail = t;
??????????????? t = next;
??????????? }
??????? }
.................
}
從條件隊(duì)列中喚醒一個(gè)節(jié)點(diǎn),實(shí)際上doSignal只是把一個(gè)節(jié)點(diǎn)從條件隊(duì)列中移除,然后加入到同步隊(duì)列,并設(shè)置它在同步隊(duì)列的前置節(jié)點(diǎn)的waitStatus = SIGNAL, 如果設(shè)置失敗或者取消在條件隊(duì)列等待,直接把這個(gè)節(jié)點(diǎn)的線程unpark喚醒,需要注意的是unpark操作只是把線程從等待狀態(tài)轉(zhuǎn)化為可運(yùn)行狀態(tài),并不直接獲得鎖。
?
??public final void signal() {
??????????? if (!isHeldExclusively())
??????????????? throw new IllegalMonitorStateException();
??????????? Node first = firstWaiter;
??????????? if (first != null)
??????????????? doSignal(first);
??????? }
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
? ?final boolean transferForSignal(Node node) {
??????? /*
???????? * If cannot change waitStatus, the node has been cancelled.
???????? */
??????? if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
??????????? return false;
??????? /*
???????? * Splice onto queue and try to set waitStatus of predecessor to
???????? * indicate that thread is (probably) waiting. If cancelled or
???????? * attempt to set waitStatus fails, wake up to resync (in which
???????? * case the waitStatus can be transiently and harmlessly wrong).
???????? */
??????? Node p = enq(node);
??????? int ws = p.waitStatus;
??????? if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
??????????? LockSupport.unpark(node.thread);
??????? return true;
??? }
Java線程的幾種狀態(tài)如下
?
支持中斷的等待操作,?主要做了兩個(gè)事情:新建一個(gè)Node進(jìn)入條件隊(duì)列等待被喚醒;從同步隊(duì)列中移除并釋放鎖。它會(huì)相應(yīng)線程的中斷拋出中斷異常,并且記錄中斷狀態(tài)
?
?public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
不可中斷的等待,也是先進(jìn)入條件隊(duì)列等待,并從同步隊(duì)列出隊(duì)列,釋放鎖。但是它不相應(yīng)線程中斷狀態(tài)
?
?public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
限時(shí)等待,也是先進(jìn)入條件隊(duì)列等待,然后釋放鎖。輪詢等待時(shí)間,當(dāng)超時(shí)后再次進(jìn)入同步隊(duì)列,等待獲得鎖。如果獲得了鎖,就返回false. 如果在等待時(shí)被喚醒,就進(jìn)入同步隊(duì)列,等待獲得鎖,如果獲得鎖就返回true
?
?public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
if (unit == null)
throw new NullPointerException();
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
long lastTime = System.nanoTime();
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
AQS使用了Unsafe直接操作內(nèi)存來對(duì)字段進(jìn)行CAS操作和設(shè)置值。
?
?private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
總結(jié)
以上是生活随笔為你收集整理的聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 聊聊高并发(二十)解析java.util
- 下一篇: 聊聊高并发(二十二)解析java.uti