Java 多线程 —— AQS 详解
引言
AQS 是AbstractQuenedSynchronizer 的縮寫,抽象的隊列式同步器,它是除了java自帶的synchronized關鍵字之外的鎖機制。是 JUC 下的重要組件。
相關產物有:ReentrantLock、CountDownLatch、Semaphore、ReadWriteLock等。
一、AQS的設計思想
AbstractQuenedSynchronizer 維護了一個 volatile int state 變量,代表共享資源。
若state 是0,代表資源空閑,當前線程將 0 改為 1,表示上鎖,當前線程置為工作線程;
若state不為0,代表資源占用,當前線程依然會 acquire() 一個資源,如果恰好是當前的工作線程,那么state 累加,以此描述“重入性”;如果當前線程并不是工作線程,則會被安置在一個由AQS維護的資源等待隊列。
AQS隊列會讓第一個線程Node自旋獲取資源,而后面的線程,則通過?LockSupport.park(this) 方法將線程置為 WAITING 狀態等待被喚醒。
如果第一個線程獲取到了資源,那么就將它設置為隊列的 head 節點,原 head 就會被移出隊列。
AQS的設計中用到了模板方法模式,不同的資源共享機制如互斥或共享可以由子類自定義實現:
Exclusive:如ReentrantLock、
Share:如信號量、閉鎖、讀寫鎖等。
AQS 的另一個特點是自旋+CAS。
在請求資源和入列等操作中,經常會看到 for(;;) 、compareAndSetState、compareAndSetTail等操作,這與synchronized的實現有很大區別。
通過比較并設置的方式,可以有效提高資源獲取的效率,但同時也會消耗額外的CPU資源。
二、兩種資源訪問策略的代表
在AQS中維護了一個 Node 節點,它有兩種等待模式,同時也表示資源的兩種不同的訪問策略:
/** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null;由此,衍生出兩類不同的子類實現,第一類是以ReentrantLock為代表的互斥鎖,它在語義上與synchronized實現了相同的互斥性和可重入性;另一類是以閉鎖CountDownLatch 為代表的線程同步工具。
2.1 ReentrantLock
首先 AQS 中的 state 為 0.
A 線程在執行 lock() 方法后,以獨占方式 CAS state 為 1。AQS 會記錄 A 線程為當前的獨占線程,其他線程如果再嘗試獲取資源,就會進入等待隊列,直到 A 線程調用 unlock() 方法,釋放了資源,即 state 回歸 0 狀態。
在“重入性”方面,如果A線程第二次嘗試取鎖,state 會累加。也就是說,上鎖的次數一定等于釋放鎖的次數。
2.2 CountDownLatch
CountDownLatch 翻譯為 “閉鎖”或“門閂”,這是一種非常好用的同步工具,可以延遲線程的進度直到終止狀態。
與 ReentrantLock 不同的是,在構造 CountDownLatch 對象的時候,會先設定一個 state 大小:
CountDownLatch latch = new CountDownLatch(3);這個 3 就是 state 變量的初始值,然后線程使用 countDown() 方法遞減這個計數,直到 state = 0,放行所有 waiting 中的線程。
CountDownLatch 維護的 state 表示的是事件數量,當指定數量的事件執行完畢后,就會 unpark() 主調線程,繼續后續動作。
在使用CountDownLatch時有一個誤區是,state 的值就代表了線程的數量,認為我 state = 3 ,就需要 3 個線程去執行任務,其實,state = 10 也依然可以使用 一個線程去執行,關鍵要區分事件與并行任務的概念。
三、ReentrantLock 源碼
作為補充 synchronized 的鎖機制,ReentrantLock 顯示鎖的功能非常強大,但這里不打算全面分析ReentrantLock的奇技淫巧,而是從 lock() 方法出發,分析一下 AQS 是如何實現資源的鎖定和等待隊列的維護的。
3.1?acquire
acquire 是 AQS 的頂層入口,他表示獲取鎖資源。
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); }Doug Lea 的代碼非常簡潔,根本沒有一句廢話,就連代碼結構也非常精簡,如果是分析源碼的話,我們可以嘗試去改寫一下這個方法,讓其可讀性更強一些:
public final void acquire(int arg) {if (!tryAcquire(arg)) {Node newWaiter = addWaiter(Node.EXCLUSIVE);boolean needInterrupt = acquireQueued(newWaiter, arg);if (needInterrupt) {selfInterrupt();}} }從方法中的一系列方法名和判斷邏輯來看。
嘗試獲取資源,如果成功,則直接返回。如果不成功,addWaiter 添加一個獨占模式的等待者,acquireQueued 以排隊的方式去獲取資源。
3.2 tryAcquire
tryAcquire 在 ReentrantLock 中有兩種實現,分別是:
FairSync 中的公平鎖實現;
NonfairSync 中的非公平鎖實現
當然,公平與非公平并不是重點,就以非公平的實現來看一下,
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false; }當前線程會?CAS state 0->1,或累加重入,成功返回true,失敗返回false。
3.3 addWaiter
在acquire上鎖操作失敗后,會執行這個方法:
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node; }addWaiter ,添加一個等待者,它只完成一項工作,就是向等待隊列中添加一個 Node:
1、將當前線程封裝為一個隊列 Node;
2、取得隊列的尾節點 tail,并CAS 新的節點設置為新的 tail
3、設置新 tail 成功,直接返回
4、若設置新 tail 不成功,或者干脆,原tail 就不存在,執行 enq 方法,自旋操作以上步驟,直到成功。
enq方法是 enqueue 的縮寫,意思是“使隊列化”,它就是一個 while-true ,如果隊列不存在,就創建一個隊列,如果隊列已經存在,就把 node 放到最后一個:
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}} }這里就用到了自旋操作,每次自旋都會獲取當前的 tail 節點,避免在設置的過程中間被其他線程加塞,卻又不知道。
剛進入方法的時候,肯定需要走初始化的邏輯,這會創建一個 空的 Node 節點作為 head,所以由此我們也知道,AQS 隊列中的頭結點實際上就是一個沒有實際意義的功能型節點,里邊是沒有線程的,真正封裝了線程的節點是從第二個節點開始。
總體來看,addWaiter 完全就是一個 do-while 循環,先執行一次 CASTail,失敗后循環執行CASTail,直到成功后返回該 node,同時也是新的 tail 節點。
3.4?acquireQueued
在 addWaiter 添加了新的 tail 后,需要做哪些事情呢?acquireQueued!
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }這里需要說明一下,該方法的邏輯兼顧了中斷的操作,如果對中斷機制不太了解,可以暫時不去理會。
該方法同樣是一個 while-true 循環,當且僅當,當前節點是隊列中第二個節點(addWaiter中已經很明確,AQS隊列中的head 節點就是一個空的 Node),并且 tryAcquire 成功,才會返回。在返回之前,僅做了一些隊列的維護工作:設置新的head 節點。
如果沒有“當且僅當”,那么執行 park:
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted(); }也就是說,除了第二個節點以后的節點,都要進行 park,即線程切換為 WAITING 狀態。
四、AQS acquire 流程
經過了上一節的源碼分析,我們已經大概清楚了 lock() 方法調用之后發生的事情,接下來就需要總結一下 acquire 流程步驟,提煉一下 AQS 隊列的工作原理:
總結
AQS 使用了大量的 CAS 操作,避免上鎖,你在ReentrantLock中看不到一句 synchronized 。
通過CAS 和自旋的配合可以一定程度上提高同步代碼的性能。
state 以 volatile 類型修飾,可以在多線程之間提供可見性。
ReentrantLock 和 CountDownLatch 對 state 的訪問方式分為獨占和共享兩種,本文雖然沒有解析 CountDownLatch 的源碼,但通過上面源碼的分析,可以想到其大致實現流程。
總結
以上是生活随笔為你收集整理的Java 多线程 —— AQS 详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 云麦体脂秤华为体脂秤_华为、小米和有品体
- 下一篇: Java中的Unsafe