生活随笔
收集整理的這篇文章主要介紹了
Java AQS
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
AQS
- AbstractQueuedSynchronizer (AQS)抽象的隊列式的同步器,AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現都依賴于它,比如ReentrantLock/Semaphore/CountDownLatch
- 它維護了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。這里volatile是核心關鍵詞,具體volatile的語義
- state的訪問方式有三種: getState() 、setState() 、compareAndSetState()
- AQS定義兩種資源共享方式:Exclusive(獨占,只有一個線程能執行,如ReentrantLock)和Share (共享,多個線程可同時執行,例如Semaphore/CountDownLatch)不同的自定義同步器爭用共享資源的方式也不同
- 線程搶占同一份資源,只有被標桿節點選中的才可以訪問資源,其余的進入排隊隊列,如果是公平鎖,則按照先后順序進行對于資源的訪問;如果是非公平鎖,則當標桿節點釋放完之后,大家開始搶占資源,誰搶到算誰的,沒有先來后到之分
自定義的同步容器
- 自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在底層實現好了
主要實現以下幾種方法
- isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它
- tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false?
- tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false
- tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源
- tryReleaseShared(int):共享方式。嘗試釋放資源,成功則返回true,失敗則返回false
例子
- 以ReentrantLock為例,state初始化為0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨占該鎖并將state+1。此后,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會獲取該鎖。當然,釋放鎖之前,A線程自己是可以重復獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態的
- 重入鎖,在需要進行同步的代碼部分加上鎖定,但不要忘記最后一定要釋放鎖定,不然會造成鎖永遠無法釋放,其他線程永遠進不來的結果
- t2搶占線程,只有等t2線程結束,線程t1才有資格搶占資源
package com.example.core.aqs;import java.util.concurrent.locks.ReentrantLock;public class UseReentrantLock {private ReentrantLock reentrantLock = new ReentrantLock();public void method(){reentrantLock.lock();try{System.out.println("當前線程:"+Thread.currentThread().getName()+"進入...");Thread.sleep(2000);System.out.println("當前線程:"+Thread.currentThread().getName()+"退出...");}catch(InterruptedException e){e.printStackTrace();}finally {reentrantLock.unlock();}}public static void main(String[] args) {UseReentrantLock useLock = new UseReentrantLock();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {useLock.method();}},"t1");Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {useLock.method();}},"t2");t1.start();t2.start();}
}
/*
output:
當前線程:t2進入...
當前線程:t2退出...
當前線程:t1進入...
當前線程:t1退出...*/
公平鎖和非公平鎖
- Lock lock = new ReentrantLock(boolean isFair);
lock用法
- tryLock():嘗試獲得鎖,獲得結果用true/false返回
- tryLock():在給定的時間內嘗試獲得鎖,獲得結果用true/false返回
- isFair():是否是公平鎖
- isLocked():是否鎖定
- getHoldCount(): 查詢當前線程保持此鎖的個數,也就是調用lock()次數
- lockInterruptibly():優先響應中斷的鎖
- getQueueLength():返回正在等待獲取此鎖定的線程數
- getWaitQueueLength():返回等待與鎖定相關的給定條件Condition的線程數
- hasQueuedThread(Thread thread): 查詢指定的線程是否正在等待此鎖
- hasQueuedThreads():查詢是否有線程正在等待此鎖
- hasWaiters():查詢是否有線程正在等待與此鎖定有關的condition條件
- 再以CountDownLatch以例,任務分為N個子線程去執行,state也初始化為N(注意N要與線程個數一致)。這N個子線程是并行執行的,每個子線程執行完后countDown()一次,state會CAS減1。等到所有子線程都執行完后(即state=0),會unpark()調用線程,然后主調用線程就會從await()函數返回,繼續后余動作
AQS Condition
- 使用synchronized的時候,如果需要多線程間進行協作工作則需要Object的wait()和notify()、notifyAll()方法進行配合工作
- 那么同樣,我們在使用Lock的時候,可以使用一個新的等待/通知的類,它就是Condition。這個Condition一定是針對具體某一把鎖的。也就是在只有鎖的基礎之上才會產生Condition
- 我們可以通過一個Lock對象產生多個Condition進行多線程間的交互,非常的靈活。可以使得部分需要喚醒的線程喚醒,其他線程則繼續等待通知
使用一個條件
- t1線程先執行,進入等待的時候,釋放鎖,t2才可以得以執行,t2線程開始執行,對于t1線程發出喚醒通知,t1得以繼續執行,最后釋放鎖
package com.example.core.aqs;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class UseCondition {//現在有一把鎖private Lock lock = new ReentrantLock(); //synchronized wait ---- notify//基于這把鎖產生了一個 condition: 作用是對于這把鎖的 喚醒 和 等待操作private Condition condition = lock.newCondition();public void method1(){lock.lock();try {System.out.println("當前線程:" + Thread.currentThread().getName() + "進入等待狀態..");Thread.sleep(3000);System.out.println("當前線程:" + Thread.currentThread().getName() + "釋放鎖..");condition.await();System.out.println("當前線程:" + Thread.currentThread().getName() +"繼續執行...");} catch (Exception e) {e.printStackTrace();} finally {System.err.println(Thread.currentThread().getName() + " unlock");lock.unlock();}}public void method2(){lock.lock();try {System.out.println("當前線程:" + Thread.currentThread().getName() + "進入..");Thread.sleep(3000);System.out.println("當前線程:" + Thread.currentThread().getName() + "發出喚醒..");condition.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public static void main(String[] args) throws Exception {final UseCondition uc = new UseCondition();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {uc.method1();}}, "t1");Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {uc.method2();}}, "t2");t1.start();Thread.sleep(1);t2.start();}
}
/*
output
當前線程:t1進入等待狀態..
當前線程:t1釋放鎖..
當前線程:t2進入..
當前線程:t2發出喚醒..
當前線程:t1繼續執行...
t1 unlock*/
使用多個條件
- 創建兩個條件,條件c1和c2,c1條件受制于t1和t2線程,由t3線程進行喚醒;c2條件受制于t3線程,由t5線程進行喚醒
package com.example.core.aqs;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class UseManyCondition {private Lock lock = new ReentrantLock();private Condition c1 = lock.newCondition();private Condition c2 = lock.newCondition();public void m1(){try {lock.lock();System.out.println("當前線程:" +Thread.currentThread().getName() + "進入方法m1等待..");c1.await();c1條件 由m4喚醒System.out.println("當前線程:" +Thread.currentThread().getName() + "方法m1繼續..");} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void m2(){try {lock.lock();System.out.println("當前線程:" +Thread.currentThread().getName() + "進入方法m2等待..");c1.await();//c1條件 由m4喚醒System.out.println("當前線程:" +Thread.currentThread().getName() + "方法m2繼續..");} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void m3(){try {lock.lock();System.out.println("當前線程:" +Thread.currentThread().getName() + "進入方法m3等待..");c2.await();c2條件 由m5喚醒System.out.println("當前線程:" +Thread.currentThread().getName() + "方法m3繼續..");} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void m4(){try {lock.lock();System.out.println("當前線程:" +Thread.currentThread().getName() + "喚醒..");c1.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void m5(){try {lock.lock();System.out.println("當前線程:" +Thread.currentThread().getName() + "喚醒..");c2.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public static void main(String[] args) {final UseManyCondition umc = new UseManyCondition();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {umc.m1();}},"t1");Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {umc.m2();}},"t2");Thread t3 = new Thread(new Runnable() {@Overridepublic void run() {umc.m3();}},"t3");Thread t4 = new Thread(new Runnable() {@Overridepublic void run() {umc.m4();}},"t4");Thread t5 = new Thread(new Runnable() {@Overridepublic void run() {umc.m5();}},"t5");t1.start();t2.start();t3.start();try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}t4.start();try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}t5.start();}
}
/*
output
當前線程:t1進入方法m1等待..
當前線程:t3進入方法m3等待..
當前線程:t2進入方法m2等待..
當前線程:t4喚醒..
當前線程:t1方法m1繼續..
當前線程:t2方法m2繼續..
當前線程:t5喚醒..
當前線程:t3方法m3繼續..*/
AQS-ReentrantReadWriteLock
- 讀寫鎖ReentrantReadWriteLock,其核心就是實現讀寫分離的鎖。在高并發訪問下,尤其是讀多寫少的情況下,性能要遠高于重入鎖。
- 之前學synchronized、ReentrantLock時,我們知道,同一時間內,只能有一個線程進行訪問被鎖定的代碼,那么讀寫鎖則不同,其本質是分成兩個鎖,即讀鎖、寫鎖。在讀鎖下,多個線程可以并發的進行訪問,但是在寫鎖的時候,只能一個一個的順序訪問
- 口訣:讀讀共享,寫寫互斥,讀寫互斥
- t1和t2都是讀鎖,t3是寫鎖,t1和t2可以并行執行,他們與t3之間不可以同時執行。讀讀共享,寫寫互斥,讀寫互斥
package com.example.core.aqs;import java.util.concurrent.locks.ReentrantReadWriteLock;public class UseReadWriteLock {private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();private ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();private ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();public void read(){readLock.lock();try{System.out.println("當前線程 "+ Thread.currentThread().getName()+ " 進入了讀方法");Thread.sleep(3000);System.out.println("當前線程 "+ Thread.currentThread().getName()+ " 退出了讀方法");}catch (Exception e){e.printStackTrace();}finally{readLock.unlock();}}public void write(){writeLock.lock();try{System.out.println("當前線程 "+ Thread.currentThread().getName()+ " 進入了寫方法");Thread.sleep(3000);System.out.println("當前線程 "+ Thread.currentThread().getName()+ " 退出了寫方法");}catch (Exception e){e.printStackTrace();}finally{writeLock.unlock();}}public static void main(String[] args) {UseReadWriteLock rwLock = new UseReadWriteLock();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {rwLock.read();}},"t1");Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {rwLock.read();}},"t2");Thread t3 = new Thread(new Runnable() {@Overridepublic void run() {rwLock.write();}},"t3");t1.start();t2.start();t3.start();}
}
/*
output:
當前線程 t3 進入了寫方法
當前線程 t3 退出了寫方法
當前線程 t1 進入了讀方法
當前線程 t2 進入了讀方法
當前線程 t1 退出了讀方法
當前線程 t2 退出了讀方法*/
LockSupport
- 提供park()和unpark()方法實現阻塞線程和解除線程阻塞,實現的阻塞和解除阻塞是基于”許可(permit)”作為關聯,permit相當于一個信號量(0,1),默認是0. 線程之間不再需要一個Object或者其它變量來存儲狀態,不再需要關心對方的狀態
- LockSupport不需要在同步代碼塊里 。所以線程間也不需要維護一個共享的同步對象了,實現了線程間的解耦.
- unpark函數可以先于park調用,所以不需要擔心線程間的執行的先后順序
package com.example.core.aqs;import java.util.concurrent.locks.LockSupport;public class UseLockSupport {public static void main(String[] args) throws Exception {Thread A = new Thread(new Runnable() {@Overridepublic void run() {int sum = 0;for(int i=0;i<10;i++){sum+=i;}try {Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}LockSupport.park(); //滯后的System.out.println(sum);}});A.start();//后阻塞:Thread.sleep(1000);LockSupport.unpark(A); //優先的}
}
package com.example.core.aqs;public class UseObjectLock {public static void main(String[] args) throws Exception {Object lock = new Object();Thread A = new Thread(new Runnable() {@Overridepublic void run() {int sum = 0;for(int i=0;i<10;i++){sum+=i;}synchronized (lock) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println(sum);}});A.start();//后阻塞:Thread.sleep(1000);synchronized (lock) {lock.notify();}}
}
AQS-鎖優化
- 避免死鎖
- 減小鎖的持有時間?
- 減小鎖的粒度?
- 鎖的分離?
- 盡量使用無鎖的操作,如原子操作(Atomic系列類),volatile關鍵字
acquire(int)
- 此方法是獨占模式下線程獲取共享資源的頂層入口。如果獲取到資源,線程直接返回,否則進入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響。這也正是lock()的語義,當然不僅僅只限于lock()。獲取到資源后,線程就可以去執行其臨界區代碼了
- tryAcquire()嘗試直接去獲取資源,如果成功則直接返回
- addWaiter()將該線程加入等待隊列的尾部,并標記為獨占模式
- acquireQueued()使線程在等待隊列中獲取資源,一直獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false
- 如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上
?
總結
以上是生活随笔為你收集整理的Java AQS的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。