java.util.concurrent包详细分析--转
原文地址:http://blog.csdn.net/windsunmoon/article/details/36903901
概述
Java.util.concurrent 包含許多線程安全、測試良好、高性能的并發構建塊。不客氣地說,創建java.util.concurrent 的目的就是要實現 Collection 框架對數據結構所執行的并發操作。通過提供一組可靠的、高性能并發構建塊,開發人員可以提高并發類的線程安全、可伸縮性、性能、可讀性和可靠性。
此包包含locks,concurrent,atomic 三個包。
Atomic:原子數據的構建。
Locks:基本的鎖的實現,最重要的AQS框架和lockSupport
Concurrent:構建的一些高級的工具,如線程池,并發隊列等。
其中都用到了CAS(compare-and-swap)操作。CAS 是一種低級別的、細粒度的技術,它允許多個線程更新一個內存位置,同時能夠檢測其他線程的沖突并進行恢復。它是許多高性能并發算法的基礎。在 JDK 5.0 之前,Java 語言中用于協調線程之間的訪問的惟一原語是同步,同步是更重量級和粗粒度的。公開 CAS 可以開發高度可伸縮的并發 Java 類。這些更改主要由 JDK 庫類使用,而不是由開發人員使用。
CAS操作都封裝在java 不公開的類庫中,sun.misc.Unsafe。此類包含了對原子操作的封裝,具體用本地代碼實現。本地的C代碼直接利用到了硬件上的原子操作。
Atomic原子數據
?這個包里面提供了一組原子變量類。其基本的特性就是在多線程環境下,當有多個線程同時執行這些類的實例包含的方法時,具有排他性,即當某個線程進入方法,執行其中的指令時,不會被其他線程打斷,而別的線程就像自旋鎖一樣,一直等到該方法執行完成,才由JVM從等待隊列中選擇一個另一個線程進入,這只是一種邏輯上的理解。實際上是借助硬件的相關指令來實現的,不會阻塞線程(或者說只是在硬件級別上阻塞了)。可以對基本數據、數組中的基本數據、對類中的基本數據進行操作。原子變量類相當于一種泛化的volatile變量,能夠支持原子的和有條件的讀-改-寫操作。
?java.util.concurrent.atomic中的類可以分成4組:
標量類(Scalar):AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
數組類:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
更新器類:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
復合變量類:AtomicMarkableReference,AtomicStampedReference
標量類
第一組AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference這四種基本類型用來處理布爾,整數,長整數,對象四種數據,其內部實現不是簡單的使用synchronized,而是一個更為高效的方式CAS (compare and swap) + volatile和native方法,從而避免了synchronized的高開銷,執行效率大為提升。
他們的實現都是依靠 真正的值為volatile 類型,通過Unsafe 包中的原子操作實現。最基礎就是CAS,他是一切的基礎。如下 。其中offset是 在內存中 value相對于基地址的偏移量。(它的獲得也由Unsafe 本地代碼獲得)。關于加鎖的原理見附錄。
核心代碼如下,其他都是在compareAndSet基礎上構建的。
1.?private?static?final?Unsafe?unsafe?=?Unsafe.getUnsafe();
2.?private?volatile?int?value;??
3.?public?final?int?get()?{??
4.?????????return?value;??
5.?}??
6.?public?final?void?set(int?newValue)?{??
7.?????????value?=?newValue;??
8.?}??
9.?public?final?boolean?compareAndSet(int?expect,?int?update)?{??
10.????return?unsafe.compareAndSwapInt(this,?valueOffset,?expect,?update);??
11.}
void set()和void lazySet():set設置為給定值,直接修改原始值;lazySet延時設置變量值,這個等價于set()方法,但是由于字段是volatile類型的,因此次字段的修改會比普通字段(非volatile字段)有稍微的性能延時(盡管可以忽略),所以如果不是想立即讀取設置的新值,允許在“后臺”修改值,那么此方法就很有用。
getAndSet( )方法,利用compareAndSet循環自旋實現。
原子的將變量設定為新數據,同時返回先前的舊數據。
其本質是get( )操作,然后做set( )操作。盡管這2個操作都是atomic,但是他們合并在一起的時候,就不是atomic。在Java的源程序的級別上,如果不依賴synchronized的機制來完成這個工作,是不可能的。只有依靠native方法才可以。
Java代碼??
1.??public?final?int?getAndSet(int?newValue)?{??
2.??????for?(;;)?{??
3.??????????int?current?=?get();??
4.??????????if?(compareAndSet(current,?newValue))??
5.??????????????return?current;??
6.??????}??
7.??}?
對于 AtomicInteger、AtomicLong還提供了一些特別的方法。貼別是如,
getAndAdd( ):以原子方式將給定值與當前值相加, 相當于線程安全的t=i;i+=delta;return t;操作。
以實現一些加法,減法原子操作。(注意 --i、++i不是原子操作,其中包含有3個操作步驟:第一步,讀取i;第二步,加1或減1;第三步:寫回內存)
數組類
第二組AtomicIntegerArray,AtomicLongArray還有AtomicReferenceArray類進一步擴展了原子操作,對這些類型的數組提供了支持。這些類在為其數組元素提供?volatile?訪問語義方面也引人注目,這對于普通數組來說是不受支持的。
他們內部并不是像AtomicInteger一樣維持一個valatile變量,而是全部由native方法實現,如下
AtomicIntegerArray的實現片斷:
Java代碼??
1.??private?static?final?Unsafe?unsafe?=?Unsafe.getUnsafe();??
2.??private?static?final?int?base?=?unsafe.arrayBaseOffset(int[].class);??//數組基地址
3.??private?static?final?int?scale?=?unsafe.arrayIndexScale(int[].class);??//數組元素占的大小精度
4.??private?final?int[]?array;??
5.??public?final?int?get(int?i)?{??
6.??????????return?unsafe.getIntVolatile(array,?rawIndex(i));??
7.??}??
8.??public?final?void?set(int?i,?int?newValue)?{??
9.??????????unsafe.putIntVolatile(array,?rawIndex(i),?newValue);??
10.?}
11.???
12.??private longrawIndex(int i) {//獲取具體某個元素的偏移量
13.?????????if (i <0 || i >= array.length)
14.?????????????thrownew IndexOutOfBoundsException("index " + i);
15.?????????return base+ (long) i * scale;
16.?}
更新器類
第三組AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater基于反射的實用工具,可以對指定類的指定?volatile?字段進行原子更新。API非常簡單,但是也是有一些約束:
(1)字段必須是volatile類型的
(2)字段的描述類型(修飾符public/protected/default/private)是與調用者與操作對象字段的關系一致。也就是說 調用者能夠直接操作對象字段,那么就可以反射進行原子操作。但是對于父類的字段,子類是不能直接操作的,盡管子類可以訪問父類的字段。
(3)只能是實例變量,不能是類變量,也就是說不能加static關鍵字。
(4)只能是可修改變量,不能使final變量,因為final的語義就是不可修改。實際上final的語義和volatile是有沖突的,這兩個關鍵字不能同時存在。
(5)對于AtomicIntegerFieldUpdater?和AtomicLongFieldUpdater?只能修改int/long類型的字段,不能修改其包裝類型(Integer/Long)。如果要修改包裝類型就需要使用AtomicReferenceFieldUpdater?。
復合變量類
防止ABA問題出現而構造的類。如什么是ABA問題呢,當某些流程在處理過程中是順向的,也就是不允許重復處理的情況下,在某些情況下導致一個數據由A變成B,再中間可能經過0-N個環節后變成了A,此時A不允許再變成B了,因為此時的狀態已經發生了改變,他們都是對atomicReference的進一步包裝,AtomicMarkableReference和AtomicStampedReference功能差不多,有點區別的是:它描述更加簡單的是與否的關系,通常ABA問題只有兩種狀態,而AtomicStampedReference是多種狀態,那么為什么還要有AtomicMarkableReference呢,因為它在處理是與否上面更加具有可讀性。
Lcoks 鎖
此包中實現的最基本的鎖,阻塞線程的LockSupport。核心是AQS框架(AbstractQueuedSynchronizer),是J U C(Java?util concurrent) 最復雜的一個類。
Lock 和Synchronized
J U C 中的Lock和synchronized具有同樣的語義和功能。不同的是,synchronized 鎖在退出塊時自動釋放。而Lock 需要手動釋放,且Lock更加靈活。Syschronizd 是 java 語言層面的,是系統關鍵字;Lock則是java 1.5以來提供的一個類。
Synchronized 具有以下缺陷,它無法中斷一個正在等候獲得鎖的線程;也無法通過投票得到鎖,如果不想等下去,也就沒法得到鎖;同步還要求鎖的釋放只能在與獲得鎖所在的堆棧幀相同的堆棧幀中進行。
而Lock(如ReentrantLock?)除了與Synchronized 具有相同的語義外,還支持鎖投票、定時鎖等候和可中斷鎖等候(就是說在等待鎖的過程中,可以被中斷)的一些特性。
Lock. lockInterruptibly ,調用后,或者獲得鎖,或者被中斷后拋出異常。優先響應異常。這點可以用 類似以下代碼測試。
?Thread a =?new?Thread(task1,?"aa");
?????? Thread b =?new?Thread(task1,?"bb");
?????? a.start();
?????? b.start();
?????? b.interrupt();
LockSupport 和java內置鎖
???在LockSupport出現之前,如果要block/unblock某個Thread,除了使用Java語言內置的monitor機制之外,只能通過Thread.suspend()和Thread.resume()。然而Thread.suspend()和Thread.resume()基本上不可用,除了可能導致死鎖之外,它們還存在一個無法解決的競爭條件:如果在調用Thread.suspend()之前調用了Thread.resume(),那么該Thread.resume()調用沒有任何效果。LockSupport最主要的作用,便是通過一個許可(permit)狀態,解決了這個問題。LockSupport?只能阻塞當前線程,但是可以喚醒任意線程。
?????那么LockSupport和Java語言內置的monitor機制有什么區別呢?它們的語義是不同的。LockSupport是針對特定Thread來進行block/unblock操作的;wait()/notify()/notifyAll()是用來操作特定對象的等待集合的。為了防止知識生銹,在這里簡單介紹一下Java語言內置的monitor機制(詳見:http://whitesock.iteye.com/blog/162344?)。正如每個Object都有一個鎖,每個Object也有一個等待集合(wait set),它有wait、notify、notifyAll和Thread.interrupt方法來操作。同時擁有鎖和等待集合的實體,通常被成為監視器(monitor)。每個Object的等待集合是由JVM維護的。等待集合一直存放著那些因為調用對象的wait方法而被阻塞的線程。由于等待集合和鎖之間的交互機制,只有獲得目標對象的同步鎖時,才可以調用它的wait、notify和notifyAll方法。這種要求通常無法靠編譯來檢查,如果條件不能滿足,那么在運行的時候調用以上方法就會導致其拋出IllegalMonitorStateException。
??? wait() 方法被調用后,會執行如下操作:
·????????如果當前線程已經被中斷,那么該方法立刻退出,然后拋出一個InterruptedException異常。否則線程會被阻塞。
·????????JVM把該線程放入目標對象內部且無法訪問的等待集合中。
·????????目標對象的同步鎖被釋放,但是這個線程鎖擁有的其他鎖依然會被這個線程保留著。當線程重新恢復質執行時,它會重新獲得目標對象的同步鎖。
??? notify()方法被調用后,會執行如下操作:
·????????如果存在的話,JVM會從目標對象內部的等待集合中任意移除一個線程T。如果等待集合中的線程數大于1,那么哪個線程被選中完全是隨機的。
·????????T必須重新獲得目標對象的同步鎖,這必然導致它將會被阻塞到調用Thead.notify()的線程釋放該同步鎖。如果其他線程在T獲得此鎖之前就獲得它,那么T就要一直被阻塞下去。
·????????T從執行wait()的那點恢復執行。
??? notifyAll()方法被調用后的操作和notify()類似,不同的只是等待集合中所有的線程(同時)都要執行那些操作。然而等待集合中的線程必須要在競爭到目標對象的同步鎖之后,才能繼續執行。
?在標準的Sun jdk 中,Locksupport的實現基于Unsafe,都是本地代碼,Android的實現不全是本地代碼。
一個線程調用park阻塞之后,如果被其他線程調用interrupt(),那么他它會響應中斷,解除阻塞,但是不會拋出interruption?異常。這點在構造可中斷獲取鎖的時候用到了。
AbstractQueuedSynchronizer
AQS框架是 J U C包的核心。是構建同步、鎖、信號量和自定義鎖的基礎。也是構建高級工具的基礎。
?
從上圖可以看到,鎖,信號量的實現內部都有兩個內部類,都繼承AQS。
由于AQS的構建上采用模板模式(Template mode),即 AQS定義一些框架,而它的實現延遲到子類。如tryAcquire()方法。由于這個模式,我們如果直接看AQS源碼會比較抽象。所以從某個具體的實現切入簡單易懂。這里選澤ReentrantLock ,它和Synchronized具有同樣的語義。
簡單說來,AbstractQueuedSynchronizer會把所有的請求線程構成一個CLH隊列,當一個線程執行完畢(lock.unlock())時會激活自己的后繼節點,但正在執行的線程并不在隊列中,而那些等待執行的線程全 部處于阻塞狀態,經過調查線程的顯式阻塞是通過調用LockSupport.park()完成,而LockSupport.park()則調用 sun.misc.Unsafe.park()本地方法,再進一步,HotSpot在Linux中中通過調用pthread_mutex_lock函數把 線程交給系統內核進行阻塞。
?
ReentrantLock
從ReentrantLock(可重入鎖)開始,分析AQS。首先需要知道這個鎖和java 內置的同步Synchronized具有同樣的語義。如下代碼解釋重入的意思
| Lock?lock?=?new?ReentrantLock(); ????public?void?test() { ???????lock.lock(); ?????? System.out.print("I am test1"); ?????? test();?//?遞歸調用?……………………………1?遞歸調用不會阻塞,因為已經獲得了鎖,這就是重入的含義 ???????// test2();//?調用test2 ………………………2 ???????lock.unlock();//?這里應該放在finally?塊中,這里簡單省略,以后一樣。 ??? } ????public?void?test2() { ???????lock.lock(); ?????? System.out.println("I am test1"); ?????? test2();// ???????lock.unlock(); ??? } |
重入的意思就是,如果已經獲得了鎖,如果執行期間還需要獲得這個鎖的話,會直接獲得所,不會被阻塞,獲得鎖的次數加1;每執行一次unlock,持有鎖的次數減1,當為0時釋放鎖。這點,Synchronized 具有同樣語義。
查看源碼,可以看到ReentrantLock 對Lock接口的實現,把所有的操作都委派給一個叫Sync的類,如下源碼:
?????
?
其中Sync的定義如右圖
所以這個Syc類是關鍵。而Sync 基礎AQS。Sync又有兩個子類,
| final static class NonfairSync extends Sync? final static class FairSync extends Sync? |
顯然是為了支持公平鎖和非公平鎖而定義,默認情況下為非公平鎖。
先理一下Reentrant.lock()方法的調用過程(默認非公平鎖):
?
這 些討厭的Template模式導致很難直觀的看到整個調用過程,其實通過上面調用過程及AbstractQueuedSynchronizer的注釋可以發現,AbstractQueuedSynchronizer中抽象了絕大多數Lock的功能,而只把tryAcquire方法延遲到子類中實現。 tryAcquire方法的語義在于用具體子類判斷請求線程是否可以獲得鎖,無論成功與否AbstractQueuedSynchronizer都將處理后面的流程。
NonfairSync 和 FairSync 不同的是執行lock時做的操作,如下為 NonfairSync 的操作,其中compareAndSetState(intexpect, int des) 為AQS的方法,設置同步狀態,NonfairSync 通過修改同步狀態獲得鎖,鎖定不成功才執行acquire(1),此方法也在AQS中定義。而 FairSync.lock 直接執行acquire(1)。
| final?void?lock() { ????????????if?(compareAndSetState(0, 1)) ??????????????? setExclusiveOwnerThread(Thread.currentThread()); ????????????else ??????????????? acquire(1); } |
AQS中的Acquire(int)方法調用子類中的tryAcquire(int)實現,這里正是模板模式。如下面的源碼。自此已經進入到了AQS的實現。
| public?final?void?acquire(int?arg) { ????????if?(!tryAcquire(arg) && ??????????? acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) ????????????selfInterrupt(); } |
其他方法的調用順序類似,如unlock 調用AQS的release ,release 調用Sync的tryRelease()。
下面看NonfairSync.tryAcquire,它調用Sync.nonfairTryAcquire。以下為實現,首先獲取同步狀態c,o代表鎖沒有線程正在競爭鎖。如果c=0,那么嘗試用CAS操作獲得鎖;或者c!=0,但是鎖被當前線程擁有,那么獲得鎖的次數增加 acquires 次,這就是重入的概念。以上兩種情況都成功獲得鎖,返回真。如果不是以上兩種情況,就沒有獲得鎖,返回假。
| final?boolean?nonfairTryAcquire(int?acquires) { ????????????final?Thread current = Thread.currentThread(); ????????????int?c = getState(); ????????????if?(c == 0) { ????????????????if?(compareAndSetState(0, acquires)) { ??????????????????? setExclusiveOwnerThread(current); ????????????????????return?true; ??????????????? } ??????????? } ????????????else?if?(current == getExclusiveOwnerThread()) { ????????????????int?nextc = c + acquires; ????????????????if?(nextc < 0)?// overflow ????????????????????throw?new?Error("Maximum lock count exceeded"); ??????????????? setState(nextc); ????????????????return?true; ??????????? } ????????????return?false; ??????? } |
如果沒有獲得鎖,即NonfairSync.tryAcqiuer()返回假,那么可以看出 AQS.acquire 將執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg);將此線程追加到等待隊列的隊尾。其中Node是AQS的一個內部類,他是等待隊列節點的抽象。
| private?Node addWaiter(Node mode) { ??????? Node node =?new?Node(Thread.currentThread(), mode); ????????// Try the fast path of enq; backup to full enq on failure ??????? Node pred =?tail; ????????if?(pred !=?null) { ??????????? node.prev?= pred; ????????????if?(compareAndSetTail(pred, node)) { ??????????????? pred.next?= node; ?????????????? ?return?node; ??????????? } ??????? } ??????? enq(node); ????????return?node; } |
其中mode指的是模式,NULL 為獨占,否則為共享鎖。RetranLock為獨占鎖。首先把線程包裝為一個節點。然后獲取等待隊列的尾,如果不為NULL的話(這說明有其他線程在待隊列中行),就把初始化node的前驅為pred.( node.prev?= pred) 然后通過CAS操作把node 設置為新的隊尾,如果成功則設置pred的后繼為 node.至此 快速進隊完成。
但是如果pred為null(此時沒有線程在等待,一開始tail 就是null) ,或者CAS設置隊尾失敗。則需要執行下面的入隊流程。?這里可能是整個阻塞隊列的初始化過程。Tail 為null
| private?Node enq(final?Node node) { ????????for?(;;) { ??????????? Node t =?tail; ????????????if?(t ==?null) {?// Must initialize ??????????????? Node h =?new?Node();?// Dummy header ??????????????? h.next?= node; ??????????????? node.prev?= h; ????????????????if?(compareAndSetHead(h)) { ????????????????????tail?= node; ????????????????????return?h; ??????????????? } ??????????? } ????????????else?{ ??????????????? node.prev?= t; ????????????????if?(compareAndSetTail(t, node)) { ??????????????????? t.next?= node; ????????????????????return?t; ??????????????? } ??????????? } ?????? } ??? } ? |
該方法就是循環調用CAS,即使有高并發的場景,無限循環將會最終成功把當前線程追加到隊尾(或設置隊頭)。總而言之,addWaiter的目的就是通過CAS把當前現在追加到隊尾,并返回包裝后的Node實例。
把線程要包裝為Node對象的主要原因,除了用Node構造供虛擬隊列外,還用Node包裝了各種線程狀態,這些狀態被精心設計為一些數字值:
SIGNAL(-1) :線程的后繼線程正/已被阻塞,當該線程release或cancel時要重新這個后繼線程(unpark)
CANCELLED(1):因為超時或中斷,該線程已經被取消
CONDITION(-2):表明該線程被處于條件隊列,就是因為調用了Condition.await而被阻塞。
PROPAGATE(-3):傳播共享鎖
0:0代表無狀態
接下來執行acquireQueued(Node)方法。acquireQueued的主要作用是把已經追加到隊列的線程節點(addWaiter方法返回值)進行阻塞,但阻塞前又通過tryAccquire重試是否能獲得鎖,如果重試成功能則無需阻塞,直接返回。
| final?boolean?acquireQueued(final?Node node,?int?arg) { ????????try?{ ????????????boolean?interrupted =?false; ????????????for?(;;) { ????????????????final?Node p = node.predecessor(); ????????????????if?(p ==?head?&& tryAcquire(arg)) { ??????????????????? setHead(node); ??????????????????? p.next?=?null;?// help GC ????????????????????return?interrupted; ??????????????? } ????????????????if?(shouldParkAfterFailedAcquire(p, node) && ??????????????????? parkAndCheckInterrupt()) ??????????????????? interrupted =?true; ??????????? } ??????? }?catch?(RuntimeException ex) { ??????????? cancelAcquire(node); ????????????throw?ex; ??????? } ??? } |
以上的循環不會無限進行,因為接下來線程會被阻塞。這由parkAndCheckInterrupt()方法實現,但是它只有在shouldParkAfterFailedAcquire 方法返回 true 的時候后才會繼續執行進而阻塞。所以看 shouldParkAfterFailedAcquire方法,從方法的名字看 意思是,當獲取鎖失敗的時候是否應該阻塞。
| private?static?boolean?shouldParkAfterFailedAcquire(Node pred, Node node) { ????????int?ws = pred.waitStatus; ????????if?(ws == Node.SIGNAL) ????????????/* ???????????? * This node has already set status asking a release ???????????? * to signal it, so it can safely park ???????????? */ ????????????return?true; ????????if?(ws > 0) { ????????????/* ???????????? * Predecessor was cancelled. Skip over predecessors and ???????????? * indicate retry. ???????????? */ ??? ????do?{ ??? ??? node.prev?= pred = pred.prev; ??? ??? }?while?(pred.waitStatus?> 0); ??? ??? pred.next?= node; ??????? }?else?{ ????????????/* ???????????? * waitStatus must be 0 or PROPAGATE. Indicate that we ???????????? * need a signal, but don't park yet. Caller will need to ???????????? * retry to make sure it cannot acquire before parking. ???????????? */ ????????????compareAndSetWaitStatus(pred, ws, Node.SIGNAL); ??????? } ????????return?false; ??? } |
此方法的作用是根據它的前驅節點決定本節點做什么樣的操作。前面已經說過Node的節點的waitState 表示它個后繼節點 需要做什么操作。這里就是對線程狀態的檢查,所有這個方法參數中有前驅節點。
檢查原則在于:
規則1:如果前繼的節點狀態為SIGNAL,表明當前節點需要unpark,則返回成功,此時acquireQueued方法的第12行(parkAndCheckInterrupt)將導致線程阻塞
規則2:如果前繼節點狀態為CANCELLED(ws>0),說明前置節點已經被放棄,則回溯到一個非取消的前繼節點,返回false,acquireQueued方法的無限循環將遞歸調用該方法,直至規則1返回true,導致線程阻塞
規則3:如果前繼節點狀態為非SIGNAL、非CANCELLED,則設置前繼的狀態為SIGNAL,返回false后進入acquireQueued的無限循環,與規則2同
總體看來,shouldParkAfterFailedAcquire就是靠前繼節點判斷當前線程是否應該被阻塞,如果前繼節點處于CANCELLED狀態,則順便刪除這些節點重新構造隊列。
至此,獲取鎖完畢。
請求鎖不成功的線程會被掛起在acquireQueued方法的第12行,12行以后的代碼必須等線程被解鎖鎖才能執行,假如被阻塞的線程得到解鎖,則執行第13行,即設置interrupted = true,之后又進入無限循環。
解鎖的過程相對簡單一些。
調用關系如下順序 ReentrantLock.unlock()????AQS.release()? --Synx.tryRealse()
從無限循環的代碼可以看出,并不是得到解鎖的線程一定能獲得鎖,必須在第6行中調用tryAccquire重新競爭,因為鎖是非公平的,有可能被新加入的線程獲得,從而導致剛被喚醒的線程再次被阻塞,這個細節充分體現了“非公平”的精髓。此可以看到,把tryAcquire方法延遲到子類中實現的做法非常精妙并具有極強的可擴展性,令人嘆為觀止!當然精妙的不是這個Templae設計模式,而是Doug Lea對鎖結構的精心布局。
| public?void?unlock() { ??????? sync.release(1); } |
release的語義在于:如果可以釋放鎖,則喚醒隊列第一個線程(Head.next)。release先調用tryRelease調用是否解鎖成功,解鎖成長才進行下一步操作。
| public?final?boolean?release(int?arg) { ????????if?(tryRelease(arg)) { ??????????? Node h =?head; ????????????if?(h !=?null?&& h.waitStatus?!= 0) ??????????????? unparkSuccessor(h); ????????????return?true; ??????? } ????????return?false; ??? } |
tryRelease與tryAcquire語義相同,把如何釋放的邏輯延遲到子類中。tryRelease語義很明確:如果線程多次鎖定,則進行多次釋放,直至status==0則真正釋放鎖,所謂釋放鎖即設置status為0,因為無競爭所以沒有使用CAS。如下源代碼
| protected?final?boolean?tryRelease(int?releases) { ????????????int?c = getState() - releases; ????????????if?(Thread.currentThread() != getExclusiveOwnerThread()) ????????????????throw?new?IllegalMonitorStateException(); ????????????boolean?free =?false; ????????????if?(c == 0) { ??????????????? free =?true; ??????????????? setExclusiveOwnerThread(null); ??????????? } ??????????? setState(c); ????????????return?free; ??????? } |
下面的源代碼是喚醒隊列的第一個線程。但是其可能被取消,當被取消的時候,從隊尾往前找線程。(不從對頭開始的原因是,隊尾一直在變化,不容易判斷)
| private?void?unparkSuccessor(Node node) { ????????/* ???????? * If status is negative (i.e., possibly needing signal) try ???????? * to clear in anticipation of signalling. It is OK if this ???????? * fails or if status is changed by waiting thread. ???????? */ ????????int?ws = node.waitStatus; ????????if?(ws < 0) ????????????compareAndSetWaitStatus(node, ws, 0); ? ????????/* ???????? * Thread to unpark is held in successor, which is normally ???????? * just the next node.? But if cancelled or apparently null, ???????? * traverse backwards from tail to find the actual ???????? * non-cancelled successor. ???????? */ ??????? Node s = node.next; ????????if?(s ==?null?|| s.waitStatus?> 0) { ??????????? s =?null; ????????????for?(Node t =?tail; t !=?null?&& t != node; t = t.prev) ????????????????if?(t.waitStatus?<= 0) ??????????????????? s = t; ??????? } ????????if?(s !=?null) ??????????? LockSupport.unpark(s.thread); ??? } |
?
可中斷鎖的實現:本質是調用 AQS. 他在響應中斷后直接跳出循環,拋出異常,而正常額Lock 忽略這個中斷,只是簡單的記錄下,然后繼續循環。
| private?void?doAcquireInterruptibly(int?arg) ????????throws?InterruptedException { ????????final?Node node = addWaiter(Node.EXCLUSIVE); ????????try?{ ????????????for?(;;) { ????????????????final?Node p = node.predecessor(); ????????????????if?(p ==?head?&& tryAcquire(arg)) { ??????????????????? setHead(node); ??????????????????? p.next?=?null;?// help GC ????????????????????return; ????? ??????????} ????????????????if?(shouldParkAfterFailedAcquire(p, node) && ??????????????????? parkAndCheckInterrupt()) ????????????????????break; ??????????? } ??????? }?catch?(RuntimeException ex) { ??????????? cancelAcquire(node); ????????????throw?ex; ??????? } ????????// Arrive here only if interrupted ??????? cancelAcquire(node); ????????throw?new?InterruptedException(); ??? } |
超時鎖的實現基本類似,就是阻塞一段時間后自己恢復,如果有中斷則拋出異常。
| private?boolean?doAcquireNanos(int?arg,?long?nanosTimeout) ????????throws?InterruptedException { ????????long?lastTime = System.nanoTime(); ????????final?Node node = addWaiter(Node.EXCLUSIVE); ????????try?{ ????????????for?(;;) { ????????????????final?Node p = node.predecessor(); ????????????????if?(p ==?head?&& tryAcquire(arg)) { ??????????????????? setHead(node); ??????????????????? p.next?=?null;?// help GC ????????????????????return?true; ??????????????? } ????????????????if?(nanosTimeout <= 0) { ??????????????????? cancelAcquire(node); ????????????????????return?false; ??????????????? } ????????????????if?(nanosTimeout >?spinForTimeoutThreshold?&& ????????????????????shouldParkAfterFailedAcquire(p, node)) ??????????????????? LockSupport.parkNanos(this, nanosTimeout); ????????????????long?now = System.nanoTime(); ??????? ????????nanosTimeout -= now - lastTime; ??????????????? lastTime = now; ????????????????if?(Thread.interrupted()) ????????????????????break; ??????????? } ??????? }?catch?(RuntimeException ex) { ??????????? cancelAcquire(node); ????????????throw?ex; ??????? } ????????// Arrive here only if interrupted ??????? cancelAcquire(node); ????????throw?new?InterruptedException(); ??? } |
?
Condition
Condition 實現了與java內容monitor 類似的功能。提供 await,signal,signalall 等操作,與object . wait等一系列操作對應。不同的是一個condition 可以有多個條件隊列。這點內置monitor 是做不到的。另外還支持 超時、取消等更加靈活的方式。
和內置的Monitor一樣,調用 Condition。aWait 等操作,需要獲得鎖,也就是 Condition 是和一個鎖綁定在一起的。它的實現 是在AQS中,基本思想如下:一下內容抄自博客:http://www.nbtarena.com/Html/soft/201308/2429.html
public final void await() throws InterruptedException {
??? // 1.如果當前線程被中斷,則拋出中斷異常
??? if (Thread.interrupted())
??????? throw newInterruptedException();
??? // 2.將節點加入到Condition隊列中去,這里如果lastWaiter是cancel狀態,那么會把它踢出Condition隊列。
??? Node node = addConditionWaiter();
??? // 3.調用tryRelease,釋放當前線程的鎖
??? long savedState =fullyRelease(node);
??? int interruptMode = 0;
??? // 4.為什么會有在AQS的等待隊列的判斷?
??? // 解答:signal*作會將Node從Condition隊列中拿出并且放入到等待隊列中去,在不在AQS等待隊列就看signal是否執行了
??? // 如果不在AQS等待隊列中,就park當前線程,如果在,就退出循環,這個時候如果被中斷,那么就退出循環
??? while (!isOnSyncQueue(node)) {
??????? LockSupport.park(this);
??????? if ((interruptMode =checkInterruptWhileWaiting(node)) != 0)
??????????? break;
??? }
??? // 5.這個時候線程已經被signal()或者signalAll()*作給喚醒了,退出了4中的while循環
??? // 自旋等待嘗試再次獲取鎖,調用acquireQueued方法
??? if (acquireQueued(node,savedState) && interruptMode != THROW_IE)
??????? interruptMode = REINTERRUPT;
??? if (node.nextWaiter != null)
??????? unlinkCancelledWaiters();
??? if (interruptMode != 0)
??????? reportInterruptAfterWait(interruptMode);
}
?
整個await的過程如下:
1.將當前線程加入Condition鎖隊列。特別說明的是,這里不同于AQS的隊列,這里進入的是Condition的FIFO隊列。進行2。
2.釋放鎖。這里可以看到將鎖釋放了,否則別的線程就無法拿到鎖而發生死鎖。進行3。
3.自旋(while)掛起,直到被喚醒或者超時或者CACELLED等。進行4。
4.獲取鎖(acquireQueued)。并將自己從Condition的FIFO隊列中釋放,表明自己不再需要鎖(我已經拿到鎖了)。
可以看到,這個await的*作過程和Object.wait()方法是一樣,只不過await()采用了Condition隊列的方式實現了Object.wait()的功能。
signal和signalAll方法
await*()清楚了,現在再來看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要將Condition.await*()中FIFO隊列中第一個Node喚醒(或者全部Node)喚醒。盡管所有Node可能都被喚醒,但是要知道的是仍然只有一個線程能夠拿到鎖,其它沒有拿到鎖的線程仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。
?Java Code
?
public final void signal() {
??? if (!isHeldExclusively())
??????? throw newIllegalMonitorStateException();
??? Node first = firstWaiter;
??? if (first != null)
??????? doSignal(first);
}
這里先判斷當前線程是否持有鎖,如果沒有持有,則拋出異常,然后判斷整個condition隊列是否為空,不為空則調用doSignal方法來喚醒線程,看看doSignal方法都干了一些什么:
?Java Code
?
private void doSignal(Node first) {
??? do {
??????? if ( (firstWaiter =first.nextWaiter) == null)
??????????? lastWaiter = null;
??????? first.nextWaiter = null;
??? } while(!transferForSignal(first) &&
???????????? (first = firstWaiter)!= null);
}
上面的代*很容易看出來,signal就是喚醒Condition隊列中的第一個非CANCELLED節點線程,而signalAll就是喚醒所有非CANCELLED節點線程。當然了遇到CANCELLED線程就需要將其從FIFO隊列中剔除。
?
?Java Code
?
final boolean transferForSignal(Node node) {
??? /*
???? * 設置node的waitStatus:Condition->0
???? */
??? if(!compareAndSetWaitStatus(node, Node.CONDITION, 0))
??????? return false;
??? /*
???? * 加入到AQS的等待隊列,讓節點繼續獲取鎖
???? * 設置前置節點狀態為SIGNAL
???? */
??? Node p = enq(node);
??? int c = p.waitStatus;
??? if (c > 0 ||!compareAndSetWaitStatus(p, c, Node.SIGNAL))
???????LockSupport.unpark(node.thread);
??? return true;
}
?
上面就是喚醒一個await*()線程的過程,根據前面的介紹,如果要unpark線程,并使線程拿到鎖,那么就需要線程節點進入AQS的隊列。所以可以看到在LockSupport.unpark之前調用了enq(node)*作,將當前節點加入到AQS隊列。
?
signalAll和signal方法類似,主要的不同在于它不是調用doSignal方法,而是調用doSignalAll方法:
?Java Code
?
private void doSignalAll(Node first) {
??? lastWaiter = firstWaiter? = null;
??? do {
??????? Node next =first.nextWaiter;
??????? first.nextWaiter = null;
??????? transferForSignal(first);
??????? first = next;
??? } while (first != null);
}
這個方法就相當于把Condition隊列中的所有Node全部取出插入到等待隊列中去。
?
線程池
JUC 中提供了線程池的實現,其基于一系列的抽象和接口。接下里一步一步解開線程池的神秘面紗。
首先應該了解線程池的使用。J U C 提供了一個 構造線程池的 工廠類。java.util.concurrent.Executors 。此工廠提供了構造各種不同類型線程池的靜態方法。如固定線程池,單一工作線程池,和緩存線程池等。
如下代碼構造了一個具有2個固定工作線程的線程池。
| ExecutorService ser = Executors.newFixedThreadPool(2); |
經過跟蹤,此構造函數最終調用如下,其參數解釋如下:
corePoolSize - 池中所保存的線程數,包括空閑線程。
maximumPoolSize - 池中允許的最大線程數。
keepAliveTime - 當線程數大于核心時,此為終止前多余的空閑線程等待新任務的最長時間。
unit - keepAliveTime 參數的時間單位。
workQueue - 執行前用于保持任務的隊列。此隊列僅保持由 execute 方法提交的 Runnable 任務。
threadFactory - 執行程序創建新線程時使用的工廠。
handler - 由于超出線程范圍和隊列容量而使執行被阻塞時所使用的處理程序。
?
| public?ThreadPoolExecutor(int?corePoolSize, ???????? ?????????????????????int?maximumPoolSize, ??????????????????????????????long?keepAliveTime, ????????????????????????????? TimeUnit unit, ????????????????????????????? BlockingQueue<Runnable> workQueue, ????????????????????????????? ThreadFactory threadFactory, ????????????????????????????? RejectedExecutionHandler handler) { ????????if?(corePoolSize < 0 || ??????????? maximumPoolSize <= 0 || ??????????? maximumPoolSize < corePoolSize || ??????????? keepAliveTime < 0) ????????????throw?new?IllegalArgumentException(); ????????if?(workQueue ==?null?|| threadFactory ==?null?|| handler ==?null) ????????????throw?new?NullPointerException(); ????????this.corePoolSize?= corePoolSize; ????????this.maximumPoolSize?= maximumPoolSize; ????????this.workQueue?= workQueue; ????????this.keepAliveTime?= unit.toNanos(keepAliveTime); ????????this.threadFactory?= threadFactory; ????????this.handler?= handler; ??? } |
?
我們構造的線程池的類型是?ExecutorService,ThreadPoolExecutor繼承AbstractExecutorService,其總體類圖如下,可以看到最初的抽象是Exector。
?
?
接口Executor
該接口只有一個方法,JDK解釋如下
執行已提交的Runnable?任務的對象。此接口提供一種將任務提交與每個任務將如何運行的機制(包括線程使用的細節、調度等)分離開來的方法。
不過,Executor 接口并沒有嚴格地要求執行是異步的。在最簡單的情況下,執行程序可以在調用者的線程中立即運行已提交的任務:
| class DirectExecutor implements Executor { ???? public void execute(Runnable r) { ???????? r.run(); ???? } ?} |
更常見的是,任務是在某個不是調用者線程的線程中執行的。以下執行程序將為每個任務生成一個新線程。
| class ThreadPerTaskExecutor implements Executor { ???? public void execute(Runnable r) { ???????? new Thread(r).start(); ???? } ?} |
方法介紹如下:
void execute(Runnable?command)在未來某個時間執行給定的命令。該命令可能在新的線程、已入池的線程或者正調用的線程中執行,這由Executor實現決定。
接口ExecutorService
ExecutorService 是對?Executor?的擴展,JDK文檔解釋如下:
Executor?提供了管理終止的方法,以及可為跟蹤一個或多個異步任務執行狀況而生成?Future?的方法。
可以關閉ExecutorService,這將導致其拒絕新任務。提供兩個方法來關閉 ExecutorService。shutdown()?方法在終止前允許執行以前提交的任務,而?shutdownNow()方法阻止等待任務啟動并試圖停止當前正在執行的任務。在終止時,執行程序沒有任務在執行,也沒有任務在等待執行,并且無法提交新任務。應該關閉未使用的 ExecutorService 以允許回收其資源。
通過創建并返回一個可用于取消執行和/或等待完成的?Future,方法 submit 擴展了基本方法Executor.execute(java.lang.Runnable)。方法 invokeAny 和 invokeAll 是批量執行的最常用形式,它們執行任務 collection,然后等待至少一個,或全部任務完成(可使用?ExecutorCompletionService?類來編寫這些方法的自定義變體)。
此接口中的關鍵是三個submit 方法,接受一個任務,并返回結果Future。
| <T> Future<T> submit(Callable<T> task); ? <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); |
其中Callable 就是帶返回結果的Runnable。定義如下:
| public?interface?Callable<V> { ????/** ???? * Computes a result, or throws an exception if unable to do so. ???? * ???? *?@return?computed result ???? *?@throws?Exception if unable to compute a result ???? */ ??? V call()?throws?Exception; } |
精彩的是返回一個表示任務的未決結果的 Future。該 Future 的get?方法在成功完成時將會返回該任務的結果。注意這些過程是異步的。
接口Future
JDK解釋如下:
Future 表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,并獲取計算的結果。計算完成后只能使用 get 方法來獲取結果,如有必要,計算完成前可以阻塞此方法。取消則由 cancel 方法來執行。還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。如果為了可取消性而使用 Future 但又不提供可用的結果,則可以聲明 Future<?> 形式類型、并返回 null 作為底層任務的結果。
它的方法簡介如下:
| ?boolean | cancel(boolean?mayInterruptIfRunning)? |
| ?V | get()? |
| ?V | get(long?timeout,?TimeUnit?unit)? |
| ?boolean | isCancelled()? |
| ?boolean | isDone()? |
Submit后發生的事情
有了以上的一些基本了解,接下來看當任務提交之后發生的一系列過程。
Submit 的實際代碼位于AbstractExecutorService,繼承ExecutorService。來觀察其三個submit方法。
構造RunnableFuture
| public?Future<?> submit(Runnable task) { ????????if?(task ==?null)?throw?new?NullPointerException(); ??????? RunnableFuture<Object> ftask = newTaskFor(task,?null); ??????? execute(ftask); ????????return?ftask; ??? } ? ????public?<T> Future<T> submit(Runnable task, T result) { ????????if?(task ==?null)?throw?new?NullPointerException(); ??????? RunnableFuture<T> ftask = newTaskFor(task, result); ??????? execute(ftask); ????????return?ftask; ??? } ? ????public?<T> Future<T> submit(Callable<T> task) { ????????if?(task ==?null)?throw?new?NullPointerException(); ??????? RunnableFuture<T> ftask = newTaskFor(task); ??????? execute(ftask); ????????return?ftask; ??? } |
可以看出,不論submit 方法的參數是什么,都是先構造一個RunnableFuture ,然偶執行它,并返回它。執行和返回的都是RunnableFuture。所以RunnableFuture實現了future 接口和runnnable接口。注意這點的類型是RunnableFuture,所有接下來的execute方法執行的run方法是RunnableFuture?的具體實現類FutureTask的run方法。
來看RunnableFuture,其代碼如下:
| /** ?* A?{@link Future}?that is?{@link Runnable}. Successful execution of ?* the?<tt>run</tt>?method causes completion of the?<tt>Future</tt> ?* and allows access to its results. ?*?@see?FutureTask ?*?@see?Executor ?*?@since?1.6 ?*?@author?Doug Lea ?*?@param<V>?The result type returned by this Future's?<tt>get</tt>?method ?*/ public?interface?RunnableFuture<V>?extends?Runnable, Future<V> { ????/** ???? * Sets this Future to the result of its computation ???? * unless it has been cancelled. ???? */ ????void?run(); } |
作為?Runnable?的?Future。成功執行 run 方法可以完成 Future 并允許訪問其結果。以下代碼可以看出 返回的實際上是FutureTask,為RunnableFuture的實現類。
| protected?<T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { ????????return?new?FutureTask<T>(runnable, value);} |
?
關于 FutureTask ?JDK對其介紹如下:
可取消的異步計算。利用開始和取消計算的方法、查詢計算是否完成的方法和獲取計算結果的方法,此類提供了對Future?的基本實現。僅在計算完成時才能獲取結果;如果計算尚未完成,則阻塞 get 方法。一旦計算完成,就不能再重新開始或取消計算。
可使用FutureTask 包裝?Callable?或?Runnable?對象。因為 FutureTask 實現了 Runnable,所以可將 FutureTask 提交給?Executor?執行。
除了作為一個獨立的類外,此類還提供了 protected 功能,這在創建自定義任務類時可能很有用。
先看其構造函數。可以看出其構造函數主要是一個 同步器的構造。同步器接受一個Callable類型的參數。
| public?FutureTask(Callable<V> callable) { ????????if?(callable ==?null) ????????????throw?new?NullPointerException(); ????????sync?=?new?Sync(callable); ??? } ? ?? ????public?FutureTask(Runnable runnable, V result) { ????????sync?=?new?Sync(Executors.callable(runnable, result)); ??? } |
對于參數是Runnable 類型時,經過轉化為Callable 類型,轉化代碼如下,本質上就是在Callable 的call方法中調用Runnable的run方法:
| public?static?<T> Callable<T> callable(Runnable task, T result) { ????????if?(task ==?null) ????????????throw?new?NullPointerException(); ????????return?new?RunnableAdapter<T>(task, result); } static?final?class?RunnableAdapter<T>?implements?Callable<T> { ????????final?Runnable?task; ????????final?T?result; ??????? RunnableAdapter(Runnable? task, T result) { ????????????this.task?= task; ????????????this.result?= result; ??????? } ????????public?T call() { ????????????task.run(); ????????????return?result; ??????? } ??? } |
?
FutureTask的關鍵邏輯都由他的一個內部類Sync 實現。我們先暫且不管其具體實現,留在后面說。
執行
接下來看 執行任務。Execute 方法實現在ThreadPoolExecutor 類中,這是具體的線程池。其Execute 方法如下:
| public?void?execute(Runnable command) { ????????if?(command ==?null) ????????????throw?new?NullPointerException(); ????????if?(poolSize?>=?corePoolSize?|| !addIfUnderCorePoolSize(command)) { ????????????if?(runState?==?RUNNING?&&?workQueue.offer(command)) { ????????????????if?(runState?!=?RUNNING?||?poolSize?== 0) ??????????????????? ensureQueuedTaskHandled(command); ??????????? } ?????????? ?else?if?(!addIfUnderMaximumPoolSize(command))//這里是再給一次機會 ??????????????? reject(command);?// is shutdown or saturated ??????? } ??? } |
具體的邏輯如下描述:
首先判斷空;
如果當前池大小 小于 核心池大小(初始就是這樣),那么會執行 addIfUnderCorePoolSize這個方法。這個方法就會創建新的工作線程,且把當前任務 command 設置為他的第一個任務,并開始執行,返回true。整個execute方法結束。(1)否則加入到等待隊列中。(2)
執行情況1
先看情況1:如下代碼,只有當前池大小小于核心池大小的時候,且線程池處于RUNNING狀態的時候才增加新的工作線程,并把傳進來的任務作為第一個任務并開始執行。此時返回真,否則返回假。
| /** ???? * Creates and starts a new thread running firstTask as its first ???? * task, only if fewer than corePoolSize threads are running ???? * and the pool is not shut down. ???? *?@param?firstTask the task the new thread should run first (or ???? * null if none) ???? *?@return?true if successful ???? */ ????private?boolean?addIfUnderCorePoolSize(Runnable firstTask) { ??????? Thread t =?null; ????????final?ReentrantLock mainLock =?this.mainLock; ??????? mainLock.lock(); ????????try?{ ????????????if?(poolSize?<?corePoolSize?&&?runState?==?RUNNING) ??????????????? t = addThread(firstTask); ??????? }?finally?{ ??????????? mainLock.unlock(); ??????? } ????????if?(t ==?null) ????????????return?false; ??????? t.start(); ????????return?true; } ? /** ???? * Creates and returns a new thread running firstTask as its first ???? * task. Call only while holding mainLock. ???? * ???? *?@param?firstTask the task the new thread should run first (or ???? * null if none) ???? *?@return?the new thread, or null if threadFactory fails to create thread ???? */ ????private?Thread addThread(Runnable firstTask) { ??????? Worker w =?new?Worker(firstTask);//工作線程, ??????? Thread t =?threadFactory.newThread(w);//封裝成線程 ????????if?(t !=?null) { ??????????? w.thread?= t; ????????????workers.add(w); ????????????int?nt = ++poolSize; ????????????if?(nt >?largestPoolSize) ????????????????largestPoolSize?= nt; ??????? } ????????return?t; ??? } |
執行情況2
如果當前池大小 大于核心池的大小,或者添加新的工作線程失敗(這可能是多線程環境下,競爭鎖,被阻塞,其他線程已經創建好了工作線程)。那么當前任務進入到等待隊列。
如果隊列滿,或者線程池已經關閉,那么拒絕該任務。
工作線程worker
對工作線程的封裝是類Worker,它實現了Runnable接口。addThread方法把Worker 組成線程(用threadFactory),并加入線程池,重新設置線程池 大小的 達到的最大值。
重點研究下worker的run方法,首先運行第一個任務,以后通過getTask()獲取新的任務,如果得不到,工作線程會自動結束,在結束前 會執行一些工作,見后面。
| public?void?run() { ????????????try?{ ??????????????? Runnable task =?firstTask; ????????????? ??firstTask?=?null; ????????????????while?(task !=?null?|| (task = getTask()) !=?null) { ??????????????????? runTask(task); ??????????????????? task =?null; ??????????????? } ??????????? }?finally?{ ??????????????? workerDone(this); ??????????? } } |
執行提交的任務,執行任務前 后可以 各進行 一些處理,目前默認實現是什么也不做,擴展的類可以實現它。
| private?void?runTask(Runnable task) { ????????????final?ReentrantLock runLock =?this.runLock; ??????????? runLock.lock(); ????????????try?{ ????????????????/* ???????????????? * Ensure that unless pool is stopping, this thread ???????????????? * does not have its interrupt set. This requires a ???????????????? * double-check of state in case the interrupt was ???????????????? * cleared concurrently with a shutdownNow -- if so, ???????????????? * the interrupt is re-enabled. ???????????????? */ ????????????????if?(runState?<?STOP?&& ??????????????????? Thread.interrupted() && ????????????????????runState?>=?STOP) ????????????????????thread.interrupt(); ????????????????/* ???????????????? * Track execution state to ensure that afterExecute ???????????????? * is called only if task completed or threw ???????????????? * exception. Otherwise, the caught runtime exception ???????????????? * will have been thrown by afterExecute itself, in ???????????????? * which case we don't want to call it again. ???????????????? */ ????????????????boolean?ran =?false; ??????????????? beforeExecute(thread, task); ????????????????try?{ ??????????????????? task.run(); ?????? ?????????????ran =?true; ??????????????????? afterExecute(task,?null); ??????????????????? ++completedTasks; ??????????????? }?catch?(RuntimeException ex) { ????????????????????if?(!ran) ??????????????????????? afterExecute(task, ex); ????????????????????throw?ex; ??????????????? } ??????????? }?finally?{ ??????????????? runLock.unlock(); ??????????? } ??????? } |
下面的方法是 工作線程銷毀錢調用的方法,是在run中調用的。當池大小為0的時候,調用tryterminate 方法。
| /* *Performs bookkeeping for an exiting worker thread. ???? ?*?@param?w the worker?此方法在ThreadPoolExecutor?中 ????? */ void?workerDone(Worker w) { ????????final?ReentrantLock mainLock =?this.mainLock; ??????? mainLock.lock(); ????????try?{ ????????????completedTaskCount?+= w.completedTasks; ????????????workers.remove(w); ????????????if?(--poolSize?== 0) ??????????????? tryTerminate(); ??????? }?finally?{ ??????????? mainLock.unlock(); ??????? } ??? } |
這個方法只有在線程池的狀態是是stop 或者shutdown的時候才會真正的關閉整個線程池。另外shutdown也會調用這個方法。
| /** ???? * Transitions to TERMINATED state if either (SHUTDOWN and pool ???? * and queue empty) or (STOP and pool empty), otherwise unless ???? * stopped, ensuring that there is at least one live thread to ???? * handle queued tasks. ???? * ???? * This method is called from the three places in which ???? * termination can occur: in workerDone on exit of the last thread ???? * after pool has been shut down, or directly within calls to ???? * shutdown or shutdownNow, if there are no live threads. ???? */ ????private?void?tryTerminate() { ????????if?(poolSize?== 0) { ????????????int?state =?runState; ????????????if?(state <?STOP?&& !workQueue.isEmpty()) { ??????????????? state =?RUNNING;?// disable termination check below ??????????????? Thread t = addThread(null); ????????????????if?(t !=?null) ??????????????????? t.start(); ??? ????????} ????????????if?(state ==?STOP?|| state ==?SHUTDOWN) { ????????????????runState?=?TERMINATED; ????????????????termination.signalAll(); ??????????????? terminated(); ??????????? } ??????? } ??? } |
FutureTask
此類是RunnableFuture的實現類。線程池執行的run方法是它的run方法。它委托給Sync實現,SYNC 繼承AQS。
| /** ???? * Sets this Future to the result of its computation ???? * unless it has been cancelled. ???? */ ????public?void?run() { ????????sync.innerRun(); ??? } |
重點看Sync。對具體任務的調用發生在innerSet(callable.call());這句調用,innerSet的方法 作用是 設置get方法的返回值。
| void?innerRun() { ????????????if?(!compareAndSetState(0,?RUNNING)) ????????????????return; ????????????try?{ ????????????????runner?= Thread.currentThread(); ????????????????if?(getState() ==?RUNNING)?// recheck after setting thread ??????????????????? innerSet(callable.call()); ????????????????else ??????????????????? releaseShared(0);?// cancel ??????????? }?catch?(Throwable ex) { ??????????????? innerSetException(ex); ??????????? } ??????? } |
//設置后才釋放鎖。
| void?innerSet(V v) { ??? ????for?(;;) { ???????int?s = getState(); ???????if?(s ==?RAN) ?????? ????return; ????????????????if?(s ==?CANCELLED) { ?????? ????// aggressively release to set runner to null, ?????? ????// in case we are racing with a cancel request ?????? ????// that will try to interrupt runner ??????????????????? releaseShared(0); ????????????????????return; ??????????????? } ???????if?(compareAndSetState(s,?RAN)) { ????????????????????result?= v; ??????????????????? releaseShared(0); ??????????????????? done(); ?????? ????return; ??????????????? } ??????????? } ??????? } |
而get方法是需要獲取鎖的,所以在具體的任務沒有執行完前,調用get方法會進入到阻塞狀態。
| V innerGet()?throws?InterruptedException, ExecutionException { ??????????? acquireSharedInterruptibly(0); ????????????if?(getState() ==?CANCELLED) ????????????????throw?new?CancellationException(); ????????????if?(exception?!=?null) ????????????????throw?new?ExecutionException(exception); ????????????return?result; ??????? } |
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
參考
http://www.cnblogs.com/sarafill/archive/2011/05/18/2049461.html框架介紹,比較廣泛
http://chenzehe.iteye.com/blog/1759884原子類
http://blog.sina.com.cn/s/blog_75f0b54d0100r7af.html鎖的操作系統原理
http://www.infoq.com/cn/articles/atomic-operation此人是淘寶大神,原子操作的實現
http://agapple.iteye.com/blog/970055?? java線程阻塞中斷和LockSupport的常見問題
http://blog.csdn.net/hintcnuie/article/details/11022049??????????????????? ??對比synchronized與java.util.concurrent.locks.Lock的異同
http://www.blogjava.net/xylz/archive/2010/07/06/325390.htmlAQS
http://www.open-open.com/lib/view/open1352431606912.html? 比較清晰的解釋AQS的實現。? 這點給我的啟示是,看源代碼的時候,如果碰到 抽象類,那么跟它的實現類 結合一起看,搞清楚調用關系(這里肯定是模板模式,調用關系單單看抽象類看不明白)
http://whitesock.iteye.com/blog/162344java內置的鎖機制
http://whitesock.iteye.com/blog/1336409? Inside AbstractQueuedSynchronizer 系列,寫的非常精彩
?
http://www.nbtarena.com/Html/soft/201308/2429.html? 有關condition 的講解。
轉載于:https://www.cnblogs.com/davidwang456/p/6094996.html
總結
以上是生活随笔為你收集整理的java.util.concurrent包详细分析--转的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spring源码分析之spring-co
- 下一篇: Java 理论与实践: 正确使用 Vol