JUC锁-Semaphore(八)
Semaphore簡介
Semaphore是一個信號計數量,它的本質其實是一個”共享鎖”。
信號量維護了一個信號量許可集。線程可以通過調用acquire()來獲取信號量的許可;當信號量中有可用的許可時,線程能獲取該許可;否則線程必須等待,直到有可用的許可為止。 線程可以通過release()來釋放它所持有的信號量許可。
它的uml圖如下:
是不是感覺有點熟悉呢?沒錯,它跟“ReetrantLock”是一樣的,通過組件sync(繼承AQS),得到了它的模版方法。
Sync包括兩個子類:”公平信號量”FairSync 和 “非公平信號量”NonfairSync。sync是”FairSync的實例”,或者”NonfairSync的實例”;默認情況下,sync是NonfairSync(默認是非公平信號量)。
Semaphore方法
構造方法
public Semaphore(int permits) {sync = new NonfairSync(permits); }public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits); }從中,我們可以信號量分為“公平信號量(FairSync)”和“非公平信號量(NonfairSync)”。Semaphore(int permits)函數會默認創建“非公平信號量”。
我們來看一下FairSync和NonFaireSync的構造方法:
static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}我們可以看到,他們區別在于tryAcquireShared方法的不同。
下面會對他們作出解釋。
公平信號量獲取和釋放
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1); }public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits); }信號量中的acquire()獲取函數,實際上是調用的AQS中的acquireSharedInterruptibly()。
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 如果線程是中斷狀態,則拋出異常。if (Thread.interrupted())throw new InterruptedException();// 否則,嘗試獲取“共享鎖”;獲取成功則直接返回,獲取失敗,則通過doAcquireSharedInterruptibly()獲取。if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); }Semaphore中”公平鎖“對應的tryAcquireShared()實現如下:
protected int tryAcquireShared(int acquires) {for (;;) {// 判斷“當前線程”是不是CLH隊列中的第一個線程線程,// 若是的話,則返回-1。if (hasQueuedPredecessors())return -1;// 設置“可以獲得的信號量的許可數”int available = getState();// 設置“獲得acquires個信號量許可之后,剩余的信號量許可數”int remaining = available - acquires;// 如果“剩余的信號量許可數>=0”,則設置“可以獲得的信號量許可數”為remaining。if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;} }說明:tryAcquireShared()的作用是嘗試獲取acquires個信號量許可數。
對于Semaphore而言,state表示的是“當前可獲得的信號量許可數”。
下面看看AQS中doAcquireSharedInterruptibly()的實現:
private void doAcquireSharedInterruptibly(long arg)throws InterruptedException {// 創建”當前線程“的Node節點,且Node中記錄的鎖是”共享鎖“類型;并將該節點添加到CLH隊列末尾。final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {// 獲取上一個節點。// 如果上一節點是CLH隊列的表頭,則”嘗試獲取共享鎖“。final Node p = node.predecessor();if (p == head) {long r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 當前線程一直等待,直到獲取到共享鎖。// 如果線程在等待過程中被中斷過,則再次中斷該線程(還原之前的中斷狀態)。if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }公平信號量的釋放
public void release() {sync.releaseShared(1); }public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits); }信號量的releases()釋放函數,實際上是調用的AQS中的releaseShared()。
releaseShared()在AQS中實現,源碼如下:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false; }復制代碼
說明:releaseShared()的目的是讓當前線程釋放它所持有的共享鎖。
它首先會通過tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則通過doReleaseShared()去釋放共享鎖。
Semaphore重寫了tryReleaseShared(),它的源碼如下:
protected final boolean tryReleaseShared(int releases) {for (;;) {// 獲取“可以獲得的信號量的許可數”int current = getState();// 獲取“釋放releases個信號量許可之后,剩余的信號量許可數”int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");// 設置“可以獲得的信號量的許可數”為next。if (compareAndSetState(current, next))return true;} }如果tryReleaseShared()嘗試釋放共享鎖失敗,則會調用doReleaseShared()去釋放共享鎖。doReleaseShared()的源碼如下:
private void doReleaseShared() {for (;;) {// 獲取CLH隊列的頭節點Node h = head;// 如果頭節點不為null,并且頭節點不等于tail節點。if (h != null && h != tail) {// 獲取頭節點對應的線程的狀態int ws = h.waitStatus;// 如果頭節點對應的線程是SIGNAL狀態,則意味著“頭節點的下一個節點所對應的線程”需要被unpark喚醒。if (ws == Node.SIGNAL) {// 設置“頭節點對應的線程狀態”為空狀態。失敗的話,則繼續循環。if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;// 喚醒“頭節點的下一個節點所對應的線程”。unparkSuccessor(h);}// 如果頭節點對應的線程是空狀態,則設置“文件點對應的線程所擁有的共享鎖”為其它線程獲取鎖的空狀態。else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}// 如果頭節點發生變化,則繼續循環。否則,退出循環。if (h == head) // loop if head changedbreak;} }說明:doReleaseShared()會釋放“共享鎖”。它會從前往后的遍歷CLH隊列,依次“喚醒”然后“執行”隊列中每個節點對應的線程;最終的目的是讓這些線程釋放它們所持有的信號量。
非公平信號量獲取和釋放
Semaphore中的非公平信號量是NonFairSync。在Semaphore中,“非公平信號量許可的釋放(release)”與“公平信號量許可的釋放(release)”是一樣的。
不同的是它們獲取“信號量許可”的機制不同,下面是非公平信號量獲取信號量許可的代碼。
非公平信號量的tryAcquireShared()實現如下:
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires); }nonfairTryAcquireShared()的實現如下:
final int nonfairTryAcquireShared(int acquires) {for (;;) {// 設置“可以獲得的信號量的許可數”int available = getState();// 設置“獲得acquires個信號量許可之后,剩余的信號量許可數”int remaining = available - acquires;// 如果“剩余的信號量許可數>=0”,則設置“可以獲得的信號量許可數”為remaining。if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;} }說明:非公平信號量的tryAcquireShared()調用AQS中的nonfairTryAcquireShared()。而在nonfairTryAcquireShared()的for循環中,它都會直接判斷“當前剩余的信號量許可數”是否足夠;足夠的話,則直接“設置可以獲得的信號量許可數”,進而再獲取信號量。
而公平信號量的tryAcquireShared()中,在獲取信號量之前會通過if (hasQueuedPredecessors())來判斷“當前線程是不是在CLH隊列的頭部”,是的話,則返回-1。
Semaphore例子
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreTest1 { private static final int SEM_MAX = 10;public static void main(String[] args) { Semaphore sem = new Semaphore(SEM_MAX);//創建線程池ExecutorService threadPool = Executors.newFixedThreadPool(3);//在線程池中執行任務threadPool.execute(new MyThread(sem, 5));threadPool.execute(new MyThread(sem, 4));threadPool.execute(new MyThread(sem, 7));//關閉池threadPool.shutdown();} }class MyThread extends Thread {private volatile Semaphore sem; // 信號量private int count; // 申請信號量的大小 MyThread(Semaphore sem, int count) {this.sem = sem;this.count = count;}public void run() {try {// 從信號量中獲取count個許可sem.acquire(count);Thread.sleep(2000);System.out.println(Thread.currentThread().getName() + " acquire count="+count);} catch (InterruptedException e) {e.printStackTrace();} finally {// 釋放給定數目的許可,將其返回到信號量。sem.release(count);System.out.println(Thread.currentThread().getName() + " release " + count + "");}} }運行結果:
pool-1-thread-1 acquire count=5 pool-1-thread-2 acquire count=4 pool-1-thread-1 release 5 pool-1-thread-2 release 4 pool-1-thread-3 acquire count=7 pool-1-thread-3 release 7總結
以上是生活随笔為你收集整理的JUC锁-Semaphore(八)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JUC锁-CountDownLatch(
- 下一篇: JUC队列-ArrayBlockingQ