日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java Review - 并发编程_读写锁ReentrantReadWriteLock的原理源码剖析

發布時間:2025/3/21 java 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java Review - 并发编程_读写锁ReentrantReadWriteLock的原理源码剖析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • ReentrantLock VS ReentrantReadWriteLock
  • 類圖結構
  • 非公平的讀寫鎖實現
    • 寫鎖的獲取與釋放
      • void lock()
      • void lockInterruptibly()
      • boolean tryLock()
      • boolean tryLock(long timeout, TimeUnit unit)
      • void unlock()
    • 讀鎖的獲取與釋放
      • void lock()
      • void lockInterruptibly()
      • boolean tryLock()
      • boolean tryLock(long timeout, TimeUnit unit)
      • void unlock()
  • Demo
  • 小結


ReentrantLock VS ReentrantReadWriteLock

解決線程安全問題使用ReentrantLock就可以,但是ReentrantLock是獨占鎖,某時只有一個線程可以獲取該鎖,而實際中會有寫少讀多的場景,顯然ReentrantLock滿足不了這個需求,所以ReentrantReadWriteLock應運而生。ReentrantReadWriteLock采用讀寫分離的策略,允許多個線程可以同時獲取讀鎖。


類圖結構

為了了解ReentrantReadWriteLock的內部構造,我們先看下它的類圖結構


讀寫鎖的內部維護了一個ReadLock和一個WriteLock,它們依賴Sync實現具體功能。

Sync繼承自AQS,并且也提供了公平和非公平的實現。


非公平的讀寫鎖實現

下面只介紹非公平的讀寫鎖實現。

我們知道AQS中只維護了一個state狀態,而ReentrantReadWriteLock則需要維護讀狀態和寫狀態,一個state怎么表示寫和讀兩種狀態呢?

ReentrantReadWriteLock巧妙地使用state的高16位表示讀狀態,也就是獲取到讀鎖的次數;使用低16位表示獲取到寫鎖的線程的可重入次數。

/** Read vs write count extraction constants and functions.* Lock state is logically divided into two unsigned shorts:* The lower one representing the exclusive (writer) lock hold count,* and the upper the shared (reader) hold count.*/static final int SHARED_SHIFT = 16;//共享鎖(讀鎖)狀態單位值65536static final int SHARED_UNIT = (1 << SHARED_SHIFT);//共享鎖線程最大個數65535static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;//排它鎖(寫鎖)掩碼,二進制,15個1static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 返回讀鎖線程數/** Returns the number of shared holds represented in count */static int sharedCount(int c) { return c >>> SHARED_SHIFT; }// 返回寫鎖線程數/** Returns the number of exclusive holds represented in count */static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

Sync中的

  • firstReader用來記錄第一個獲取到讀鎖的線程
  • firstReaderHoldCount則記錄第一個獲取到讀鎖的線程獲取讀鎖的可重入次數。
  • cachedHoldCounter用來記錄最后一個獲取讀鎖的線程獲取讀鎖的可重入次數
/*** A counter for per-thread read hold counts.* Maintained as a ThreadLocal; cached in cachedHoldCounter*/static final class HoldCounter {int count = 0;// Use id, not reference, to avoid garbage retentionfinal long tid = getThreadId(Thread.currentThread());}

readHolds是ThreadLocal變量,用來存放除去第一個獲取讀鎖線程外的其他線程獲取讀鎖的可重入次數。

ThreadLocalHoldCounter繼承了ThreadLocal,因而initialValue方法返回一個HoldCounter對象。

/*** ThreadLocal subclass. Easiest to explicitly define for sake* of deserialization mechanics.*/static final class ThreadLocalHoldCounterextends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();}}


寫鎖的獲取與釋放

在ReentrantReadWriteLock中寫鎖使用WriteLock來實現。

void lock()

寫鎖是個獨占鎖,某時只有一個線程可以獲取該鎖。

  • 如果當前沒有線程獲取到讀鎖和寫鎖,則當前線程可以獲取到寫鎖然后返回。
  • 如果當前已經有線程獲取到讀鎖和寫鎖,則當前請求寫鎖的線程會被阻塞掛起。
  • 另外,寫鎖是可重入鎖,如果當前線程已經獲取了該鎖,再次獲取只是簡單地把可重入次數加1后直接返回。

public void lock() {sync.acquire(1);} public final void acquire(int arg) {// sync重寫的tryAcquire方法if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

如以上代碼所示,在 lock()內部調用了AQS的acquire方法,其中tryAcquire是ReentrantReadWriteLock內部的sync類重寫的,代碼如下。

protected final boolean tryAcquire(int acquires) {/** Walkthrough:* 1. If read count nonzero or write count nonzero* and owner is a different thread, fail.* 2. If count would saturate, fail. (This can only* happen if count is already nonzero.)* 3. Otherwise, this thread is eligible for lock if* it is either a reentrant acquire or* queue policy allows it. If so, update state* and set owner.*/Thread current = Thread.currentThread();int c = getState();int w = exclusiveCount(c);// 1 c!=0 說明讀鎖或者寫鎖已經被其他線程獲取 if (c != 0) {// (Note: if c != 0 and w == 0 then shared count != 0)// 2 w = 0 說明有線程已經獲取了寫鎖,w!=0 且 當前線程不是寫鎖的擁有者,返回falseif (w == 0 || current != getExclusiveOwnerThread())return false;// 3 說明當前線程已經獲取到了寫鎖,判斷可重入次數if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// Reentrant acquire// 設置可重入次數(1 )setState(c + acquires);return true;}// 5 第一個寫線程獲取寫鎖 if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;setExclusiveOwnerThread(current);return true;}
  • 代碼(1)中,如果當前AQS狀態值不為0則說明當前已經有線程獲取到了讀鎖或者寫鎖。

  • 代碼(2)中,如果w==0說明狀態值的低16位為0,而AQS狀態值不為0,則說明高16位不為0,這暗示已經有線程獲取了讀鎖,所以直接返回false 。 如果w!=0則說明當前已經有線程獲取了該寫鎖,再看當前線程是不是該鎖的持有者,如果不是則返回false。

  • 執行到代碼(3)說明當前線程之前已經獲取到了該鎖,所以判斷該線程的可重入次數是不是超過了最大值,是則拋出異常,否則執行代碼 (4)增加當前線程的可重入次數,然后返回true.

  • 如果AQS的狀態值等于0則說明目前沒有線程獲取到讀鎖和寫鎖,所以執行代碼(5)。其中,對于writerShouldBlock方法,

非公平鎖的實現為

final boolean writerShouldBlock() {return false; // writers can always barge}

如果代碼對于非公平鎖來說總是返回false,則說明代碼(5)搶占式執行CAS嘗試獲取寫鎖,獲取成功則設置當前鎖的持有者為當前線程并返回true,否則返回false。

公平鎖的實現為

final boolean writerShouldBlock() {return hasQueuedPredecessors();}

這里還是使用hasQueuedPredecessors來判斷當前線程節點是否有前驅節點,如果有則當前線程放棄獲取寫鎖的權限,直接返回false。


void lockInterruptibly()

類似于lock()方法,它的不同之處在于,它會對中斷進行響應,也就是當其他線程調用了該線程的interrupt()方法中斷了當前線程時,當前線程會拋出異常InterruptedException異常。

public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}

boolean tryLock()

嘗試獲取寫鎖,如果當前沒有其他線程持有寫鎖或者讀鎖,則當前線程獲取寫鎖會成功,然后返回true。 如果當前已經有其他線程持有寫鎖或者讀鎖則該方法直接返回false,且當前線程并不會被阻塞。 如果當前線程已經持有了該寫鎖則簡單增加AQS的狀態值后直接返回true。

public boolean tryLock( ) {return sync.tryWriteLock();} /*** Performs tryLock for write, enabling barging in both modes.* This is identical in effect to tryAcquire except for lack* of calls to writerShouldBlock.*/final boolean tryWriteLock() {Thread current = Thread.currentThread();int c = getState();if (c != 0) {int w = exclusiveCount(c);if (w == 0 || current != getExclusiveOwnerThread())return false;if (w == MAX_COUNT)throw new Error("Maximum lock count exceeded");}if (!compareAndSetState(c, c + 1))return false;setExclusiveOwnerThread(current);return true;}

如上代碼與tryAcquire方法類似,不同在于這里使用的是非公平策略。


boolean tryLock(long timeout, TimeUnit unit)

與tryAcquire()的不同之處在于,多了超時時間參數,如果嘗試獲取寫鎖失敗則會把當前線程掛起指定時間,待超時時間到后當前線程被激活,如果還是沒有獲取到寫鎖則返回false。另外,該方法會對中斷進行響應,也就是當其他線程調用了該線程的interrupt()方法中斷了當前線程時,當前線程會拋出InterruptedException異常。

public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}

void unlock()

嘗試釋放鎖,如果當前線程持有該鎖,調用該方法會讓該線程對該線程持有的AQS狀態值減1,如果減去1后當前狀態值為0則當前線程會釋放該鎖,否則僅僅減1而已。如果當前線程沒有持有該鎖而調用了該方法則會拋出IllegalMonitorStateException異常

public void unlock() {sync.release(1);} public final boolean release(int arg) {// 調用ReentrantReadWriteLock#Sync重寫的tryReleaseif (tryRelease(arg)) {// 激活阻塞隊列里面的一個線程Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;} protected final boolean tryRelease(int releases) {// 6 看是否是擁有寫鎖的線程調用的unLockif (!isHeldExclusively())throw new IllegalMonitorStateException();// 7 獲取可重入值,沒有考慮高16位,因為獲取寫鎖時讀鎖的值肯定為0int nextc = getState() - releases;boolean free = exclusiveCount(nextc) == 0;// 如果可重入鎖值為0則釋放鎖,否則只是簡單的更新狀態值 if (free)setExclusiveOwnerThread(null);setState(nextc);return free;}
  • tryRelease首先通過isHeldExclusively判斷是否當前線程是該寫鎖的持有者,如果不是則拋出異常

  • 否則執行代碼(7),這說明當前線程持有寫鎖,持有寫鎖說明狀態值的高16位為0,所以這里nextc值就是當前線程寫鎖的剩余可重入次數。

  • 代碼(8)判斷當前可重入次數是否為0,如果free為true則說明可重入次數為0,所以當前線程會釋放寫鎖,將當前鎖的持有者設置為null。如果free為false則簡單地更新可重入次數。

讀鎖的獲取與釋放

ReentrantReadWriteLock中的讀鎖是使用ReadLock來實現的。

void lock()

獲取讀鎖,如果當前沒有其他線程持有寫鎖,則當前線程可以獲取讀鎖,AQS的狀態值state的高16位的值會增加1,然后方法返回。否則如果其他一個線程持有寫鎖,則當前線程會被阻塞。

/*** Acquires the read lock.** <p>Acquires the read lock if the write lock is not held by* another thread and returns immediately.** <p>If the write lock is held by another thread then* the current thread becomes disabled for thread scheduling* purposes and lies dormant until the read lock has been acquired.*/public void lock() {sync.acquireShared(1);} public final void acquireShared(int arg) {// 調用ReentrantReadWriteLock中syn的tryAcquireSharedif (tryAcquireShared(arg) < 0)// 調用AQS的doAcquireShareddoAcquireShared(arg);} protected final int tryAcquireShared(int unused) {/** Walkthrough:* 1. If write lock held by another thread, fail.* 2. Otherwise, this thread is eligible for* lock wrt state, so ask if it should block* because of queue policy. If not, try* to grant by CASing state and updating count.* Note that step does not check for reentrant* acquires, which is postponed to full version* to avoid having to check hold count in* the more typical non-reentrant case.* 3. If step 2 fails either because thread* apparently not eligible or CAS fails or count* saturated, chain to version with full retry loop.*/Thread current = Thread.currentThread();// 1 獲取當前狀態值int c = getState();// 2 判斷是否寫鎖被占用if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;// 3 獲取讀鎖計數int r = sharedCount(c);// 4 嘗試獲取鎖,多個讀線程只有一個會成功,不成功的進入fullTryAcquireShared進行重試if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {// 5 第一個線程獲取到鎖if (r == 0) {firstReader = current;firstReaderHoldCount = 1;// 6 如果當前線程是第一個獲取讀鎖的線程} else if (firstReader == current) {firstReaderHoldCount++;} else {// 7 記錄最后一個獲取讀鎖的線程或記錄其他線程讀鎖的可重入次數HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}// 8 類似tryAcquireShared,但是自旋獲取return fullTryAcquireShared(current);}
  • 首先獲取了當前AQS的狀態值
  • 然后代碼(2)查看是否有其他線程獲取到了寫鎖,如果是則直接返回-1,而后調用AQS的doAcquireShared方法把當前線程放入AQS阻塞隊列。

如果當前要獲取讀鎖的線程已經持有了寫鎖,則也可以獲取讀鎖。但是需要注意,當一個線程先獲取了寫鎖,然后獲取了讀鎖處理事情完畢后,要記得把讀鎖和寫鎖都釋放掉,不能只釋放寫鎖。

  • 否則執行代碼(3),得到獲取到的讀鎖的個數,到這里說明目前沒有線程獲取到寫鎖,但是可能有線程持有讀鎖,然后執行代碼(4)

其中非公平鎖的readerShouldBlock實現代碼如下

final boolean readerShouldBlock() {/* As a heuristic to avoid indefinite writer starvation,* block if the thread that momentarily appears to be head* of queue, if one exists, is a waiting writer. This is* only a probabilistic effect since a new reader will not* block if there is a waiting writer behind other enabled* readers that have not yet drained from the queue.*/return apparentlyFirstQueuedIsExclusive();}} final boolean apparentlyFirstQueuedIsExclusive() {Node h, s;return (h = head) != null &&(s = h.next) != null &&!s.isShared() &&s.thread != null;}

如上代碼的作用是,如果隊列里面存在一個元素,則判斷第一個元素是不是正在嘗試獲取寫鎖,如果不是,則當前線程判斷當前獲取讀鎖的線程是否達到了最大值。最后執行CAS操作將AQS狀態值的高16位值增加1。

  • 代碼(5)(6)記錄第一個獲取讀鎖的線程并統計該線程獲取讀鎖的可重入數。

  • 代碼(7)使用cachedHoldCounter記錄最后一個獲取到讀鎖的線程和該線程獲取讀鎖的可重入數,readHolds記錄了當前線程獲取讀鎖的可重入數。

  • 如果readerShouldBlock返回true則說明有線程正在獲取寫鎖,所以執行代碼(8)。

  • fullTryAcquireShared的代碼與tryAcquireShared類似,它們的不同之處在于,前者通過循環自旋獲取。

final int fullTryAcquireShared(Thread current) {/** This code is in part redundant with that in* tryAcquireShared but is simpler overall by not* complicating tryAcquireShared with interactions between* retries and lazily reading hold counts.*/HoldCounter rh = null;for (;;) {int c = getState();if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)return -1;// else we hold the exclusive lock; blocking here// would cause deadlock.} else if (readerShouldBlock()) {// Make sure we're not acquiring read lock reentrantlyif (firstReader == current) {// assert firstReaderHoldCount > 0;} else {if (rh == null) {rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();if (rh.count == 0)readHolds.remove();}}if (rh.count == 0)return -1;}}if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");if (compareAndSetState(c, c + SHARED_UNIT)) {if (sharedCount(c) == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}}}


void lockInterruptibly()

類似于lock()方法,不同之處在于,該方法會對中斷進行響應,也就是當其他線程調用了該線程的interrupt()方法中斷了當前線程時,當前線程會拋出InterruptedException異常。


boolean tryLock()

  • 嘗試獲取讀鎖,如果當前沒有其他線程持有寫鎖,則當前線程獲取讀鎖會成功,然后返回true 。

  • 如果當前已經有其他線程持有寫鎖則該方法直接返回false,但當前線程并不會被阻塞。

  • 如果當前線程已經持有了該讀鎖則簡單增加AQS的狀態值高16位后直接返回true。

其代碼類似tryLock的代碼,這里不再講述。


boolean tryLock(long timeout, TimeUnit unit)

與tryLock()的不同之處在于,多了超時時間參數,如果嘗試獲取讀鎖失敗則會把當前線程掛起指定時間,待超時時間到后當前線程被激活,如果此時還沒有獲取到讀鎖則返回false。

另外,該方法對中斷響應,也就是當其他線程調用了該線程的interrupt()方法中斷了當前線程時,當前線程會拋出InterruptedException異常。


void unlock()

public void unlock() {sync.releaseShared(1);}

如上代碼具體釋放鎖的操作是委托給Sync類來做的,sync.releaseShared方法的代碼如下:

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;} protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();if (firstReader == current) {// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}// 循環直到自己的讀計數器為-1 ,CAS更新成功 for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc == 0;}}

如以上代碼所示,在無限循環里面,首先獲取當前AQS狀態值并將其保存到變量c,然后變量c被減去一個讀計數單位后使用CAS操作更新AQS狀態值,如果更新成功則查看當前AQS狀態值是否為0,為0則說明當前已經沒有讀線程占用讀鎖,則tryReleaseShared返回true。

然后會調用doReleaseShared方法釋放一個由于獲取寫鎖而被阻塞的線程,如果當前AQS狀態值不為0,則說明當前還有其他線程持有了讀鎖,所以tryReleaseShared返回false。

如果tryReleaseShared中的CAS更新AQS狀態值失敗,則自旋重試直到成功。


Demo

Java Review - 并發編程_獨占鎖ReentrantLock原理&源碼剖析
介紹了如何使用ReentrantLock實現線程安全的list,但是由于ReentrantLock是獨占鎖,所以在讀多寫少的情況下性能很差。下面使用ReentrantReadWriteLock來改造它,代碼如下

import java.util.ArrayList; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/4 23:05* @mark: show me the code , change the world*/ public class ReentrantReadWriteLockList {//線程不安全的Listprivate ArrayList<String> list = new ArrayList<String>();//獨占鎖private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();private final Lock readLock = lock.readLock();private final Lock writeLock = lock.writeLock();//往集合中添加元素public void add(String str) {writeLock.lock();try {list.add(str);} finally {writeLock.unlock();}}//刪除集合中的元素public void remove(String str) {writeLock.lock();try {list.remove(str);} finally {writeLock.unlock();}}//根據索引獲取集合中某個元素public String get(int index) {readLock.lock();try {return list.get(index);} finally {readLock.unlock();}} }

以上代碼調用get方法時使用的是讀鎖,這樣運行多個讀線程來同時訪問list的元素,這在讀多寫少的情況下性能會更好。

小結

我們這里介紹了讀寫鎖ReentrantReadWriteLock的原理,它的底層是使用AQS實現的。ReentrantReadWriteLock巧妙地使用AQS的狀態值的高16位表示獲取到讀鎖的個數,低16位表示獲取寫鎖的線程的可重入次數,并通過CAS對其進行操作實現了讀寫分離,這在讀多寫少的場景下比較適用。

總結

以上是生活随笔為你收集整理的Java Review - 并发编程_读写锁ReentrantReadWriteLock的原理源码剖析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。