Java中的队列同步器AQS
一、AQS概念
1、隊(duì)列同步器是用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,使用一個(gè)int型變量代表同步狀態(tài),通過內(nèi)置的隊(duì)列來完成線程的排隊(duì)工作。
2、下面是JDK8文檔中對(duì)于AQS的部分介紹
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable 提供一個(gè)框架,用于實(shí)現(xiàn)依賴先進(jìn)先出(FIFO)等待隊(duì)列的阻塞鎖和相關(guān)同步器(信號(hào)量,事件等)。 該類被設(shè)計(jì)為大多數(shù)類型的同步器的有用依據(jù),這些同步器依賴于單個(gè)原子int值來表示狀態(tài)。子類必須定義改變此狀態(tài)的protected方法,以及根據(jù)該對(duì)象被獲取或釋放來定義該狀態(tài)的含義。給定這些,這個(gè)類中的其他方法執(zhí)行所有排隊(duì)和阻塞機(jī)制。 子類可以保持其他狀態(tài)字段,但只以
原子方式更新int使用方法操縱值getState() , setState(int)和compareAndSetState(int, int)被跟蹤相對(duì)于同步。 此類支持默認(rèn)獨(dú)占模式和共享模式。 當(dāng)以獨(dú)占模式獲取時(shí),嘗試通過其他線程獲取不能成功。 多線程獲取的共享模式可能(但不需要)成功。 除了在機(jī)械意義上,這個(gè)類不理解這些差異,當(dāng)共享
模式獲取成功時(shí),下一個(gè)等待線程(如果存在)也必須確定它是否也可以獲取。 在不同模式下等待的線程共享相同的FIFO隊(duì)列。 通常,實(shí)現(xiàn)子類只支持這些模式之一,但是兩者都可以在
ReadWriteLock中發(fā)揮作用。僅支持獨(dú)占或僅共享模式的子類不需要定義支持未使用模式的方法。
總結(jié)來說就是:
①子類通過繼承AQS并實(shí)現(xiàn)其抽象方法來管理同步狀態(tài),對(duì)于同步狀態(tài)的更改通過提供的getState()、setState(int state)、compareAndSetState(int expect, int update)來進(jìn)行操作,因?yàn)槭褂肅AS操作保證同步狀態(tài)的改變是原子的。
②子類被推薦定義為自定義同步組件的靜態(tài)內(nèi)部類,同步器本身并沒有實(shí)現(xiàn)任何的同步接口,僅僅是定義了若干狀態(tài)獲取和釋放的方法來提供自定義同步組件的使用。
③同步器既可以支持獨(dú)占式的獲取同步狀態(tài),也可以支持共享式的獲取同步狀態(tài)(ReentrantLock、ReentrantReadWriteLock、CountDownLatch等不同類型的同步組件)
3、同步器是實(shí)現(xiàn)鎖的關(guān)鍵,在鎖的實(shí)現(xiàn)中聚合同步器,利用同步器實(shí)現(xiàn)鎖的語義;
二、AQS的接口和實(shí)例
1、同步器的設(shè)計(jì)實(shí)現(xiàn)原理
繼承同步器并且重寫指定的方法,然后將同步器組合在自定義同步組件的實(shí)現(xiàn)中,并且調(diào)用同步器提供的模板方法(這些模板方法會(huì)調(diào)用重寫的方法);而重寫指定的方法的時(shí)候,需要使用getState()、setState(int state)、compareAndSetState(int expect, int update)來訪問或者更新同步狀態(tài)。下面是源碼中state變量和三個(gè)方法的定義聲明實(shí)現(xiàn)
1 /** 2 * .(同步狀態(tài)) 3 */ 4 private volatile int state; 5 6 /** 7 * (返回當(dāng)前的同步狀態(tài)) 8 * 此操作的內(nèi)存語義為@code volatile read 9 */ 10 protected final int getState() { 11 return state; 12 } 13 14 /** 15 * (設(shè)置新的同步狀態(tài)) 16 * 此操作的內(nèi)存語義為@code volatile read 17 */ 18 protected final void setState(int newState) { 19 state = newState; 20 } 21 22 /** 23 * (如果要更新的狀態(tài)和期望的狀態(tài)相同,那就通過原子的方式更新狀態(tài)) 24 * ( 此操作的內(nèi)存語義為@code volatile read 和 write) 25 * (如果更新的狀態(tài)和期望的狀態(tài)不同就返回false) 26 */ 27 protected final boolean compareAndSetState(int expect, int update) { 28 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); 29 }?
?
2、下面介紹AQS提供可被重寫的方法
1 /** 2 * 獨(dú)占式的獲取同步狀態(tài),實(shí)現(xiàn)該方法需要查詢當(dāng)前狀態(tài)并判斷同步狀態(tài)是否符合預(yù)期,然后再進(jìn)行CAS設(shè)置同步狀態(tài) 3 * 4 */ 5 protected boolean tryAcquire(int arg) { 6 throw new UnsupportedOperationException(); 7 } 8 9 /** 10 * 獨(dú)占式的釋放同步狀態(tài),等待獲取同步狀態(tài)的線程可以有機(jī)會(huì)獲取同步狀態(tài) 11 * 12 */ 13 protected boolean tryRelease(int arg) { 14 throw new UnsupportedOperationException(); 15 } 16 17 /** 18 * 嘗試以共享模式獲取。 該方法應(yīng)該查詢對(duì)象的狀態(tài)是否允許在共享模式下獲取該對(duì)象,如果是這樣,就可以獲取它。 該方法總是由執(zhí)行獲取的線程調(diào)用。 19 * 如果此方法報(bào)告失敗,則獲取方法可能將線程排隊(duì)(如果尚未排隊(duì)),直到被其他線程釋放為止。 獲取失敗時(shí)返回負(fù)值,如果在獲取成共享模式下功但沒 20 * 有后續(xù)共享模式獲取可以成功,則為零; 并且如果以共享模式獲取成功并且隨后的共享模式獲取可能成功,則為正值,在這種情況下,后續(xù)等待線程必須檢查可用性。 21 */ 22 protected int tryAcquireShared(int arg) { 23 throw new UnsupportedOperationException(); //如果不支持共享模式 ,會(huì)拋出該異常 24 } 25 26 /** 27 * 嘗試將狀態(tài)設(shè)置為以共享模式釋放同步狀態(tài)。 該方法總是由執(zhí)行釋放的線程調(diào)用。 28 */ 29 protected int tryReleaseShared(int arg) { 30 throw new UnsupportedOperationException(); //如果不支持共享模式 ,會(huì)拋出該異常 31 } 32 33 /** 34 * 當(dāng)前同步器是否在獨(dú)占模式下被線程占用,一般該方法表示是否被當(dāng)前線程所獨(dú)占 35 */ 36 protected int isHeldExclusively(int arg) { 37 throw new UnsupportedOperationException(); //如果不支持共享模式 ,會(huì)拋出該異常 38 }3、同步器提供的模板方法
在實(shí)現(xiàn)自定義同步組件的時(shí)候,需要重寫上面的方法,而下面的模板方法會(huì)調(diào)用上面重寫的方法。下面介紹同步器提供的模板方法
1 /** 2 * 以獨(dú)占模式獲取,忽略中斷。 通過調(diào)用至少一次tryAcquire(int)實(shí)現(xiàn),成功返回。 否則線 3 * 程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻塞,直到成功才調(diào)用tryAcquire(int) 4 */ 5 public final void acquire(int arg) {...} 6 7 /** 8 * 以獨(dú)占方式獲得,如果中斷,中止。 通過首先檢查中斷狀態(tài),然后調(diào)用至少一次 9 * tryAcquire(int) ,成功返回。 否則線程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻塞,調(diào)用 10 * tryAcquire(int)直到成功或線程中斷。 11 */ 12 public final void acquireInterruptibly(int arg) throws InterruptedException {...} 13 14 /** 15 * 嘗試以獨(dú)占模式獲取,如果中斷則中止,如果給定的超時(shí)時(shí)間失敗。 首先檢查中斷狀態(tài),然 16 * 后調(diào)用至少一次tryAcquire(int) ,成功返回。 否則,線程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻 17 * 塞,調(diào)用tryAcquire(int)直到成功或線程中斷或超時(shí) 18 */ 19 public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {...} 20 21 /** 22 * 以共享模式獲取,忽略中斷。 通過首次調(diào)用至少一次執(zhí)行 tryAcquireShared(int),成功返 23 * 回。 否則線程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻塞,直到成功調(diào)用tryAcquireShared(int) 。 24 */ 25 public final void acquireShared(int arg){...} 26 27 /** 28 * 以共享方式獲取,如果中斷,中止。 首先檢查中斷狀態(tài),然后調(diào)用至少一次 29 * tryAcquireShared(int) ,成功返回。 否則線程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻塞,調(diào)用 30 * tryAcquireShared(int)直到成功或線程中斷。 31 */ 32 public final void acquireSharedInterruptibly(int arg) throws InterruptedException{...} 33 34 /** 35 * 嘗試以共享模式獲取,如果中斷則中止,如果給定的時(shí)間超過,則失敗。 通過首先檢查中斷 36 * 狀態(tài),然后調(diào)用至少一次tryAcquireShared(int) ,成功返回。 否則,線程排隊(duì),可能會(huì)重 37 * 復(fù)阻塞和解除阻塞,調(diào)用tryAcquireShared(int)直到成功或線程中斷或超時(shí)。 38 */ 39 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{...} 40 41 /** 42 * 獨(dú)占式的釋放同步狀態(tài),該方法會(huì)在釋放同步狀態(tài)之后,將同步隊(duì)列中的第一個(gè)節(jié)點(diǎn)包含的線程喚醒 43 */ 44 public final boolean release(int arg){...} 45 46 /** 47 * 共享式的釋放同步狀態(tài) 48 */ 49 public final boolean releaseShared(int arg){...} 50 51 /** 52 * 獲取在等待隊(duì)列上的線程集合 53 */ 54 public final Collection<Thread> getQueuedThreads(){...}三、隊(duì)列同步器的實(shí)現(xiàn)分析
?1、同步隊(duì)列
a)t同步隊(duì)列的實(shí)現(xiàn)原理
AQS內(nèi)部維護(hù)一個(gè)同步隊(duì)列來完成同步狀態(tài)的管理,當(dāng)前線程獲取同步狀態(tài)失敗的時(shí)候,AQS會(huì)將當(dāng)前線程以及等待狀態(tài)信息構(gòu)造成一個(gè)結(jié)點(diǎn)Node并將其加入同步隊(duì)列中,同時(shí)阻塞當(dāng)前線程,當(dāng)同步狀態(tài)由持有線程釋放的時(shí)候,會(huì)將同步隊(duì)列中的首節(jié)點(diǎn)喚醒使其再次嘗試獲取同步狀態(tài)。同步隊(duì)列中的結(jié)點(diǎn)用來保存獲取同步狀態(tài)失敗的線程的線程引用、等待狀態(tài)以及前驅(qū)結(jié)點(diǎn)和后繼結(jié)點(diǎn)。下面是Node的屬性分析
1 static final class Node { 2 /** 共享模式下構(gòu)造結(jié)點(diǎn) */ 3 static final Node SHARED = new Node(); 4 /** 獨(dú)占模式下構(gòu)造結(jié)點(diǎn) */ 5 static final Node EXCLUSIVE = null; 6 7 /** 用于指示線程已經(jīng)取消的waitStatus值(由于在同步隊(duì)列中等待的線程等待超時(shí)或者發(fā)生中斷,需要從同步隊(duì)列中取消等待,結(jié)點(diǎn)進(jìn)入該狀態(tài)將不會(huì)發(fā)生變化)*/ 8 static final int CANCELLED = 1; 9 /** waitstatus值指示后續(xù)線程需要取消等待(后繼結(jié)點(diǎn)的線程處于等待狀態(tài),而當(dāng)前結(jié)點(diǎn)的線程如果釋放了同步狀態(tài)或者CANCELL,將會(huì)通知后繼結(jié)點(diǎn)的線程以運(yùn)行) */ 10 static final int SIGNAL = -1; 11 /**waitStatus值表示線程正在等待條件(原本結(jié)點(diǎn)在等待隊(duì)列中,結(jié)點(diǎn)線程等待在Condition上,當(dāng)其他線程對(duì)Condition調(diào)用了signal()方法之后)該結(jié)點(diǎn)會(huì)從等待隊(duì)列中轉(zhuǎn)移到同步隊(duì)列中,進(jìn)行同步狀態(tài)的獲取 */ 12 static final int CONDITION = -2; 13 /** 14 * waitStatus值表示下一個(gè)共享式同步狀態(tài)的獲取應(yīng)該無條件傳播下去 15 */ 16 static final int PROPAGATE = -3; 17 18 /** 19 * 不同的等到狀態(tài)的int值 20 */ 21 volatile int waitStatus; 22 23 /** 24 * 前驅(qū)結(jié)點(diǎn),當(dāng)結(jié)點(diǎn)加入同步隊(duì)列將會(huì)被設(shè)置前驅(qū)結(jié)點(diǎn)信息 25 */ 26 volatile Node prev; 27 28 /** 29 * 后繼結(jié)點(diǎn) 30 */ 31 volatile Node next; 32 33 /** 34 * 當(dāng)前獲取到同步狀態(tài)的線程 35 */ 36 volatile Thread thread; 37 38 /** 39 * 等待隊(duì)列中的后繼結(jié)點(diǎn),如果當(dāng)前結(jié)點(diǎn)是共享的,那么這個(gè)字段是一個(gè)SHARED常量;也就是說結(jié)點(diǎn)類型(獨(dú)占和共享)和等待隊(duì)列中的后繼結(jié)點(diǎn)公用一個(gè)字段 40 */ 41 Node nextWaiter; 42 43 /** 44 * 如果是共享模式下等待,那么返回true(因?yàn)樯厦娴腘ode nextWaiter字段在共享模式下是一個(gè)SHARED常量) 45 */ 46 final boolean isShared() { 47 return nextWaiter == SHARED; 48 } 49 50 final Node predecessor() throws NullPointerException { 51 Node p = prev; 52 if (p == null) 53 throw new NullPointerException(); 54 else 55 return p; 56 } 57 58 Node() { // 用于建立初始頭結(jié)點(diǎn)或SHARED標(biāo)記 59 } 60 61 Node(Thread thread, Node mode) { // 用于添加到等待隊(duì)列 62 this.nextWaiter = mode; 63 this.thread = thread; 64 } 65 66 Node(Thread thread, int waitStatus) { // Used by Condition 67 this.waitStatus = waitStatus; 68 this.thread = thread; 69 } 70 }
b)同步隊(duì)列示意圖和簡單分析
①同步隊(duì)列示意圖:當(dāng)一個(gè)線程獲取了同步狀態(tài)后,其他線程不能獲取到該同步狀態(tài),就會(huì)被構(gòu)造稱為Node然后添加到同步隊(duì)列之中,這個(gè)添加的過程基于CAS保證線程安全性。
②同步隊(duì)列遵循先進(jìn)先出(FIFO),首節(jié)點(diǎn)是獲取到同步狀態(tài)的結(jié)點(diǎn),首節(jié)點(diǎn)的線程在釋放同步狀態(tài)的時(shí)候?qū)?huì)喚醒后繼結(jié)點(diǎn)(然后后繼結(jié)點(diǎn)就會(huì)變成新的首節(jié)點(diǎn)等待獲取同步狀態(tài))
2、獨(dú)占式同步狀態(tài)的獲取和釋放
①前面說過,同步器的acquire()方法會(huì)獲取同步狀態(tài),這個(gè)方法對(duì)不會(huì)響應(yīng)中斷,也就是說當(dāng)線程獲取通同步狀態(tài)失敗后會(huì)被構(gòu)造成結(jié)點(diǎn)加入到同步隊(duì)列中,當(dāng)線程被中斷時(shí)不會(huì)從同步隊(duì)列中移除。
1 /** 2 * ①首先調(diào)用tryAcquire方法嘗試獲取同步狀態(tài),如果獲取同步狀態(tài)失敗,就進(jìn)行下面的操作 3 * ②獲取失敗:按照獨(dú)占式的模式構(gòu)造同步結(jié)點(diǎn)并通過addWaiter方法將結(jié)點(diǎn)添加到同步隊(duì)列的尾部 4 * ③通過acquireQueue方法自旋獲取同步狀態(tài)。 5 * ④如果獲取不到同步狀態(tài),就阻塞結(jié)點(diǎn)中的線程,而結(jié)點(diǎn)中的線程喚醒主要是通過前驅(qū)結(jié)點(diǎn)的出隊(duì)或者被中斷來實(shí)現(xiàn) 6 */ 7 public final void acquire(int arg) { 8 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 9 selfInterrupt(); 10 }? ②下面是addWaiter、enq和自旋獲取同步狀態(tài)acquireQueue方法的實(shí)現(xiàn)(該方法的主要作用就是將獲取同步狀態(tài)失敗的線程構(gòu)造成結(jié)點(diǎn)然后添加到同步隊(duì)列的隊(duì)尾)
1 private Node addWaiter(Node mode) { 2 Node node = new Node(Thread.currentThread(), mode); 3 //嘗試直接放在隊(duì)尾 4 Node pred = tail; //直接獲取同步器的tail結(jié)點(diǎn) 5 if (pred != null) { 6 node.prev = pred; 7 if (compareAndSetTail(pred, node)) { 8 //隊(duì)尾結(jié)點(diǎn)不為空通過原子操作將構(gòu)造的結(jié)點(diǎn)置為隊(duì)尾結(jié)點(diǎn) 9 pred.next = node; 10 return node; 11 } 12 } 13 //采用自旋方式保證構(gòu)造的結(jié)點(diǎn)添加到同步隊(duì)列中 14 enq(node); 15 return node; 16 } 17 private Node enq(final Node node) { 18 for (;;) { //死循環(huán)知道添加成功 19 Node t = tail; 20 if (t == null) { // Must initialize 21 if (compareAndSetHead(new Node())) 22 tail = head; 23 } else { 24 node.prev = t; 25 //通過CAS方式將結(jié)點(diǎn)添加到同步隊(duì)列之后才會(huì)返回,否則就會(huì)不斷嘗試添加(這樣實(shí)際上就是在并發(fā)情況下,把向同步隊(duì)列添加Node變得串行化了) 26 if (compareAndSetTail(t, node)) { 27 t.next = node; 28 return t; 29 } 30 } 31 } 32 } 33 /** 34 * 通過tryAcquire()和addWaiter(),表示該線程獲取同步狀態(tài)已經(jīng)失敗,被放入同步 35 * 隊(duì)列尾部了。線程阻塞等待直到其他線程(前驅(qū)結(jié)點(diǎn)獲得同步裝填或者被中斷)釋放同步狀 36 * 態(tài)后喚醒自己,自己才能獲得。 37 */ 38 final boolean acquireQueued(final Node node, int arg) { 39 boolean failed = true; 40 try { 41 boolean interrupted = false; 42 //線程在死循環(huán)的方式中嘗試獲取同步狀態(tài) 43 for (;;) { 44 final Node p = node.predecessor(); //獲取前驅(qū)結(jié)點(diǎn) 45 //只有前驅(qū)接待是頭結(jié)點(diǎn)的時(shí)候才能嘗試獲取同步狀態(tài) 46 if (p == head && tryAcquire(arg)) { 47 setHead(node); //獲取到同步狀態(tài)之后,就將自己設(shè)置為頭結(jié)點(diǎn) 48 p.next = null; //前驅(qū)結(jié)點(diǎn)已經(jīng)獲得同步狀態(tài)去執(zhí)行自己的程序了,所以需要釋放掉占用的同步隊(duì)列的資源,由JVM回收 49 failed = false; 50 return interrupted; 51 } 52 //如果獲取同步狀態(tài)失敗,應(yīng)該自旋等待繼續(xù)獲取并且校驗(yàn)自己的中斷標(biāo)志位信息 53 if (shouldParkAfterFailedAcquire(p, node) && 54 parkAndCheckInterrupt()) 55 interrupted = true; //如果被中斷,就改變自己的中斷標(biāo)志位狀態(tài)信息 56 } 57 } finally { 58 if (failed) 59 cancelAcquire(node); 60 } 61 }③獨(dú)占式獲取同步狀態(tài)的整個(gè)流程
④獨(dú)占式同步器的釋放:release方法執(zhí)行時(shí),會(huì)喚醒頭結(jié)點(diǎn)的后繼結(jié)點(diǎn)線程
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;//頭結(jié)點(diǎn)//喚醒頭結(jié)點(diǎn)的后繼結(jié)點(diǎn)線程if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false; }3、共享式同步狀態(tài)的獲取和釋放?
①共享式獲取和獨(dú)占式獲取最主要的區(qū)別是能否有多個(gè)線程同時(shí)獲取到同步狀態(tài)。如圖所示簡易描述二者的區(qū)別(共享式訪問的時(shí)候,可以允許多個(gè)線程訪問資源,但是存在獨(dú)占式訪問的時(shí)候,同一時(shí)刻其他的不管是共享還是獨(dú)占都會(huì)被阻塞)
②關(guān)于共享式獲取同步狀態(tài)的方法
1 /** 2 * 此方法是共享模式下線程獲取共享同步狀態(tài)的頂層入口。它會(huì)嘗試去獲取同步狀態(tài),獲取成功則直接返回, 3 * 獲取失敗則進(jìn)入等待隊(duì)列一直嘗試獲取(執(zhí)行doAcquireShared方法體中的內(nèi)容),直到獲取到資源為止(條件就是tryAcquireShared方法返回值大于等于0),整個(gè)過程忽略中斷 4 */ 5 public final void acquireShared(int arg) { 6 if (tryAcquireShared(arg) < 0) 7 doAcquireShared(arg); 8 } 9 /** 10 * "自旋"嘗試獲取同步狀態(tài) 11 */ 12 private void doAcquireShared(int arg) { 13 //首先將該線程包括線程引用、等待狀態(tài)、前驅(qū)結(jié)點(diǎn)和后繼結(jié)點(diǎn)的信息封裝臺(tái)Node中,然后添加到等待隊(duì)列里面(一共享模式添加) 14 final Node node = addWaiter(Node.SHARED); 15 boolean failed = true; 16 try { 17 boolean interrupted = false; //當(dāng)前線程的中斷標(biāo)志 18 for (;;) { 19 final Node p = node.predecessor(); //獲取前驅(qū)結(jié)點(diǎn) 20 if (p == head) { 21 //當(dāng)前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn)的時(shí)候就會(huì)以共享的方式去嘗試獲取同步狀態(tài) 22 int r = tryAcquireShared(arg); 23 //判斷tryAcquireShared的返回值 24 if (r >= 0) { 25 //如果返回值大于等于0,表示獲取同步狀態(tài)成功,就修改當(dāng)前的頭結(jié)點(diǎn)并將信息傳播都后續(xù)的結(jié)點(diǎn)隊(duì)列中 26 setHeadAndPropagate(node, r); 27 p.next = null; // 釋放掉已經(jīng)獲取到同步狀態(tài)的前驅(qū)結(jié)點(diǎn)的資源 28 if (interrupted) 29 selfInterrupt(); //檢查中斷標(biāo)志 30 failed = false; 31 return; 32 } 33 } 34 if (shouldParkAfterFailedAcquire(p, node) && 35 parkAndCheckInterrupt()) 36 interrupted = true; 37 } 38 } finally { 39 if (failed) 40 cancelAcquire(node); 41 } 42 }根據(jù)源代碼我們可以了解共享式獲取同步狀態(tài)的整個(gè)過程
首先同步器會(huì)調(diào)用tryAcquireShared方法來嘗試獲取同步狀態(tài),然后根據(jù)這個(gè)返回值來判斷是否獲取到同步狀態(tài)(當(dāng)返回值大于等于0可視為獲取到同步狀態(tài));如果第一次獲取失敗的話,就進(jìn)入'自旋'狀態(tài)(執(zhí)行doAcquireShared方法)一直嘗試去獲取同步狀態(tài);在自旋獲取中,如果檢查到當(dāng)前前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn)的話,就會(huì)嘗試獲取同步狀態(tài),而一旦獲取成功(tryAcquireShared方法返回值大于等于0)就可以從自旋狀態(tài)退出。
另外,還有一點(diǎn)就是上面說到的一個(gè)處于等待隊(duì)列的線程要想開始嘗試去獲取同步狀態(tài),需要滿足的條件就是前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn),那么它本身就是整個(gè)隊(duì)列中的第二個(gè)結(jié)點(diǎn)。當(dāng)頭結(jié)點(diǎn)釋放掉所有的臨界資源之后,我們考慮每個(gè)線程運(yùn)行所需資源的不同數(shù)量問題,如下圖所示
③共享式同步狀態(tài)的釋放
對(duì)于支持共享式的同步組件(即多個(gè)線程同同時(shí)訪問),它們和獨(dú)占式的主要區(qū)別就是tryReleaseShared方法必須確保同步狀態(tài)的釋放是線程安全的(CAS的模式來釋放同步狀態(tài),因?yàn)榧热皇嵌鄠€(gè)線程能夠訪問,那么釋放的時(shí)候也會(huì)是多個(gè)線程的,就需要保證釋放時(shí)候的線程安全)
1 /** 2 * 該方法是共享模式下線程釋放共享資源的頂層入口。它會(huì)釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會(huì)喚醒等待隊(duì)列里的其他線程來獲取資源。 3 */ 4 public final boolean releaseShared(int arg) { 5 if (tryReleaseShared(arg)) { 6 doReleaseShared(); // 7 return true; 8 } 9 return false; 10 }四、自定義同步組件的實(shí)現(xiàn)
1、共享式鎖的實(shí)現(xiàn)
①、自定義一個(gè)同步組件,可以允許兩個(gè)線程訪問(共享式同步組件),超過兩個(gè)線程就會(huì)被阻塞。
②、既然是共享式同步組件,按照前面所說的,組件本身需要使用AQS提供的共享式模板方法acquireShared等;組件的內(nèi)部類需要實(shí)現(xiàn)AQS,并且重寫關(guān)于共享式獲取同步狀態(tài)的方法(tryAcquireShared()、tryReleaseShared()等共享模式下的方法)。
③、既然是兩個(gè)線程能夠同時(shí)訪問的話,那么狀態(tài)數(shù)的取值范圍就是0、1、2了,每當(dāng)一個(gè)線程獲取到同步狀態(tài)的時(shí)候state值減1,反之就會(huì)增加1;當(dāng)state值為0的時(shí)候就會(huì)阻塞其他想要獲取同步狀態(tài)的線程。對(duì)于同步狀態(tài)的更改需要使用CAS來進(jìn)行保證原子性。
1 package cn.source.concurrent; 2 3 import java.util.concurrent.TimeUnit; 4 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 5 import java.util.concurrent.locks.Condition; 6 import java.util.concurrent.locks.Lock; 7 8 public class TestAQS implements Lock{ 9 10 private Sync sync = new Sync(2); 11 12 private static class Sync extends AbstractQueuedSynchronizer { 13 14 Sync(int num) { 15 if(num <= 0) { 16 throw new RuntimeException("num需要大于0"); 17 } 18 setState(num); 19 } 20 21 @Override 22 protected int tryAcquireShared(int arg) { 23 for(; ;) { 24 int currentState = getState(); 25 int newState = currentState - arg; 26 if(newState < 0 || compareAndSetState(currentState, newState)) { 27 return newState; 28 } 29 } 30 } 31 32 @Override 33 protected boolean tryReleaseShared(int arg) { 34 for(; ;) { 35 int currentState = getState(); 36 int newState = currentState + arg; 37 if(compareAndSetState(currentState, newState)) { 38 return true; 39 } 40 } 41 } 42 43 44 } 45 @Override 46 public void lock() { 47 sync.acquireShared(1); 48 } 49 50 @Override 51 public void unlock() { 52 sync.releaseShared(1); 53 } 54 55 //...... 56 } 共享式鎖 1 /** 2 * 測試結(jié)果:輸出的線程名稱是成對(duì)的,保證同一時(shí)刻只有兩個(gè)線程能夠獲取到鎖 3 * 4 */ 5 public class TestLockShare { 6 @Test 7 public void test() { 8 Lock lock = new TestAQS(); 9 class Worker extends Thread { 10 11 @Override 12 public void run() { 13 while(true) { 14 lock.lock(); 15 try { 16 Thread.sleep(1000); 17 System.out.println(Thread.currentThread().getName()); 18 Thread.sleep(1000); 19 } catch (Exception e) { 20 e.printStackTrace(); 21 } finally { 22 lock.unlock(); 23 } 24 } 25 } 26 27 } 28 29 for (int i = 0; i < 8; i++) { 30 Worker worker = new Worker(); 31 worker.setDaemon(true); 32 worker.start(); 33 34 } 35 for (int i = 0; i < 8; i++) { 36 try { 37 Thread.sleep(1000); 38 } catch (InterruptedException e) { 39 // TODO Auto-generated catch block 40 e.printStackTrace(); 41 } 42 System.out.println(); 43 } 44 } 45 } 共享式鎖測試2、獨(dú)占式鎖的實(shí)現(xiàn)
1 package cn.source.concurrent; 2 3 import java.util.concurrent.TimeUnit; 4 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 5 import java.util.concurrent.locks.Condition; 6 import java.util.concurrent.locks.Lock; 7 8 public class Mutex implements Lock{ 9 10 private Sync sync = new Sync(); 11 12 private static class Sync extends AbstractQueuedSynchronizer { 13 14 /** 15 * 嘗試獲取資源,立即返回。成功則返回true,否則false。 16 */ 17 @Override 18 protected boolean tryAcquire(int arg) { 19 if(compareAndSetState(0, 1)) {//state為0才設(shè)置為1,不可重入! 20 setExclusiveOwnerThread(Thread.currentThread());//設(shè)置為當(dāng)前線程獨(dú)占資源 21 return true; 22 } 23 return false; 24 } 25 26 /** 27 * 嘗試釋放資源,立即返回。成功則為true,否則false。 28 */ 29 @Override 30 protected boolean tryRelease(int arg) { 31 if(getState() == 0) { //既然來釋放,那肯定就是已占有狀態(tài)了。只是為了保險(xiǎn),多層判斷! 32 throw new IllegalMonitorStateException(); 33 } 34 setExclusiveOwnerThread(null); 35 setState(0); 36 return true; 37 } 38 39 @Override 40 protected boolean isHeldExclusively() { 41 // 判斷是否鎖定狀態(tài) 42 return getState() == 1; 43 } 44 45 } 46 47 @Override 48 public void lock() { 49 sync.acquire(1); 50 } 51 52 @Override 53 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { 54 return sync.tryAcquire(1); 55 } 56 57 @Override 58 public void unlock() { 59 sync.release(1); 60 } 61 62 } 獨(dú)占式鎖 1 public class TestMutex { 2 @Test 3 public void test() { 4 Lock lock = new Mutex(); 5 class Worker extends Thread { 6 7 @Override 8 public void run() { 9 while(true) { 10 lock.lock(); 11 try { 12 Thread.sleep(1000); 13 System.out.println(Thread.currentThread().getName()); 14 Thread.sleep(1000); 15 } catch (Exception e) { 16 e.printStackTrace(); 17 } finally { 18 lock.unlock(); 19 } 20 } 21 } 22 23 } 24 25 for (int i = 0; i < 8; i++) { 26 Worker worker = new Worker(); 27 worker.setDaemon(true); 28 worker.start(); 29 30 } 31 for (int i = 0; i < 8; i++) { 32 try { 33 Thread.sleep(1000); 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } 37 System.out.println(); 38 } 39 } 40 } 獨(dú)占式鎖測試轉(zhuǎn)載于:https://www.cnblogs.com/fsmly/p/10701109.html
總結(jié)
以上是生活随笔為你收集整理的Java中的队列同步器AQS的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第四章小结
- 下一篇: 第二周 第七节 列表的使用