深入java并发包源码(三)AQS独占方法源码分析
深入java并發包源碼(一)簡介
深入java并發包源碼(二)AQS的介紹與使用
深入java并發包源碼(三)AQS獨占方法源碼分析
AQS 的實現原理
學完用 AQS 自定義一個鎖以后,我們可以來看一下剛剛使用過的方法的實現。
分析源碼的時候會省略一些不重要的代碼。
AQS 的實現是基于一個 FIFO 隊列的,每一個等待的線程被封裝成 Node 存放在等待隊列中,頭結點是空的,不存儲信息,等待隊列中的節點都是阻塞的,并且在每次被喚醒后都會檢測自己的前一個節點是否為頭結點,如果是頭節點證明在這個線程之前沒有在等待的線程,就嘗試著去獲取共享資源。
AQS 的繼承關系
AQS 繼承了AbstractOwnableSynchronizer,我們先分析一下這個父類。
public abstract class AbstractOwnableSynchronizerimplements java.io.Serializable {protected AbstractOwnableSynchronizer() { }/*** 獨占模式下的線程*/private transient Thread exclusiveOwnerThread;/*** 設置線程,只是對線程的 set 方法*/protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}/*** 設置線程,對線程的 get 方法*/protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;} }父類非常簡單,持有一個獨占模式下的線程,然后就只剩下對這個線程的 get 和 set 方法。
AQS的內部類
AQS 是用鏈表隊列來實現線程等待的,那么隊列肯定要有節點,我們先從節點講起。
Node 類,每一個等待的線程都會被封裝成 Node 類
Node 的域
public class Node {int waitStatus;Node prev;Node next;Thread thread;Node nextWaiter; }waitStatus:等待狀態
prev:前驅節點
next:后繼節點
thread:持有的線程
nextWaiter:condiction 隊列中的后繼節點
Node 的 status:
Node 的狀態有四種:
取消狀態的值是唯一的正數,也是唯一當排隊排到它了也不要資源而是直接輪到下個線程來獲取資源的
AQS 中的方法源碼分析
acquire
這個方法執行了:
- 使用我們實現的 tryAcquire 來嘗試獲得鎖,如果獲得了鎖則退出
- 如果沒有獲得鎖就將線程添加到等待隊列中
看到上面的 tryAcquire 返回 false 后就會調用 addWaiter 新建節點加入等待隊列中。參數 EXCLUSIVE 是獨占模式。
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// 拿到尾節點,如果尾節點是空則說明是第一個節點,就直接入隊就好了Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 如果尾節點不是空的,則需要特殊方法入隊enq(node);return node; }在 addWaiter 方法創建完節點后,調用 enq 方法,在循環中用 CAS 操作將新的節點入隊。
因為可能會有多個線程同時設置尾節點,所以需要放在循環中不斷的設置尾節點。
private Node enq(final Node node) {for (;;) {Node t = tail;// 查看尾節點if (t == null) { // Must initialize// 尾節點為空則設置為剛剛創建的節點if (compareAndSetHead(new Node()))tail = head;} else {// 尾節點node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}} }在這里,節點入隊就結束了。
那么我們回來前面分析的方法,
public final void acquire(long arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); }剛剛分析完了 addWaiter 方法,這個方法返回了剛剛創建并且加入的隊列。現在開始分析 acquireQueued 方法。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;// 在循環中去獲取鎖for (;;) {// 拿前一個節點final Node p = node.predecessor();// 如果前一個節點是頭結點則嘗試著去獲取鎖if (p == head && tryAcquire(arg)) {// 拿到鎖后把這個節點設為頭結點,這里 setHead 會把除了 next 以外的數據清除setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 這個方法查看在獲取鎖失敗以后是否中斷,如果否的話就調用// parkAndCheckInterrupt 阻塞方法線程,等待被喚醒if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }acquireInterruptibly
因為很像所以順便來看一下 acquireInterruptibly 所調用的方法:
private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}// 只有這一句有差別,獲取失敗了并且檢測到中斷位被設為 true 直接拋出異常if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }acquireNanos
再來看一下有限時間的,當獲取超時以后會將節點 Node 的狀態設為 cancel,設置為取消的用處在后面的 release 方法中會有體現。
private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; failed = false;return true;}nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }總結一下過程
release
這個方法首先去調用了我們實現的 tryRelease,當結果返回成功的時候,拿到頭結點,調用 unparkSuccessor 方法來喚醒頭結點的下一個節點。
public final boolean release(long arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false; } private void unparkSuccessor(Node node) {int ws = node.waitSatus;// 因為已經獲取過鎖,所以將狀態設設為 0。失敗也沒所謂,說明有其他的線程把它設為0了if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** 一般來說頭結點的下一個節點是在等待著被喚醒的,但是如果是取消的或者意外的是空的,* 則向后遍歷直到找到沒有被取消的節點* */Node s = node.next;// 為空或者大于 0,只有 cancel 狀態是大于 0 的if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread); }參考文獻
- 周志明. 深入理解 Java 虛擬機 [M]. 機械工業出版社, 2011.
- 方騰飛.Java 并發編程的藝術 [M]. 機械工業出版社, 2015.
- 深入剖析基于并發AQS的(獨占鎖)重入鎖(ReetrantLock)及其Condition實現原理
- 【JUC】JDK1.8源碼分析之AbstractQueuedSynchronizer(二)
- AbstractQueuedSynchronizer的介紹和原理分析
轉載于:https://www.cnblogs.com/zjmeow/p/9972202.html
總結
以上是生活随笔為你收集整理的深入java并发包源码(三)AQS独占方法源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: swoole 清除定时器提示no tim
- 下一篇: sprint冲刺计划第三天团队任务