日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ConditionObject源码

發布時間:2024/4/13 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ConditionObject源码 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

介紹

Condition是在JDK1.5中才出現的,它可以替代傳統的Object中的wait()、notify()和notifyAll()方法來實現線程間的通信,使線程間協作更加安全和高效。

Condition是一個接口,它的定義如下:

public interface Condition {void await() throws InterruptedException;void awaitUninterruptibly();long awaitNanos(long nanosTimeout) throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException;boolean awaitUntil(Date deadline) throws InterruptedException;void signal();void signalAll(); }

AQS的ConditionObject類實現了Condition接口

實現原理

ConditionObject類內部由條件隊列存儲每個需要等待的線程。條件隊列必須與一個獨占模式的鎖綁定,在執行await,signal之前必須先acquire到鎖。

條件隊列是一個FIFO隊列,是一個單向鏈表,包含firstWaiter,lastWaiter 2個指示節點,firstWaiter指向第一等待的節點,該節點是綁定線程的,與AQS的等待隊列的head節點不同。一個節點當被signal后,會由條件隊列轉移到等待隊列中

源碼分析

源碼

await

//await方法是能夠響應中斷的。 public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 添加節點到Condition隊列中Node node = addConditionWaiter(); //【1】// 釋放當前線程的lock(要把控制權釋放出去,由其他線程獲取到鎖),從AQS的隊列中移出//此處savedState 返回的是線程獲取到的資源數據(比如ReentranceLock支持多次lock)int savedState = fullyRelease(node); //【2】int interruptMode = 0;// 循環判斷當前線程的Node是否在Sync隊列中(被singal之后,會把節點移動到sync隊列),如果不在(不在說明未被signal),則parkwhile (!isOnSyncQueue(node)) {LockSupport.park(this); //如果發生中斷,會理解從park返回。 //【3】// checkInterruptWhileWaiting方法根據中斷發生的時機返回后續需要處理這次中斷的方式, 如果發生中斷,退出循環if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// acquireQueued需要再次獲取到savedState 對應的資源(即釋放多少再獲取多少)if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 從頭到尾遍歷Condition隊列,移除被cancel的節點if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();// 如果線程已經被中斷,則根據之前獲取的interruptMode的值來判斷是繼續中斷還是拋出異常if (interruptMode != 0)reportInterruptAfterWait(interruptMode); }

await方法首先(調用addConditionWaiter)根據當前線程創建了一個Node(waitStauts為CONDITION),然后釋放當前線程的獨占鎖。這里的savedState表示當前線程已經加鎖的次數(ReentrantLock為重入鎖)。while循環其實就是一直判斷,當前的線程是否又被添加到了Sync隊列中,如果已經在Sync隊列中,則退出循環。調用signal方法的時候,因為這里需要喚醒之前調用await方法的線程,所以會把當前線程又加入到Sync隊列中。

final int fullyRelease(Node node) {boolean failed = true;try {//占用了多少資源(這次釋放多少,下次acquire時仍要獲取多少)int savedState = getState();if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;} } /* 該方法判斷當前線程的node是否在Sync隊列中。 */ final boolean isOnSyncQueue(Node node) {//如果當前線程node的狀態是CONDITION或者node.prev為null時說明已經在Condition隊列中了,所以返回false;if (node.waitStatus == Node.CONDITION || node.prev == null)return false;//如果node.next不為null,說明在Sync隊列中,返回true;if (node.next != null) // If has successor, it must be on queuereturn true;/**如果兩個if都未返回時,可以斷定node的prev一定不為null,next一定為null(因為node為lastWaiter),*這個時候可以認為node正處于放入Sync隊列的執行CAS操作執行過程中(enq 函數調用中)。*而這個CAS操作有可能失敗,所以通過findNodeFromTail再嘗試一次判斷。*/return findNodeFromTail(node); }private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;} }private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;// 執行findNodeFromTail方法時可能一直在此自旋if (compareAndSetTail(t, node)) {t.next = node;return t;}}} }

中斷處理:

/* 判斷等待過程中是否發生中斷 */ private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0; }/* 判斷中斷的時候,是否有signal方法的調用 */ final boolean transferAfterCancelledWait(Node node) {/*CAS成功,說明signal還未執行,因為signal之后,會把waitStatus修改為SIGNAL。*/if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {//未signal,則把節點加入到sync隊列中去。enq(node);//返回true。表示未發生中斷。return true;}/**CAS失敗了,則不能判斷當前線程是先進行了中斷還是先進行了signal方法的調用,可能是先執行了*signal然后中斷,也可能是先執行了中斷,后執行了signal,當然,這兩個操作肯定是發生在CAS之*前。這時需要做的就是等待當前線程的node被添加到Sync隊列后,也就是enq方法返回后,返回false*告訴checkInterruptWhileWaiting方法返回REINTERRUPT,后續進行重新中斷。*/while (!isOnSyncQueue(node))Thread.yield();return false; }

signal

public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}public final void signalAll() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);}

不管是signal,還是signalAll,都需要先取得lock。

/* 喚醒第一個可以被喚醒的節點。 */ private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null); //【4】 }//釋放條件隊列,把每個節點都轉化為sync節點 private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null); }


?

/* 把節點移動到sync隊列,并且嘗試把節點狀態改為0,把前置節點狀態改為SIGNAL */ final boolean transferForSignal(Node node) {/** CAS失敗,說明狀態為CANCELLED,*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //【5】return false;/** CAS成功,把節點加入到sync隊列。*/Node p = enq(node);//返回的是node前置節點。 //【6】int ws = p.waitStatus;/*ws>0表明,節點CANCLEED。如果為0,需要把前置節點設置為SIGNAL。設置失敗則park如果p節點的線程在這時執行了unlock方法,就會調用unparkSuccessor方法,unparkSuccessor方法可 能就將p的狀態改為了0,那么執行就會失敗。*/if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //【7】LockSupport.unpark(node.thread);return true; }

?

示例

public class ConTest2 { private int queueSize = 5; private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize); private Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); public static void main(String[] args) throws InterruptedException { ConTest2 test = new ConTest2(); Producer producer1 = test.new Producer(); Consumer consumer1 = test.new Consumer(); Consumer consumer2 = test.new Consumer(); producer1.start(); Thread.sleep(1000);consumer1.start(); consumer2.start();} class Consumer extends Thread{ @Override public void run() { consume(); } volatile boolean flag=true; private void consume() { while(flag){ lock.lock(); try { while(queue.size() == 0){ try { System.out.println("隊列空,等待數據"); notEmpty.await(); } catch (InterruptedException e) { flag =false; } } queue.poll(); //每次移走隊首元素 notFull.signal(); System.out.println("從隊列取走一個元素,隊列剩余"+queue.size()+"個元素"); } finally{ lock.unlock(); } } } } class Producer extends Thread{ @Override public void run() { produce(); } volatile boolean flag=true; private void produce() { while(flag){ lock.lock(); try { while(queue.size() == queueSize){ try { System.out.println("隊列滿,等待有空余空間"); notFull.await(); } catch (InterruptedException e) { flag =false; } } queue.offer(1); //每次插入一個元素 notEmpty.signal(); System.out.println("向隊列取中插入一個元素,隊列剩余空間:"+(queueSize-queue.size())); } finally{ lock.unlock(); } } } } }

以上示例為典型生產者-消費者模式,p1,最先獲取到lock。

AQS圖例

1、P1,C1,C2線程開始,假設P1先獲取到lock,并且把隊列寫滿,C1,C2才啟動。此時C1,C2線程獲取lock失敗,加入同步隊列中。

2、P1執行,發現隊列滿,執行notFull.await()方法,經過【1】,【2】在【3】處park,隊列狀態如下:

此時,C1線程被unpark,P1線程在條件隊列notFull中。

2、C1執行 lock方法成功,然后移出一個元素,調用notFull.signal(),經過【4】,【5】,【6】之后,P1關聯節點由條件隊列轉移到同步隊列

?

3、C1釋放鎖,假設C1一直可以獲取到鎖,再釋放鎖,這樣P1,C2一直會在sync隊列中。一直到C1把隊列消費完,調用notEmpty.await(),C2獲取到鎖,但是由于隊列為空,C2也調用notEmpty.await(),此時P1獲取到鎖。狀態如下:

4、當P1調用notEmpty.signalAll()時,把C1,C2對應節點再加入到sync隊列

?

隊列節點的狀態

? 調用條件隊列的等待操作,會設置節點的waitingStatus為Condition,標識當前節點正處于條件隊列中。條件隊列的節點狀態轉換圖如下:

???????? Node的各個狀態的主要作用:Cancelled主要是解決線程在持有鎖時被外部中斷的邏輯,AQS的可中斷鎖獲取方法lockInterrutible()是基于該狀態實現的。顯式鎖必須手動釋放鎖,尤其是有中斷的環境中,一個線程被中斷可能仍然持有鎖,所以必須注意在finally中unlock。Condition則是支持條件隊列的等待操作,是Lock與條件隊列關聯的基礎。Signal是阻塞后繼線程的標識,一個等待線程只有在其前驅節點的狀態是SIGNAL時才會被阻塞,否則一直執行自旋嘗試操作,以減少線程調度的開銷。

條件隊列上的等待和喚醒操作,本質上是節點在AQS線程等待隊列和條件隊列之間相互轉移的過程,當需要等待某個條件時,線程會將當前節點添加到條件隊列中,并釋放持有鎖;當某個線程執行條件隊列的喚醒操作,則會將條件隊列的節點轉移到AQS等待隊列。每個Condition就是一個條件隊列,可以通過Lock的newCondition創建多個等待條件。AQS的條件隊列,它的等待和喚起操作流程如下:

?

?

await與awaitUninterruptibly()比較

?await()方法是可中斷方法,如果有中斷拋出中斷異常。

awaitUninterruptibly(),如果有中斷,僅重新設置中斷狀態。

await方法的幾種實現

與Object類中wait,notify比較

Condition與Object類中的方法對應如下:

ObjectCondition
wait()await()
notify()signal()
notifyAll()signalAll()

Condition.await()與Object.wait(),都必須先獲取到鎖,才可以執行,執行后釋放鎖

不同的是Condition與Lock結合,wait與synchronized結合。

多線程環境的下,線程直接的互斥[執行]依靠的應該是鎖Lock,線程的之間的[通信]依靠的應該是條件Condition/信號,一般情況下lock確實可以同時滿足做這兩個事情,所以在Object的方式滿足了這個一般情況,但是肯定會有復雜的場景比如剛才例子中,需要讓滿足一定條件的線程執行,僅僅依靠鎖是不能完美解決的。所以condition實際上分離了執行和通信

?

?

?

?

?

總結

以上是生活随笔為你收集整理的ConditionObject源码的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。