日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

深入理解ReentrantLock

發(fā)布時間:2025/3/21 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 深入理解ReentrantLock 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

在Java中通常實現(xiàn)鎖有兩種方式,一種是synchronized關鍵字,另一種是Lock。二者其實并沒有什么必然聯(lián)系,但是各有各的特點,在使用中可以進行取舍的使用。首先我們先對比下兩者。

實現(xiàn):

首先最大的不同:synchronized是基于JVM層面實現(xiàn)的,而Lock是基于JDK層面實現(xiàn)的。曾經(jīng)反復的找過synchronized的實現(xiàn),可惜最終無果。但Lock卻是基于JDK實現(xiàn)的,我們可以通過閱讀JDK的源碼來理解Lock的實現(xiàn)。

使用:

對于使用者的直觀體驗上Lock是比較復雜的,需要lock和realse,如果忘記釋放鎖就會產(chǎn)生死鎖的問題,所以,通常需要在finally中進行鎖的釋放。但是synchronized的使用十分簡單,只需要對自己的方法或者關注的同步對象或類使用synchronized關鍵字即可。但是對于鎖的粒度控制比較粗,同時對于實現(xiàn)一些鎖的狀態(tài)的轉移比較困難。例如:

特點:

tipssynchronizedLock
鎖獲取超時不支持支持
獲取鎖響應中斷不支持支持

優(yōu)化:

在JDK1.5之后synchronized引入了偏向鎖,輕量級鎖和重量級鎖,從而大大的提高了synchronized的性能,同時對于synchronized的優(yōu)化也在繼續(xù)進行。期待有一天能更簡單的使用java的鎖。

在以前不了解Lock的時候,感覺Lock使用實在是太復雜,但是了解了它的實現(xiàn)之后就被深深吸引了。

Lock的實現(xiàn)主要有ReentrantLock、ReadLock和WriteLock,后兩者接觸的不多,所以簡單分析一下ReentrantLock的實現(xiàn)和運行機制。

ReentrantLock類在java.util.concurrent.locks包中,它的上一級的包java.util.concurrent主要是常用的并發(fā)控制類.

下面是ReentrantLock的UML圖,從圖中可以看出,ReentrantLock實現(xiàn)Lock接口,在ReentrantLock中引用了AbstractQueuedSynchronizer的子類,所有的同步操作都是依靠AbstractQueuedSynchronizer(隊列同步器)實現(xiàn)。

研究一個類,需要從一個類的靜態(tài)域,靜態(tài)類,靜態(tài)方法和成員變量開始。

private static final long serialVersionUID = 7373984872572414699L;/** Synchronizer providing all implementation mechanics */private final Sync sync;/*** Base of synchronization control for this lock. Subclassed* into fair and nonfair versions below. Uses AQS state to* represent the number of holds on the lock.*/abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L;/*** Performs {@link Lock#lock}. The main reason for subclassing* is to allow fast path for nonfair version.*/abstract void lock();/*** Performs non-fair tryLock. tryAcquire is* implemented in subclasses, but both need nonfair* try for trylock method.*/final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}protected final boolean isHeldExclusively() {// While we must in general read state before owner,// we don't need to do so to check if current thread is ownerreturn getExclusiveOwnerThread() == Thread.currentThread();}final ConditionObject newCondition() {return new ConditionObject();}// Methods relayed from outer classfinal Thread getOwner() {return getState() == 0 ? null : getExclusiveOwnerThread();}final int getHoldCount() {return isHeldExclusively() ? getState() : 0;}final boolean isLocked() {return getState() != 0;}/*** Reconstitutes this lock instance from a stream.* @param s the stream*/private void readObject(java.io.ObjectInputStream s)throws java.io.IOException, ClassNotFoundException {s.defaultReadObject();setState(0); // reset to unlocked state}}

從上面的代碼可以看出來首先ReentrantLock是可序列化的,其次是ReentrantLock里有一個對AbstractQueuedSynchronizer的引用。

看完了成員變量和靜態(tài)域,我們需要了解下構造方法:

/*** Creates an instance of {@code ReentrantLock}.* This is equivalent to using {@code ReentrantLock(false)}.*/public ReentrantLock() {sync = new NonfairSync();}/*** Creates an instance of {@code ReentrantLock} with the* given fairness policy.** @param fair {@code true} if this lock should use a fair ordering policy*/public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}

從上面代碼可以看出,ReentrantLock支持兩種鎖模式,公平鎖和非公平鎖。默認的實現(xiàn)是非公平的。公平和非公平鎖的實現(xiàn)如下:

/*** Sync object for non-fair locks*/static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}/*** Sync object for fair locks*/static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}

AbstractQueuedSynchronizer 是一個抽象類,所以在使用這個同步器的時候,需要通過自己實現(xiàn)預期的邏輯,Sync、FairSync和NonfairSync都是ReentrantLock為了實現(xiàn)自己的需求而實現(xiàn)的內部類,之所以做成內部類,我認為是只在ReentrantLock使用上述幾個類,在外部沒有使用到。
我們著重關注默認的非公平鎖的實現(xiàn):
在ReentrantLock調用lock()的時候,調用的是下面的代碼:

/*** Acquires the lock.** <p>Acquires the lock if it is not held by another thread and returns* immediately, setting the lock hold count to one.** <p>If the current thread already holds the lock then the hold* count is incremented by one and the method returns immediately.** <p>If the lock is held by another thread then the* current thread becomes disabled for thread scheduling* purposes and lies dormant until the lock has been acquired,* at which time the lock hold count is set to one.*/public void lock() {sync.lock();}

sync的實現(xiàn)是NonfairSync,所以調用的是NonfairSync的lock方法:

/*** Sync object for non-fair locks* tips:調用Lock的時候,嘗試獲取鎖,這里采用的CAS去嘗試獲取鎖,如果獲取鎖成功* 那么,當前線程獲取到鎖,如果失敗,調用acquire處理。* */static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}

接下來看看compareAndSetState方法是怎么進行鎖的獲取操作的:

/*** Atomically sets synchronization state to the given updated* value if the current state value equals the expected value.* This operation has memory semantics of a <tt>volatile</tt> read* and write.** @param expect the expected value* @param update the new value* @return true if successful. False return indicates that the actual* value was not equal to the expected value.* * tips: 1.compareAndSetState的實現(xiàn)主要是通過Unsafe類實現(xiàn)的。* 2.之所以命名為Unsafe,是因為這個類對于JVM來說是不安全的,我們平時也是使用不了這個類的。* 3.Unsafe類內封裝了一些可以直接操作指定內存位置的接口,是不是感覺和C有點像了?* 4.Unsafe類封裝了CAS操作,來達到樂觀的鎖的爭搶的效果*/protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}

主要的說明都在方法的注釋中,接下來簡單的看一下 compareAndSwapInt的實現(xiàn):

/*** Atomically update Java variable to <tt>x</tt> if it is currently* holding <tt>expected</tt>.* @return <tt>true</tt> if successful*/public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x);

一個native方法,沮喪.....但是從注釋看意思是,以CAS的方式將制定字段設置為指定的值。同時我們也明白了這個方法可能是用java實現(xiàn)不了,只能依賴JVm底層的C代碼實現(xiàn)。下面看看操作的stateOffset:

private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long stateOffset;private static final long headOffset;private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;static {try {//這個方法很有意思,主要的意思是獲取AbstractQueuedSynchronizer的state成員的偏移量//通過這個偏移量來更新state成員,另外state是volatile的來保證可見性。stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));} catch (Exception ex) { throw new Error(ex); }}

stateOffset 是AbstractQueuedSynchronizer內部定義的一個狀態(tài)量,AbstractQueuedSynchronizer是線程的競態(tài)條件,所以只要某一個線程CAS改變狀態(tài)成功,同時在沒有釋放的情況下,其他線程必然失敗(對于Unsafe類還不是很熟悉,后面還需要系統(tǒng)的學習)。
對于競爭成功的線程會調用 setExclusiveOwnerThread方法:

/*** The current owner of exclusive mode synchronization.*/private transient Thread exclusiveOwnerThread;/*** Sets the thread that currently owns exclusive access. A* <tt>null</tt> argument indicates that no thread owns access.* This method does not otherwise impose any synchronization or* <tt>volatile</tt> field accesses.*/protected final void setExclusiveOwnerThread(Thread t) {exclusiveOwnerThread = t;}

這個實現(xiàn)是比較簡單的,只是獲取當前線程的引用,令AbstractOwnableSynchronizer中的exclusiveOwnerThread引用到當前線程。競爭失敗的線程,會調用acquire方法,這個方法也是ReentrantLock設計的精華之處:

/*** Acquires in exclusive mode, ignoring interrupts. Implemented* by invoking at least once {@link #tryAcquire},* returning on success. Otherwise the thread is queued, possibly* repeatedly blocking and unblocking, invoking {@link* #tryAcquire} until success. This method can be used* to implement method {@link Lock#lock}.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquire} but is otherwise uninterpreted and* can represent anything you like.* tips:此處主要是處理沒有獲取到鎖的線程* tryAcquire:重新進行一次鎖獲取和進行鎖重入的處理。* addWaiter:將線程添加到等待隊列中。* acquireQueued:自旋獲取鎖。 * selfInterrupt:中斷線程。* 三個條件的關系為and,如果 acquireQueued返回true,那么線程被中斷selfInterrupt會中斷線程*/public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

AbstractQueuedSynchronizer為抽象方法,調用tryAcquire時,調用的為NonfairSync的tryAcquire。

protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);} /*** Performs non-fair tryLock. tryAcquire is* implemented in subclasses, but both need nonfair* try for trylock method.*/final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}

nonfairTryAcquire方法主要是做重入鎖的實現(xiàn),synchronized本身支持鎖的重入,而ReentrantLock則是通過此處實現(xiàn)。在鎖狀態(tài)為0時,重新嘗試獲取鎖。如果已經(jīng)被占用,那么做一次是否當前線程為占用鎖的線程的判斷,如果是一樣的那么進行計數(shù),當然在鎖的relase過程中會進行遞減,保證鎖的正常釋放。
如果沒有重新獲取到鎖或者鎖的占用線程和當前線程是一個線程,方法返回false。那么把線程添加到等待隊列中,調用addWaiter:

/*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;} /*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

這里主要是用當前線程構建一個Node的等待隊列雙向鏈表,這里addWaiter中和enq中的部分邏輯是重復的,個人感覺可能是如果能一次成功就避免了enq中的死循環(huán)。因為tail節(jié)點是volatile的同時node也是不會發(fā)生競爭的所以node.prev = pred;是安全的。但是tail的next是不斷競爭的,所以利用compareAndSetTail保證操作的串行化。接下來調用acquireQueued方法:

/*** Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.** @param node the node* @param arg the acquire argument* @return {@code true} if interrupted while waiting*/final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

此處是做Node節(jié)點線程的自旋過程,自旋過程主要檢查當前節(jié)點是不是head節(jié)點的next節(jié)點,如果是,則嘗試獲取鎖,如果獲取成功,那么釋放當前節(jié)點,同時返回。至此一個非公平鎖的鎖獲取過程結束。
如果這里一直不斷的循環(huán)檢查,其實是很耗費性能的,JDK的實現(xiàn)肯定不會這么“弱智”,所以有了shouldParkAfterFailedAcquire和parkAndCheckInterrupt,這兩個方法就實現(xiàn)了線程的等待從而避免無限的輪詢:

/*** Checks and updates status for a node that failed to acquire.* Returns true if thread should block. This is the main signal* control in all acquire loops. Requires that pred == node.prev** @param pred node's predecessor holding status* @param node the node* @return {@code true} if thread should block*/private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}

首先,檢查一下當前Node的前置節(jié)點pred是否是SIGNAL,如果是SIGNAL,那么證明前置Node的線程已經(jīng)Park了,如果waitStatus>0,那么當前節(jié)點已經(jīng)Concel或者中斷。那么不斷調整當前節(jié)點的前置節(jié)點,將已經(jīng)Concel的和已經(jīng)中斷的線程移除隊列。如果waitStatus<0,那么設置waitStatus為SIGNAL,因為調用shouldParkAfterFailedAcquire的方法為死循環(huán)調用,所以終將返回true。接下來看parkAndCheckInterrupt方法,當shouldParkAfterFailedAcquire返回True的時候執(zhí)行parkAndCheckInterrupt方法:

private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}

此方法比較簡單,其實就是使當前的線程park,即暫停了線程的輪詢。當Unlock時會做后續(xù)節(jié)點的Unpark喚醒線程繼續(xù)爭搶鎖。
接下來看一下鎖的釋放過程,鎖釋放主要是通過unlock方法實現(xiàn):

/*** Attempts to release this lock.** <p>If the current thread is the holder of this lock then the hold* count is decremented. If the hold count is now zero then the lock* is released. If the current thread is not the holder of this* lock then {@link IllegalMonitorStateException} is thrown.** @throws IllegalMonitorStateException if the current thread does not* hold this lock*/public void unlock() {sync.release(1);}

主要是調用AbstractQueuedSynchronizer同步器的release方法:

/*** Releases in exclusive mode. Implemented by unblocking one or* more threads if {@link #tryRelease} returns true.* This method can be used to implement method {@link Lock#unlock}.** @param arg the release argument. This value is conveyed to* {@link #tryRelease} but is otherwise uninterpreted and* can represent anything you like.* @return the value returned from {@link #tryRelease}*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}

tryRelease方法為ReentrantLock中的Sync的tryRelease方法:

protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}

tryRelease方法主要是做了一個釋放鎖的過程,將同步狀態(tài)state -1,直到減到0為止,這主要是兼容重入鎖設計的,同時setExclusiveOwnerThread(null)清除當前占用的線程。這些head節(jié)點后的線程和新進的線程就可以開始爭搶。這里需要注意的是對于同步隊列中的線程來說在setState(c),且c為0的時候,同步隊列中的線程是沒有競爭鎖的,因為線程被park了還沒有喚醒。但是此時對于新進入的線程是有機會獲取到鎖的。
下面代碼是進行線程的喚醒:

Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;

因為在setState(c)釋放了鎖之后,是沒有線程競爭的,所以head是當前的head節(jié)點,先檢查當前的Node是否合法,如果合法則unpark it。開始鎖的獲取。就回到了上面的for循環(huán)執(zhí)行獲取鎖邏輯:

至此鎖的釋放就結束了,可以看到ReentrantLock是一個不斷的循環(huán)的狀態(tài)模型,里面有很多東西值得我們學習和思考。

ReentrantLock具有公平和非公平兩種模式,也各有優(yōu)缺點:
公平鎖是嚴格的以FIFO的方式進行鎖的競爭,但是非公平鎖是無序的鎖競爭,剛釋放鎖的線程很大程度上能比較快的獲取到鎖,隊列中的線程只能等待,所以非公平鎖可能會有“饑餓”的問題。但是重復的鎖獲取能減小線程之間的切換,而公平鎖則是嚴格的線程切換,這樣對操作系統(tǒng)的影響是比較大的,所以非公平鎖的吞吐量是大于公平鎖的,這也是為什么JDK將非公平鎖作為默認的實現(xiàn)。

最后:

關于并發(fā)和Lock還有很多的點還是比較模糊,我也會繼續(xù)學習,繼續(xù)總結,如果文章中有什么問題,還請各位看客及時指出,共同學習。

from:?https://www.cnblogs.com/zhimingyang/p/5702752.html?

總結

以上是生活随笔為你收集整理的深入理解ReentrantLock的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。