AQS中的Condition是什么?
歡迎關注:王有志
期待你加入Java人的提桶跑路群:共同富裕的Java人
今天來和大家聊聊Condition,Condition為AQS“家族”提供了等待與喚醒的能力,使AQS"家族"具備了像synchronized一樣暫停與喚醒線程的能力。我們先來看兩道關于Condition的面試題目:
- Condition和Object的等待與喚醒有什么區別?
- 什么是Condition隊列?
接下來,我們就按照“是什么”,“怎么用”和“如何實現”的順序來揭開Condition的面紗吧。
Condition是什么?
Condition是Java中的接口,提供了與Object#wait和Object#notify相同的功能。Doug Lea在Condition接口的描述中提到了這點:
Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to “wait”) until notified by another thread that some state condition may now be true.
來看Condition接口中提供了哪些方法:
public interface Condition {void await() throws InterruptedException;void awaitUninterruptibly();long awaitNanos(long nanosTimeout) throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException;boolean awaitUntil(Date deadline) throws InterruptedException;void signal();void signalAll(); }Condition只提供了兩個功能:等待(await)和喚醒(signal),與Object提供的等待與喚醒時相似的:
public final void wait() throws InterruptedException;public final void wait(long timeoutMillis, int nanos) throws InterruptedException;public final native void wait(long timeoutMillis) throws InterruptedException;@HotSpotIntrinsicCandidate public final native void notify();@HotSpotIntrinsicCandidate public final native void notifyAll();喚醒功能上,Condition與Object的差異并不大:
- Condition#signal ≈ \approx ≈ Object#notify
- Condition#signalAll = = = Object#notifyAll
多個線程處于等待狀態時,Object#notify()是“隨機”喚醒線程,而Condition#signal則由具體實現決定如何喚醒線程,如:ConditionObject喚醒的是最早進入等待的線程,但兩個方法均只喚醒一個線程。
等待功能上,Condition與Object的共同點是:都會釋放持有的資源,Condition釋放鎖,Object釋放Monitor,即進入等待狀態后允許其他線程獲取鎖/監視器。主要的差異體現在Condition支持了更加豐富的場景,通過一張表格來對比下:
| Condition#await() | Object#wait() | 暫停線程,拋出線程中斷異常 |
| Condition#awaitUninterruptibly() | / | 暫停線程,不拋出線程中斷異常 |
| Condition#await(time, unit) | Object#wait(timeoutMillis, nanos) | 暫停線程,直到被喚醒或等待指定時間后,超時后自動喚醒返回false,否則返回true |
| Condition#awaitUntil(deadline) | / | 暫停線程,直到被喚醒或到達指定時間點,超時后自動喚醒返回false,否則返回true |
| Condition#awaitNanos(nanosTimeout) | / | 暫停線程,直到被喚醒或等待指定時間后,返回值表示被喚醒時的剩余時間(nanosTimeout-耗時),結果為負數表示超時 |
除了以上差異外,Condition還支持創建多個等待隊列,即同一把鎖擁有多個等待隊列,線程在不同隊列中等待,而Object只有一個等待隊列。《Java并發編程的藝術》中也有一張類似的表格,放在這里供大家參考:
Tips:
- 實際上signal翻譯為喚醒并不恰當~~
- 涉及到Condition的實現部分,下文通過AQS中的ConditionObject詳細解釋。
Condition怎么用?
既然Condition與Object提供的等待與喚醒功能相同,那么它們的用法是不是也很相似呢?
與調用Object#wait和Object#notifyAll必須處于synchronized修飾的代碼中一樣(獲取Monitor),調用Condition#await和Condition#signalAll的前提是要先獲取鎖。但不同的是,使用Condition前,需要先通過鎖去創建Condition。
以ReentrantLock中提供的Condition為例,首先是創建Condition對象:
ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition();然后是獲取鎖并調用await方法:
new Thread(() -> {lock.lock();try {condition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}lock.unlock(); }最后,通過調用singalAll喚醒全部阻塞中的線程:
new Thread(() -> {lock.lock();condition.signalAll();lock.unlock(); }ConditionObject的源碼分析
作為接口Condition非常慘,因為在Java中只有AQS中的內部類ConditionObject實現了Condition接口:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {public class ConditionObject implements Condition, java.io.Serializable {private transient Node firstWaiter;private transient Node lastWaiter;}static final class Node {// 省略} }ConditionObject只有兩個Node類型的字段,分別是鏈式結構中的頭尾節點,ConditionObject就是通過它們實現的等待隊列。那么ConditionObject的等待隊列起到了怎樣的作用呢?是類似于AQS中的排隊機制嗎?帶著這兩個問題,我們正是開始源碼的分析。
await方法的實現
Condition接口中定義了4個線程等待的方法:
- void await() throws InterruptedException
- void awaitUninterruptibly();
- long awaitNanos(long nanosTimeout) throws InterruptedException;
- boolean await(long time, TimeUnit unit) throws InterruptedException;
- boolean awaitUntil(Date deadline) throws InterruptedException;
方法雖然很多,但它們之間的差異較小,只體現在時間的處理上,我們看其中最常用的方法:
public final void await() throws InterruptedException {// 線程中斷,拋出異常if (Thread.interrupted()) {throw new InterruptedException();}// 注釋1:加入到Condition的等待隊列中Node node = addConditionWaiter();// 注釋2:釋放持有鎖(調用AQS的release)int savedState = fullyRelease(node);int interruptMode = 0;// 注釋3:判斷是否在AQS的等待隊列中while (!isOnSyncQueue(node)) {LockSupport.park(this);// 中斷時退出方法if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {break;}}// 加入到AQS的等待隊列中,調用AQS的acquireQueued方法if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {interruptMode = REINTERRUPT;}// 斷開與Condition隊列的聯系if (node.nextWaiter != null) {unlinkCancelledWaiters();}if (interruptMode != 0) {reportInterruptAfterWait(interruptMode);} }注釋1的部分,調用addConditionWaiter方法添加到Condition隊列中:
private Node addConditionWaiter() {// 判斷當前線程是否為持有鎖的線程if (!isHeldExclusively()) {throw new IllegalMonitorStateException();}// 獲取Condition隊列的尾節點Node t = lastWaiter;// 斷開不再位于Condition隊列的節點if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}// 創建Node.CONDITION模式的Node節點Node node = new Node(Node.CONDITION);if (t == null) {// 隊列為空的場景,將node設置為頭節點firstWaiter = node;} else {// 隊列不為空的場景,將node添加到尾節點的后繼節點上t.nextWaiter = node;}// 更新尾節點lastWaiter = node;return node; }可以看到,Condition的隊列是一個樸實無華的雙向鏈表,每次調用addConditionWaiter方法,都會加入到Condition隊列的尾部。
注釋2的部分,釋放線程持有的鎖,同時移出AQS的隊列,內部調用了AQS的release方法:
=final int fullyRelease(Node node) {try {int savedState = getState();if (release(savedState)) {return savedState;}throw new IllegalMonitorStateException();} catch (Throwable t) {node.waitStatus = Node.CANCELLED;throw t;} }因為已經分析過AQS的release方法和ReentrantLock實現的tryRelease方法,這里我們就不過多贅述了。
注釋3的部分,isOnSyncQueue判斷當前線程是否在AQS的等待隊列中,我們來看此時存在的情況:
- 如果isOnSyncQueue返回false,即線程不在AQS的隊列中,進入自旋,調用LockSupport#park暫停線程;
- 如果isOnSyncQueue返回true,即線程在AQS的隊列中,不進入自旋,執行后續邏輯。
結合注釋1和注釋2的部分,Condition#await的實現原理了就很清晰了:
- Condition與AQS分別維護了一個等待隊列,而且是互斥的,即同一個節點只會出現在一個隊列中;
- 當調用Condition#await時,將線程添加到Condition的隊列中(注釋1),同時從AQS隊列中移出(注釋2);
- 接著判斷線程位于的隊列:
- 位于Condition隊列中,該線程需要被暫停,調用LockSupport#park;
- 位于AQS隊列中,該線程正在等待獲取鎖。
基于以上的結論,我們已經能夠猜到喚醒方法Condition#signalAll的原理了:
- 將線程從Condition隊列中移出,并添加到AQS的隊列中;
- 調用LockSupport.unpark喚醒線程。
至于這個猜想是否正確,我們接著來看喚醒方法的實現。
Tips:如果忘記了AQS中相關方法是如何實現的,可以回顧下《AQS的今生,構建出JUC的基礎》。
signal和signalAll方法的實現
來看signal和signalAll的源碼:
// 喚醒一個處于等待中的線程 public final void signal() {if (!isHeldExclusively()) {throw new IllegalMonitorStateException();}// 獲取Condition隊列中的第一個節點Node first = firstWaiter;if (first != null) {// 喚醒第一個節點doSignal(first);} }// 喚醒全部處于等待中的線程 public final void signalAll() {if (!isHeldExclusively()){throw new IllegalMonitorStateException();}Node first = firstWaiter;if (first != null) {// 喚醒所有節點doSignalAll(first);} }兩個方法唯一的差別在于頭節點不為空的場景下,是調用doSignal喚醒一個線程還是調用doSignalAll喚醒所有線程:
private void doSignal(Node first) {do {// 更新頭節點if ( (firstWaiter = first.nextWaiter) == null) {// 無后繼節點的場景lastWaiter = null;}// 斷開節點的連接first.nextWaiter = null;// 喚醒頭節點} while (!transferForSignal(first) && (first = firstWaiter) != null); }private void doSignalAll(Node first) {// 將Condition的隊列置為空lastWaiter = firstWaiter = null;do {// 斷開鏈接Node next = first.nextWaiter;first.nextWaiter = null;// 喚醒當前頭節點transferForSignal(first);// 更新頭節點first = next;} while (first != null); }可以看到,無論是doSignal還是doSignalAll都只是將節點移出Condition隊列,而真正起到喚醒作用的是transferForSignal方法,從方法名可以看到該方法是通過“轉移”進行喚醒的,我們來看源碼:
final boolean transferForSignal(Node node) {// 通過CAS替換node的狀態// 如果替換失敗,說明node不處于Node.CONDITION狀態,不需要喚醒if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) {return false;}// 將節點添加到AQS的隊列的隊尾// 并返回老隊尾節點,即node的前驅節點Node p = enq(node);int ws = p.waitStatus;// 對前驅節點狀態的判斷if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) {LockSupport.unpark(node.thread);}return true; }transferForSignal方法中,調用enq方法將node重新添加到AQS的隊列中,并返回node的前驅節點,隨后對前驅節點的狀態進行判斷:
- 當 w s > 0 ws > 0 ws>0時,前驅節點處于Node.CANCELLED狀態,前驅節點退出鎖的爭搶,node可以直接被喚醒;
- 當 w s ≤ 0 ws \leq 0 ws≤0時,通過CAS修改前驅節點的狀態為Node.SIGNAL,設置失敗時,直接喚醒node。
《AQS的今生,構建出JUC的基礎》中介紹了waitStatus的5種狀態,其中Node.SIGNAL狀態表示需要喚醒后繼節點。另外,在分析shouldParkAfterFailedAcquire方法的源碼時,我們知道在進入AQS的等待隊列時,需要將前驅節點的狀態更新為Node.SIGNAL。
最后來看enq的實現:
private Node enq(Node node) {for (;;) {// 獲取尾節點Node oldTail = tail;if (oldTail != null) {// 更新當前節點的前驅節點node.setPrevRelaxed(oldTail);// 更新尾節點if (compareAndSetTail(oldTail, node)) {oldTail.next = node;// 返回當前節點的前驅節點(即老尾節點)return oldTail;}} else {initializeSyncQueue();}} }enq的實現就非常簡單了,通過CAS更新AQS的隊列尾節點,相當于添加到AQS的隊列中,并返回尾節點的前驅節點。好了,喚醒方法的源碼到這里就結束了,是不是和我們當初的猜想一模一樣呢?
圖解ConditionObject原理
功能上,Condition實現了AQS版Object#wait和Object#notify,用法上也與之相似,需要先獲取鎖,即需要在lock與unlock之間調用。原理上,簡單來說就是線程在AQS的隊列和Condition的隊列之間的轉移。
線程t持有鎖
假設有線程t已經獲取了ReentrantLock,線程t1,t2和t3正在AQS的隊列中等待,我們可以得到這樣的結構:
線程t執行Condition#await
如果線程t中調用了Condition#await方法,線程t進入Condition的等待隊列中,線程t1獲取ReentrantLock,并從AQS的隊列中移出,結構如下:
線程t1執行Condition#await
如果線程t1中也執行了Condition#await方法,同樣線程t1進入Condition隊列中,線程t2獲取到ReentrantLock,結構如下:
線程t2執行Condition#signal
如果線程t2執行了Condition#signal,喚醒Condition隊列中的第一個線程,此時結構如下:
通過上面的流程,我們就可以得到線程是如何在Condition隊列與AQS隊列中轉移的:
結語
關于Condition的內容到這里就結束了,無論是理解,使用還是剖析原理,Condition的難度并不高,只不過大家可能平時用得比較少,因此多少有些陌生。
最后,截止到文章發布,我應該是把開頭兩道題目的題解寫完了吧~~
好了,今天就到這里了,Bye~~
總結
以上是生活随笔為你收集整理的AQS中的Condition是什么?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 4月雪
- 下一篇: 干程序 身体才是本钱!