Java中锁的使用和实现
首先,我們要了解一個概念,JAVA中的鎖到底是什么呢?
鎖是用來控制多個線程訪問共享資源的方式,一般來說,一個鎖能夠防止多個線程同時訪問共享資源。
Lock接口
在Java SE 5之后,并發包中新增了Lock接口(以及相關實現類)用來實現鎖功能,它提供了與synchronized關鍵字類似的同步功能,只是在使用時需要 顯式 地獲取和釋放鎖。
雖然它缺少了(通過synchronized塊或者方法所提供的)隱式獲取釋放鎖的便捷性,但是卻擁有了鎖獲取與釋放的 可操作性、可中斷的獲取鎖 以及 超時獲取鎖 等多種synchronized關鍵字所不具備的同步特性。
使用synchronized關鍵字將會隱式地獲取鎖,但是它將鎖的獲取和釋放固化了,也就是先獲取再釋放。
Lock使用示例
Lock lock = new ReentrantLock(); lock.lock(); try { } finally {lock.unlock(); }注意
1.在finally塊中釋放鎖,目的是保證在獲取到鎖之后,最終能夠被釋放。2.不要將獲取鎖的過程寫在try塊中,因為如果在獲取鎖(自定義鎖的實現)時發生了異常,異常拋出的同時,也會導致鎖無故釋放。Lock接口提供的synchronized關鍵字所不具備的主要特性:
嘗試非阻塞性獲取鎖: 當前線程嘗試獲取鎖,如果此時沒有其他線程占用此鎖,則成功獲取到鎖。
能被中斷的獲取鎖: 當獲取到鎖的線程被中斷時,中斷異常會拋出并且會釋放鎖。
超時獲取鎖: 在指定時間內獲取鎖,如果超過時間還沒獲取,則返回。
Lock 相關的API:
獲取鎖,獲取之后返回:
void lock();可中斷的獲取鎖:
void lockInterruptibly() throws InterruptedException;嘗試非阻塞的獲取鎖:
boolean tryLock();超時獲取鎖。 超時時間結束,未獲得鎖,返回false:
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;釋放鎖:
void unlock();獲取等待通知組件,改組件和鎖綁定,當前線程獲取到鎖才能調用wait()方法,調用之后則會釋放鎖:
Condition newCondition();隊列同步器
隊列同步器AbstractQueuedSynchronizer,是用來構建鎖或者其他同步組件的基礎框架,它使用了一個int 成員變量表示同步狀態,通過內置的FIFO隊列來完成資源獲取線程的排隊工作,并發包的作者Doug Lea期望它能夠成為實現大部分同步需求的基礎。
同步器的主要使用方式是繼承AbstractQueuedSynchronizer,通過同步器提供的3個方法getState()、setState(int newState)和compareAndSetState(int expect,int update)來進行線程安全的狀態同步。
同步器是實現鎖的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。
可以這樣理解二者之間的關系:
隊列同步器的接口與示例
可重寫的方法:
AbstractQueuedSynchronizer獨占式獲取同步狀態:
boolean tryAcquire(int arg)獨占式釋放同步狀態:
boolean tryRelease(int arg)共享式獲取同步狀態:
int tryAcquireShared(int arg)共享釋放取同步狀態:
boolean tryReleaseShared(int arg)當前同步器是否在獨占式模式下被線程占用:
boolean isHeldExclusively()實現自定義同步組件時,將會調用同步器提供 獨占式獲取與釋放同步狀態、共享式獲取與釋放同步狀態 和 查詢同步隊列中的等待線程情況 三類模板方法。
獨占鎖示例:
/*** @author zsh*/ public class TestLock implements Lock {private TestQueuedSync tqs;/*** 獲取鎖*/@Overridepublic void lock() {tqs.acquire(1);}/*** 可中斷的獲取鎖*/@Overridepublic void lockInterruptibly() throws InterruptedException {tqs.acquireInterruptibly(1);}/*** 嘗試非阻塞式獲取鎖*/@Overridepublic boolean tryLock() {return tqs.tryAcquire(1);}/*** 嘗試非阻塞式獲取鎖*/@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return tqs.tryAcquire(1);}/*** 釋放鎖*/@Overridepublic void unlock() {tqs.release(1);}@Overridepublic Condition newCondition() {return tqs.newCondition();}/*** 是否有同步隊列線程*/public boolean hasQueuedThreads() {return tqs.hasQueuedThreads();}/*** 鎖是否被占用*/public boolean isLock() {return tqs.isHeldExclusively();}private static class TestQueuedSync extends AbstractQueuedSynchronizer {/*** 獨占式獲取同步狀態*/@Overrideprotected boolean tryAcquire(int arg) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}/*** 獨占式釋放同步狀態*/@Overrideprotected boolean tryRelease(int arg) {if (getState() == 0) {throw new IllegalStateException();}setExclusiveOwnerThread(null);setState(0);return true;}/*** 同步狀態是否被占用*/@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}/*** 返回一個Condition,每個condition都包含了一個condition隊列*/Condition newCondition() {return new ConditionObject();}} }上述示例代碼中,獨占鎖TestLock是一個自定義同步組件,它在同一時刻只允許一個線程占有鎖。TestLock中定義了一個靜態內部類TestQueuedSync繼承了同步器,在tryAcquire(int acquires)方法中,如果經過compareAndSetState設置成功,則代表獲取了同步狀態1,而在tryRelease(int releases)方法中只是將同步狀態重置為0。
用戶使用TestLock時并不會直接和內部同步器的實現TestQueuedSync打交道,而是調用TestLock提供的方法,在TestLock的實現中,以獲取鎖的lock()方法為例,只需要在方法實現中調用同步器的模板方法acquire(int args)即可,當前線程調用該方法獲取同步狀態失敗后會被加入到同步隊列中等待,這樣就大大降低了實現一個可靠自定義同步組件的門檻。
隊列同步器的實現分析
接下來我們將從實現角度分析同步器是如何完成線程同步的:
同步隊列 : 一個FIFO雙向隊列。
當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等信息構造成為一個節點Node并將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點中的線程喚醒,使其再次嘗試獲取同步狀態。
Node 保存 獲取同步狀態失敗的線程引用、等待狀態 以及 前驅和后繼節點,節點的屬性類型 與 名稱 以及 描述 如下:
/*** 等待狀態:* CANCELLED : 1 在同步隊列中等待超時或被中斷,需要從隊列中取消等待,在該狀態將不會變化* SIGNAL : -1 后繼節點地線程處于等待狀態,當前節點釋放獲取取消同步狀態,后繼節點地線程即開始運行* CONDITION : -2 在等待隊列中,* PROPAGATE : -3 下一次共享式同步狀態獲取將會無條件地被傳播下去* INITAL : 0 初始狀態*/ volatile int waitStatus; volatile Node prev;//前驅節點 volatile Node next;//后繼節點 volatile Thread thread;//獲取同步狀態的線程 Node nextWaiter;//等待隊列中的后繼節點。 如果節點是共享的的,這個字段將是一個SHARED常量
如上圖所示,同步器包含了兩個節點類型的引用,一個指向 頭節點,而另一個指向 尾節點。
試想一下,當一個線程成功地獲取了同步狀態(或者鎖),其他線程將無法獲取到同步狀態,轉而被構造成為節點并加入到同步隊列中,而這個加入隊列的過程必須要保證線程安全,因此同步器提供了一個基于CAS的設置尾節點的方法:compareAndSetTail(Node expect,Node update),它需要傳遞當前線程“認為”的尾節點和當前節點,只有設置成功后,當前節點才正式與之前的尾節點建立關聯。
獨占式同步狀態獲取與釋放
通過調用同步器的acquire(int arg)方法可以獲取同步狀態,該方法對中斷不敏感,也就是由于線程獲取同步狀態失敗后進入同步隊列中,后續對線程進行中斷操作時,線程不會從同步隊列中移出。
acquire(int arg)示例
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); }上述代碼主要完成了 同步狀態獲取、節點構造、加入同步隊列 以及 在同步隊列中自旋等待。
首先調用自定義同步器實現的tryAcquire(int arg)方法保證線程安全的獲取同步狀態如果獲取同步狀態失敗,構造同步節點(獨占式Node.EXCLUSIVE,同一時刻只能有一個線程成功獲取同步狀態)并通過addWaiter(Node node)方法將該節點加入到同步隊列的尾部。最后調用acquireQueued(Node node,int arg)方法,使得該節點以“死循環”的方式獲取同步狀態。如果獲取不到則阻塞節點中的線程,而 被阻塞線程的喚醒 主要依靠 前驅節點的出隊或 阻塞線程被中斷 來實現。我們來看下節點的構造以及加入同步隊列的addWaiter(Node mode)和initializeSyncQueue()方法。
示例
private Node addWaiter(Node mode) {Node node = new Node(mode);for (;;) {Node oldTail = tail;if (oldTail != null) {U.putObject(node, Node.PREV, oldTail);if (compareAndSetTail(oldTail, node)) {oldTail.next = node;return node;}} else {initializeSyncQueue();}} } private final void initializeSyncQueue() {Node n;if (U.compareAndSwapObject(this, HEAD, null, (n = new Node())))tail = n; }上述代碼通過在“死循環”中使用compareAndSetTail(Node expect,Node update)方法來確保節點能夠被線程安全添加。 如果沒有尾節點的話,則構建一個新的同步隊列。
接下來看下acquireQueued(final Node node, int arg)方法。
示例
final boolean acquireQueued(final Node node, int arg) {try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} catch (Throwable t) {cancelAcquire(node);throw t;} }在acquireQueued(final Node node,int arg)方法中,當前線程在“死循環”中嘗試獲取同步狀態,而只有前驅節點是頭節點才能夠嘗試獲取同步狀態,這是為什么?原因有兩個:
頭節點是成功獲取到同步狀態的節點,而頭節點的線程釋放了同步狀態之后,將會喚醒其后繼節點,后繼節點的線程被喚醒后需要檢查自己的前驅節點是否是頭節點。維護同步隊列的FIFO原則。
由于非首節點線程前驅節點出隊或者被中斷而從等待狀態返回,隨后檢查自己的前驅是否是頭節點,如果是則嘗試獲取同步狀態。可以看到節點和節點之間在循環檢查的過程中基本不相互通信,而是簡單地判斷自己的前驅是否為頭節點,這樣就使得節點的釋放規則符合FIFO,并且也便于對過早通知的處理(過早通知是指前驅節點不是頭節點的線程由于中斷而被喚醒)。
調用同步器的release(int arg)方法可以釋放同步狀態,然后會喚醒其后繼節點(進而使后繼節點重新嘗試獲取同步狀態)。release(int arg)執行之后會喚醒后繼的節點。
共享式同步狀態獲取與釋放
共享式獲取 與 獨占式獲取 最主要的區別在于 同一時刻能否有多個線程同時獲取到同步狀態。
以文件的讀寫為例,寫操作 要求對資源的 獨占式訪問 ,而 讀操作 可以是 共享式訪問。
通過調用acquireShared(int arg)可以共享式地獲取同步狀態。
示例
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg); } private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} catch (Throwable t) {cancelAcquire(node);throw t;} }在共享式獲取的自旋過程中,成功獲取到同步狀態并退出自旋的條件就是tryAcquireShared(int arg)方法返回值 大于等于0。
在doAcquireShared(int arg)方法的自旋過程中,如果當前節點的 前驅為頭節點 時,嘗試獲取同步狀態,如果返回值 大于等于0,表示該次獲取同步狀態成功并從自旋過程中退出。
共享式獲取也需要釋放同步狀態,通過調用releaseShared(int arg)方法可以釋放同步狀態:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false; } private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))continue; unparkSuccessor(h);}else if (ws == 0 &&!h.compareAndSetWaitStatus(0, Node.PROPAGATE))continue; }if (h == head) break;} }該方法在釋放同步狀態之后,將會喚醒后續處于等待狀態的節點。對于能夠支持多個線程同時訪問的并發組件(比如Semaphore),它和獨占式主要區別在于tryReleaseShared(int arg)方法必須確保同步狀態(或者資源數)線程安全釋放,一般是通過循環和CAS來保證的,因為釋放同步狀態的操作會同時來自多個線程。
超時獲取同步狀態
通過調用同步器的doAcquireNanos(int arg,long nanosTimeout)方法可以超時獲取同步狀態,即在指定的時間段內獲取同步狀態,如果獲取到同步狀態則返回true,否則,返回false。該方法提供了傳統Java同步操作(比如synchronized關鍵字)所不具備的特性。
示例
private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.EXCLUSIVE);try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCreturn true;}nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L) {cancelAcquire(node);return false;}if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} catch (Throwable t) {cancelAcquire(node);throw t;} }該方法在自旋過程中,當節點的前驅節點為頭節點時嘗試獲取同步狀態,如果獲取成功則從該方法返回,這個過程和獨占式同步獲取的過程類似。獲取失敗則會重新計算超時時間。
如果nanosTimeout小于等于spinForTimeoutThreshold(1000納秒)時,將不會使該線程進行超時等待,而是進入快速的自旋過程。原因在于,非常短的超時等待無法做到十分精確。
獨占式超時獲取同步狀態的流程圖
自定義同步組件
設計一個同步工具:
在同一時刻,只允許至多兩個線程同時訪問,超過兩個線程的訪問將被阻塞。能夠在同一時刻支持多個線程的訪問(共享式訪問)。示例
/*** 自定義同步組件* <p>* 實現以下功能* 1.在同一時刻,只允許至多兩個線程同時訪問,超過兩個線程的訪問將被阻塞。* 2.能夠在同一時刻支持多個線程的訪問(*/ public class CustomLock implements Lock {private CustomSyncQueue customSyncQueue = new CustomSyncQueue(2);public static void main(String[] args) {final Lock lock = new CustomLock();class Worker extends Thread {@Overridepublic void run() {while (true) {lock.lock();try {SleepUtils.second(1);System.out.println(Thread.currentThread().getName());SleepUtils.second(1);} finally {lock.unlock();}}}}// 啟動10個線程for (int i = 0; i < 10; i++) {Worker w = new Worker();w.setDaemon(true);w.start();}// 每隔1秒換行for (int i = 0; i < 10; i++) {SleepUtils.second(1);System.out.println();}}@Overridepublic void lock() {customSyncQueue.tryAcquireShared(1);}@Overridepublic void unlock() {customSyncQueue.tryReleaseShared(1);}public static class CustomSyncQueue extends AbstractQueuedSynchronizer {public CustomSyncQueue(int count) {if (count <= 0) {throw new IllegalStateException("count must >= 0");}setState(count);}@Overrideprotected int tryAcquireShared(int reduceCount) {for (; ; ) {int current = getState();int newCount = current - reduceCount;if (newCount < 0 || compareAndSetState(current, newCount)) {return newCount;}}}@Overrideprotected boolean tryReleaseShared(int returnCount) {for (; ; ) {int current = getState();int newCount = current + returnCount;if (compareAndSetState(current, newCount)) {return true;}}}} }上述代碼主要還是 CustomSyncQueue 的 tryAcquireShared 和 tryReleaseShared 方法,當tryAcquireShared(int reduceCount)方法返回值>=0時,當前線程才獲取同步狀態。
重入鎖
重入鎖ReentrantLock,就是支持重進入的鎖,它表示該鎖能夠支持 一個線程對資源的重復加鎖。
除此之外,該鎖的還支持獲取鎖時的公平和非公平性選擇。
我們回顧下TestLock的lock方法,在 tryAcquire(int acquires)方法時沒有考慮占有鎖的線程再次獲取鎖的場景,而在調用tryAcquire(int acquires)方法時返回了false,導致該線程被阻塞。
在絕對時間上,先對鎖進行獲取的請求一定先被滿足,那么這個鎖是公平的,反之,是不公平的。
事實上,公平的鎖機制往往沒有非公平的效率高,但是,并不是任何場景都是以TPS作為唯一的指標,公平鎖能夠減少“饑餓”發生的概率,等待越久的請求越是能夠得到優先滿足。
下面我們來分析下ReentrantLock 的實現:
實現重進入
重進入是指任意線程在獲取到鎖之后能夠再次獲取該鎖而不會被鎖所阻塞,該特性的實現需要解決以下兩個問題:
線程再次獲取鎖鎖的最終釋放下面是ReentrantLock通過組合自定義同步器來實現鎖的獲取與釋放
示例
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false; }此方法通過判斷 當前線程是否為獲取鎖的線程 來決定獲取操作是否成功,如果是獲取鎖的線程再次請求,則將同步狀態值進行增加并返回true,表示獲取同步狀態成功。
下面看釋放鎖的方法tryRelease(int releases)
示例
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free; }通過檢查 state == 0 來判斷是否需要繼續釋放鎖。
公平與非公平獲取鎖的區別
公平性與否是針對獲取鎖而言的,如果一個鎖是公平的,那么鎖的獲取順序就應該符合請求的絕對時間順序,也就是 FIFO。
對于上面介紹的非公平鎖實現的nonfairTryAcquire(int acquires),只要 CAS 設置同步狀態成功,即獲取到鎖,而公平鎖則不同。
示例
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false; }相比非公平鎖的實現,公平鎖的實現在獲取鎖的時候多了一個!hasQueuedPredecessors()判斷:
示例
public final boolean hasQueuedPredecessors() {Node t = tail; Node h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread()); }即加入了 同步隊列中當前節點是否有前驅節點的判斷 ,如果該方法返回 true,則表示有線程比當前線程更早地請求獲取鎖,因此 需要等待前驅線程獲取并釋放鎖之后才能繼續獲取鎖。
讀寫鎖
之前提到鎖(如TestLock和ReentrantLock)基本都是排他鎖,這些鎖在同一時刻只允許一個線程進行訪問,而讀寫鎖在同一時刻可以允許多個讀線程訪問,但是在寫線程訪問時,所有的讀線程和其他寫線程均被阻塞。
讀寫鎖維護了一對鎖,一個 讀鎖 和一個 寫鎖,通過分離讀鎖和寫鎖,使得并發性相比一般的排他鎖有了很大提升。
一般情況下,讀寫鎖 的性能都會比 排它鎖 好,因為大多數場景 讀是多于寫 的。在讀多于寫的情況下,讀寫鎖 能夠提供比 排它鎖 更好的 并發性 和 吞吐量。Java并發包提供讀寫鎖的實現是ReentrantReadWriteLock ,特性如下:
公平性選擇 :支持公平和非公平的方式獲取鎖,吞吐量非公平優于公平。重進入 : 讀鎖在獲取鎖之后再獲取讀鎖,寫鎖在獲取鎖之后再獲取讀鎖和寫鎖。鎖降級 :遵循獲取寫鎖、獲取讀鎖在釋放寫鎖的次序,寫鎖能夠降級為讀鎖。讀寫鎖的接口與示例
ReadWriteLock僅定義了獲取讀鎖和寫鎖的兩個方法,即readLock()方法和writeLock()方法,而其實現類ReentrantReadWriteLock,除了接口方法之外,還提供了一些便于外界監控其內部工作狀態的方法,這些方法如下:
返回當前讀鎖獲取的次數:
getReadLockCount()返回當前線程獲取讀鎖的次數:
getReadHoldCount()判斷寫鎖是否被獲取:
isWriteLocked()返回當前寫鎖被獲取的次數
getWriteHoldCount()讀寫鎖使用示例代碼:
通過讀寫鎖保證 非線程安全的HashMap的讀寫是線程安全的。
讀寫鎖的實現分析
主要包括:讀寫狀態的設計、寫鎖的獲取與釋放、讀鎖的獲取與釋放以及鎖降級。
讀寫狀態的設計
讀寫鎖將變量切分成了兩個部分,高16位表示讀,低16位表示寫,如下圖:
當前同步狀態表示一個線程已經獲取了寫鎖,且重進入了兩次,同時也連續獲取了兩次讀鎖。
讀寫鎖是如何迅速確定讀和寫各自的狀態呢?答案是通過位運算。
假設當前同步狀態值為S,寫狀態等于S&0x0000FFFF(將高16位全部抹去),讀狀態等于S>>16(無符號補0右移16位)。
當寫狀態增加1時,等于S+1,當讀狀態增加1時,等于S+(1<<16),也就是S+0x00010000。
根據狀態的劃分能得出一個推論:S != 0時,當寫狀態S&0x0000FFFF = 0時,則讀狀態S>>16 > 0,即讀鎖已被獲取。
寫鎖的獲取與釋放
寫鎖是一個支持重進入的排它鎖。如果當前線程已經獲取了寫鎖,則增加寫狀態。如果當前線程在獲取寫鎖時,讀鎖已經被獲取(讀狀態不為0)或者該線程不是已經獲取寫鎖的線程,則當前線程進入等待狀態。
ReentrantReadWriteLoc的tryAcquire方法示例如下:
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();int c = getState();int w = exclusiveCount(c);if (c != 0) {// 存在讀鎖或者當前獲取線程不是已經獲取寫鎖的線程if (w == 0 || current != getExclusiveOwnerThread())return false;if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");setState(c + acquires);return true;}if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;setExclusiveOwnerThread(current);return true; }該方法除了重入條件(當前線程為獲取了寫鎖的線程)之外,增加了一個 讀鎖是否存在 的判斷。如果存在讀鎖,則寫鎖不能被獲取。
寫鎖的釋放與ReentrantLock的釋放過程基本類似,每次釋放均減少寫狀態,當寫狀態為0時表示寫鎖已被釋放,從而等待的讀寫線程能夠繼續訪問讀寫鎖,同時前次寫線程的修改對后續讀寫線程可見。
讀鎖的獲取與釋放
讀鎖是一個支持重進入的 共享鎖,它能夠被多個線程同時獲取,在沒有其他寫線程訪問(或者寫狀態為0)時,讀鎖總會被成功地獲取,而所做的也只是(線程安全的)增加讀狀態。
如果當前線程已經獲取了讀鎖,則增加讀狀態。如果當前線程在獲取讀鎖時,寫鎖已被其他線程獲取,則進入等待狀態。
ReentrantReadWriteLock的tryAcquireShared方法示例:
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();int c = getState();if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;int r = sharedCount(c);if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {...return 1;}return fullTryAcquireShared(current); } final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (;;) {int c = getState();if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)return -1;// else we hold the exclusive lock; blocking here// would cause deadlock.} else if (readerShouldBlock()) {// Make sure we're not acquiring read lock reentrantlyif (firstReader == current) {// assert firstReaderHoldCount > 0;} else {if (rh == null) {rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();if (rh.count == 0)readHolds.remove();}}if (rh.count == 0)return -1;}}if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");if (compareAndSetState(c, c + SHARED_UNIT)) {if (sharedCount(c) == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}} }如果其他線程已經獲取了寫鎖,則當前線程獲取讀鎖失敗,進入等待狀態。
如果當前線程獲取了寫鎖或者寫鎖未被獲取,則當前線程增加讀狀態,成功獲取讀鎖。
讀鎖的每次釋放均減少讀狀態,減少的值是1<<16
鎖降級
鎖降級指的是 寫鎖降級成為讀鎖。
如果當前線程擁有寫鎖,然后將其釋放,最后再獲取讀鎖,這種分段完成的過程不能稱之為鎖降級。鎖降級是指把持住(當前擁有的)寫鎖,再獲取到讀鎖,隨后釋放(先前擁有的)寫鎖的過程。
鎖降級的示例
//當數據發生變更后,update變量(布爾類型且volatile修飾)被設置為false public void processData() {readLock.lock();if (!update) {// 必須先釋放讀鎖readLock.unlock();// 鎖降級從寫鎖獲取到開始writeLock.lock();try {if (!update) {// 準備數據的流程update = true;}readLock.lock();} finally {writeLock.unlock();}// 鎖降級完成,寫鎖降級為讀鎖}try {// 使用數據的流程} finally {readLock.unlock();} }上例中,當數據發生變更后,布爾類型且volatile修飾update變量被設置為false,此時所有訪問processData()方法的線程都能夠感知到變化,但只有一個線程能夠獲取到寫鎖,其他線程會被阻塞在讀鎖和寫鎖的lock()方法上。當前線程獲取寫鎖完成數據準備之后,再獲取讀鎖,隨后釋放寫鎖,完成鎖降級。
鎖降級中讀鎖的獲取是否必要呢?
當然是必要的了。主要是為了 保證數據的可見性。
如果當前線程不獲取讀鎖而是直接釋放寫鎖,假設此刻另一個線程(記作線程T)獲取了寫鎖并修改了數據,那么 當前線程無法感知線程T的數據更新。
如果當前線程獲取讀鎖,即遵循鎖降級的步驟,則線程T將會被阻塞,直到當前線程使用數據并釋放讀鎖之后,線程T才能獲取寫鎖進行數據更新。
RentrantReadWriteLock不支持鎖升級。目的也是保證數據可見性,如果讀鎖已被多個線程獲取,其中任意線程成功獲取了寫鎖并更新了數據,則其更新對其他獲取到讀鎖的線程是不可見的。
LockSupport工具
當需要阻塞或喚醒一個線程的時候,都會使用LockSupport工具類來完成相應工作。
LockSupport定義了一組的公共靜態方法,這些方法提供了最基本的 線程阻塞和喚醒功能,而LockSupport也成為構建同步組件的基礎工具。
LockSupport提供的 阻塞和喚醒的方法 如下:
阻塞當前線程,只有調用 unpark(Thread thread)或者被中斷之后才能從park()返回:
park()再park()的基礎上增加了超時返回:
parkNanos(long nanos)阻塞線程知道 deadline 對應的時間點:
parkUntil(long deadline)Java 6時增加,blocker為當前線程在等待的對象:
park(Object blocker)ava 6時增加,blocker為當前線程在等待的對象:
parkNanos(Object blocker, long nanos)Java 6時增加,blocker為當前線程在等待的對象:
parkUntil(Object blocker, long deadline)喚醒處于阻塞狀態的線程 thread:
unpark(Thread thread)有對象參數的阻塞方法在線程dump時,會有更多的現場信息。
Condition接口
任意一個Java對象,都擁有一組監視器方法,定義在java.lang.Object),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,這些方法與synchronized同步關鍵字配合,可以實現等待/通知模式。
Condition接口也提供了類似Object的監視器方法,與Lock配合可以實現 等待/通知 模式,但是這兩者在使用方式以及功能特性上還是有差別的。
Object的監視器方法與Condition接口的對比
示例
Condition接口與示例
Condition定義了等待/通知兩種類型的方法,當前線程調用這些方法時,需要提前獲取到Condition對象關聯的鎖。
Condition對象是由調用Lock對象的newCondition()方法創建出來的,換句話說,Condition是依賴Lock對象的。
Condition的使用方式比較簡單,需要注意在調用方法前獲取鎖。
示例
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void conditionWait() throws InterruptedException {lock.lock();try {condition.await();} finally {lock.unlock();} } public void conditionSignal() throws InterruptedException {lock.lock();try {condition.signal();} finally {lock.unlock();} }Condition 接口方法介紹:
當前線程進入等待狀態直到被通知或中斷:
void await() throws InterruptedException當前線程進入等待狀態直到被通知,對中斷不敏感:
void awaitUninterruptibly()當前線程進入等待狀態直到被通知、中斷或超時:
long awaitNanos(long var1) throws InterruptedException當前線程進入等待狀態直到被通知、中斷或超時:
boolean await(long var1, TimeUnit var3) throws InterruptedException當前線程進入等待狀態直到被通知、中斷或到某一時間:
boolean awaitUntil(Date var1) throws InterruptedException喚醒Condition上一個在等待的線程:
void signal()喚醒Condition上全部在等待的線程:
void signalAll()獲取一個Condition必須通過Lock的newCondition()方法。
通過下面這個有界隊列的示例我們來深入了解下 Condition 的使用方式:
public class BoundedQueue<T> {private Object[] items;// 添加的下標,刪除的下標和數組當前數量private int addIndex, removeIndex, count;private Lock lock = new ReentrantLock();private Condition notEmpty = lock.newCondition();private Condition notFull = lock.newCondition();public BoundedQueue(int size) {items = new Object[size];}// 添加一個元素,如果數組滿,則添加線程進入等待狀態,直到有"空位"public void add(T t) throws InterruptedException {lock.lock();try {while (count == items.length){notFull.await();}items[addIndex] = t;if (++addIndex == items.length)addIndex = 0;++count;notEmpty.signal();} finally {lock.unlock();}}// 由頭部刪除一個元素,如果數組空,則刪除線程進入等待狀態,直到有新添加元素@SuppressWarnings("unchecked")public T remove() throws InterruptedException {lock.lock();try {while (count == 0)notEmpty.await();Object x = items[removeIndex];if (++removeIndex == items.length)removeIndex = 0;--count;notFull.signal();return (T) x;} finally {lock.unlock();}} }上述代碼add 和 remove 方法 都需要先獲取鎖保證數據的可見性和排它性。
當儲存數組滿了的時候時候調用notFull.await(),線程即釋放鎖并進入等待隊列。
當儲存數組未滿時,則添加到數組,并通知 notEmpty 中等待的線程。
方法中使用while循環是為了防止過早或者意外的通知。
Condition的實現分析
主要包括 等待隊列、等待和通知。
等待隊列
等待隊列是一個FIFO的隊列,在隊列中的每個節點都包含了一個線程引用,該線程就是在Condition對象上等待的線程,如果一個線程調用了Condition.await()方法,那么該線程將會釋放鎖、構造成節點加入等待隊列并進入等待狀態。同步隊列和等待隊列中節點類型都是同步器的靜態內部類AbstractQueuedSynchronizer.Node。
線程調用Condition.await(),即以當前線程構造節點,并加入等待隊列的尾部。
如下圖所示,Condition的實現是同步器的內部類,因此每個Condition實例都能夠訪問同步器提供的方法,相當于每個Condition都擁有所屬同步器的引用。
等待
調用Condition的await()等方法,會使當前線程進入等待隊列并釋放鎖,同時線程狀態變為等待狀態。當從await()方法返回時,當前線程一定獲取了Condition相關聯的鎖。
示例
public final void await() throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();} else {AbstractQueuedSynchronizer.Node node = this.addConditionWaiter();int savedState = AbstractQueuedSynchronizer.this.fullyRelease(node);int interruptMode = 0;while(!AbstractQueuedSynchronizer.this.isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = this.checkInterruptWhileWaiting(node)) != 0) {break;}}if (AbstractQueuedSynchronizer.this.acquireQueued(node, savedState) && interruptMode != -1) {interruptMode = 1;}if (node.nextWaiter != null) {this.unlinkCancelledWaiters();}if (interruptMode != 0) {this.reportInterruptAfterWait(interruptMode);}} }調用該方法的線程成功獲取了鎖的線程,也就是同步隊列中的首節點,該方法會將當前線程構造成節點并加入等待隊列中,然后釋放同步狀態,喚醒同步隊列中的后繼節點,然后當前線程會進入等待狀態。
當等待隊列中的節點被喚醒,則喚醒節點的線程開始嘗試獲取同步狀態。如果不是通過其他線程調用Condition.signal()方法喚醒,而是對等待線程進行中斷,則會拋出InterruptedException。
通知
調用Condition的signal()方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點之前,會將節點移到同步隊列中。
示例
public final void signal() {if (!AbstractQueuedSynchronizer.this.isHeldExclusively()) {throw new IllegalMonitorStateException();} else {AbstractQueuedSynchronizer.Node first = this.firstWaiter;if (first != null) {this.doSignal(first);}} } private void doSignal(AbstractQueuedSynchronizer.Node first) {do {if ((this.firstWaiter = first.nextWaiter) == null) {this.lastWaiter = null;}var1.nextWaiter = null;} while (!AbstractQueuedSynchronizer.this.transferForSignal(first) && (first = this.firstWaiter) != null);}調用該方法的前置條件是當前線程必須獲取了鎖,可以看到signal()方法進行了isHeldExclusively()檢查,也就是當前線程必須是獲取了鎖的線程。
接著獲取等待隊列的首節點,將其移動到同步隊列并使用LockSupport喚醒節點中的線程。
通過調用同步器的enq(Node node)方法,等待隊列中的頭節點線程安全地移動到同步隊列。
當節點移動到同步隊列后,當前線程再使用LockSupport喚醒該節點的線程。
被喚醒后的線程,將從await()方法中的while循環中退出isOnSyncQueue(Node node)方法返回true,節點已經在同步隊列中,進而調用同步器的acquireQueued()方法加入到獲取同步狀態的競爭中。
成功獲取同步狀態之后,被喚醒的線程將從先前調用的await()方法返回,此時該線程已經成功地獲取了鎖。
Condition的signalAll()方法,相當于對等待隊列中的每個節點均執行一次signal()方法,效果就是將等待隊列中所有節點全部移動到同步隊列中,并喚醒每個節點的線程。
了解更多關注我喲!!!
總結
以上是生活随笔為你收集整理的Java中锁的使用和实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C/C++ 文件的后缀名
- 下一篇: Java Float类详解