日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

多线程(三)之ReentrantLock源码解析

發布時間:2025/3/19 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 多线程(三)之ReentrantLock源码解析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

?

今天分析ReentrantLock類的源碼,在看源碼之前,先學習AQS(AbstractQueuedSynchronizer)類。

一、AQS

什么是AQS?

AQS是一個類,要實現多線程鎖,可以繼承該類,并重寫其中的方法即可。該類也叫同步器。

同步器的主要使用方式是繼承,子類通過繼承同步器并實現它的抽象方法來管理同步狀態(state,標記線程的狀態)。

同步器的設計是基于模板方法模式, 使用者需要繼承同步器并重寫指定的方法,隨后將同步器組合在自定義同步組件的實現中,并調用同步器提供的模板方法,而這些模板方法將會調用使用者重寫的方法。

同步器需要重寫的重要方法:

tryAcquire? 獨占鎖獲取

tryRelease?? 獨占鎖釋放

tryAcquireShared? 共享鎖獲取

tryReleaseShared? 共享鎖釋放

isHeldExclusively 快速判斷被線程獨占

對同步狀態進行更改,這時就需要使用同步器提供的3個方法(這三個方法不需要重寫,只要子類進行調用,完成相應的操作)

getState?獲取同步狀態

setState 設置同步狀態

compareAndSetState 原子的設置同步狀態來進行操作。

?

二、結合ReentrantLock類來了解AQS的源碼

ReentrantLock的結構

解釋下,Sync是ReentrantLock類的內部類,且是實現AQS的類。ReentrantLock有兩種鎖,公平鎖和非公平鎖,分別繼承了Sync類。下面以非公平鎖為例,說明下AQS的源碼。(確實AQS的源碼比較難理解,我理解的可能也不是很深,畢竟才看了四五遍,我先把我的理解用圖文并茂的形式展現出來)

ReentrantLock源碼解析

final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}

非公平鎖調用lock方法,是通過同步方法compareAndSetState(最終調用了unsafe類的方法)直接嘗試設置state為1,意思就是如果沒有鎖,這個方法就會返回true,執行setExclusiveOwnerThread方法,將當前線程設置為鎖的持有者。如果之前有鎖,就執行acquire這個方法。(state:0表示沒有鎖,1表示已經有鎖)

假設,這時有個thread1調用非公平鎖的lock方法,因為state的初始狀態位0,很明顯,執行了setExclusiveOwnerThread,將state設置為了1,并將當前線程設置為鎖的持有者。然后又來了一個線程thread2調用lock獲取鎖,設這時候的thread1并沒有釋放鎖。明顯compareAndSetState設置不成功,返回false,然后執行acquire方法。這個方法是AQS類里。

public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

?這個if寫的,比較高大上,好多源碼都是這么寫的。可以看成:

if(!tryAcquire(arg)) {Node node = addWaiter(Node.EXCLUSIVE);if(acquierQueued(node,arg))selfInterrupt(); }

這時候,thread2先執行tryAcquire方法。那tryAcquire在哪呢?上面說了,子類要實現tryAcquire方法。所以執行的是在NonFairSync里面。

protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}

我們現在看下nonfairTryAcquire方法

final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();//如果沒有鎖,就嘗試獲取鎖if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//判斷當前線程是否為鎖的持有者,支持鎖的重入else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}

thread2調用該方法,這時候state為1(因為thread1在持有鎖),并且thread2<>thread1,所以返回false。

然后再回到acquire方法。

public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

因為tryAcquire返回false,即!tryAcquire為true。這時候thread2執行addWaiter方法。

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {/***和enq中的else中的方法一致,都是將節點插入到同步隊列末尾**/node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}

首先,將thread2封裝成Node類(這個Node類定義在AQS里面,是同步隊列中的節點),因為thread1并沒有創建隊列,所以tail為null,即空的同步隊列。所以thread2執行enq方法。

private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

這是一個自旋循環。。。。

通過上面分析,同步隊列為空,即t=null。所以,thread2新建了一個空的Node節點,該節點是“假”節點,然后通過CAS操作設置了線程的阻塞隊列的head節點就是我們剛才new出來的那個空的node節點,并將tail和head指向這個“假”節點,由此可知,該同步隊列是一個帶頭節點的同步隊列。至此,第一次循環結束,此時同步隊列的狀態為:

然后進行第二循環,t這時候不為空了,所以執行else分支。

一句一句的來:

node.prev = t;將尾部tail節點賦值給我們傳遞進來的節點Node的前驅節點,此時的結構如下:

compareAndSetTail(t, node),通過同步方法,將node設置為尾節點。

t.next=node;將原來的tail節點的后繼指向node節點。

此時跳出自旋,返回當前節點。

public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

在看這個acquire方法,執行完addWaiter方法后,然后thread2執行acquireQueued方法。

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

哎,頭疼,又是一個自旋,不過沒有關系,把同步隊列的結構搞清楚,就很容易分析。

首先,獲取thread2對應的節點的前驅節點,這時,就是head節點,即“假”節點。如果前驅節點是head節點,就調用之前的tryAcquire方法,嘗試獲取鎖。因為thread1沒有釋放鎖,很明顯tryAcquire方法返回false。因此thread2方法執行下面的if方法。看下shouldParkAfterFailedAcquire方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;//Node.SIGNAL表示-1if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}

獲取前驅節點的waitStatus值。-1:表示當前節點正處于等待signal喚醒。>0:表示前驅節點應該被移除。其他情況:將調用同步方法將前驅的waitStatus設置為-1

因為head節點的waitStatus=0,所以設置head的waitStatus為-1,然后返回false。此時,結束了acquireQueued的第一次循環。現在執行第二次循環,和第一循環一樣,沒有執行第一個if,繼續執行shouldParkAfterFailedAcquire方法。這時ws不再是0了,而是-1了。因此返回true。然后調用acquireQueued中的parkAndCheckInterrupt方法。

private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}

由代碼可以看出,調用了LockSupport的park方法。

這里介紹下LockSuppor類。

LockSupport定義了一組的公共靜態方法,這些方法提供了最基本的線程阻塞和喚醒功能,而LockSupport也成為構建同步組件的基礎工具。LockSupport定義了一組以park開頭的方法用來阻塞當前線程,以及unpark(Thread thread)方法來喚醒一個被阻塞的線程。

由此可知,thread2被阻塞起來,不在執行。。。。

假設這時,thread1還沒有釋放鎖(感覺時間過了半個世紀,其實這些操作,以計算機的速度,我們看了還沒有這些操作差不多),此時thread3線程調用lock方法。因為thread1還有沒有釋放鎖,所以調用AQS的acquire方法。進而調用tryAcquire方法,同樣返回false,所以繼續調用addWaiter方法。

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}

因為此時同步隊列中有thread2方法,所以執行if里面的方法,直接將thread3對應的節點插入到同步隊列的末尾,然后返回。繼續調用acquireQueued方法。

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

進入自旋,第一次循環,很明顯thread3節點的前驅節點是thread2,不是head,所以執行shouldParkAfterFailedAcquire方法。因為thread2結點的waitStatus為0,所以通過同步方法設置waitStatus為-1,然后返回false。此時,第一次循環結束。執行第二次循環,hread3的前繼任然不是thread2,所以繼續執行shouldParkAfterFailedAcquire方法。此時thread2的waitStatus為-1,所以直接返回true。然后thread3執行parkAndCheckInterrupt方法。然后將thread3線程進行阻塞。

此時thread1終于結束了,調用unlock釋放鎖。

public void unlock() {sync.release(1);}

進而調用AQS的release放釋放鎖。

public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}

首先,調用NonFairSync中的tryRelease方法。

protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}

現將當前的state-1,如果為0,表示沒有線程持有鎖了,將持有鎖的線程設置為?null,設置state為0,放回true。如果state不等0,表示有重入鎖,設置state(不為0),然后返回false。設thread1沒有重入鎖,所以該方法返回true。然后進行if判斷,如果同步隊列有結點,并且頭結點的waitStatus不為0,則調用unparkSuccessor方法。

private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

第一步,將頭結點的waitStatus設為0,然后判斷head的下一個結點,即thead2結點是否為null,如果不為空,thread2的waitStatus值是否>0。很明顯,不執行這個if,那就執行下一個if,將thread2進行喚醒。我們知道,thread2在acquireQueued方法里被阻塞,然后喚醒后,繼續執行。進行第下一次的循環。

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

然后調用第一if,執行if里面的代碼。即將thread2結點設為頭結點,讓原來的head結點從同步隊列中清除。然后返回false。至此,thread2處于運行狀態。。。。

公平鎖和非公平鎖的區別:

在公平鎖中FairSync中,tryAcquire方法:

protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}

對比非公平鎖發現,

if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {

這個判斷多了一個!hasQueuedPredecessors方法,判斷同步隊列中是否還有等待的線程,如果沒有,當前線程就獲取state,如果有,就返回false,意味著將當前線程插入到同步隊列末尾。而非公平鎖就不看有沒有隊列,直接嘗試獲取鎖,如果沒有獲取到才插入到隊列的末尾。

可中斷的獲取鎖

當調用lockInterruptibly時:

public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}

然后調用AQS中的acquireInterruuptibly方法。

public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))doAcquireInterruptibly(arg);}

首先查看當前線程的中斷標記為,如果為true,就拋異常。

private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}} final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

對比lock獲取鎖的acquireQueued方法和lockInterruptibly獲取鎖的doAcquireInterruptibly方法,發現當線程處于等待鎖的阻塞中,如果被中斷,doAcquireInterruptibly直接拋出異常,而acquireQueued只是將中斷標記位設為true,具體中斷不中斷看線程的心情。。。。。。。

轉載于:https://my.oschina.net/littlestyle/blog/1605294

總結

以上是生活随笔為你收集整理的多线程(三)之ReentrantLock源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。