java高并发(十三)并发容器J.U.C--AQS
AbstractQueueSynchronizer (AQS)
J.U.C 大大提高了java并發(fā)的性能,而AQS則是J.U.C的核心。
AQS底層使用雙向列表(隊(duì)列的一種實(shí)現(xiàn))。
- 使用Node實(shí)現(xiàn)FIFO隊(duì)列,可以用于構(gòu)建鎖或者其他同步裝置的基礎(chǔ)框架
- 利用了一個(gè)int類型表示狀態(tài)。?在AQS中有一個(gè)status的成員變量,基于AQS有一個(gè)同步組件ReentrantLock,在這個(gè)ReentrantLock中status表示獲取鎖的線程數(shù),例如status=0表示還沒有線程獲取鎖,status=1表示已經(jīng)有線程獲取了鎖,status>1表示重入鎖的數(shù)量。
- 使用方法:繼承
- 子類通過繼承并通過實(shí)現(xiàn)它的方法管理其狀態(tài){acquire和release}的方法操縱狀態(tài)
- 可以同時(shí)實(shí)現(xiàn)排它鎖和共享鎖模式(獨(dú)占,共享)
AQS同步組件
- countdownLatch,閉鎖,通過一個(gè)計(jì)數(shù)來保證線程是否需要一直阻塞。
- semaphore,控制同一時(shí)間并發(fā)線程的數(shù)量。
- cyclicbarrier,與countdownlatch很像,都能阻塞進(jìn)程。
- reentrantlock
- condition
- futuretask
countdownlatch?
是一個(gè)同步輔助類,通過他可以實(shí)現(xiàn)類似于阻塞當(dāng)前線程的功能。一個(gè)線程或多個(gè)線程一直等待,直到其他線程操作完成,countdownlatch用了一個(gè)給定的計(jì)數(shù)器來進(jìn)行初始化,該計(jì)數(shù)器的操作是原子操作,也就是同時(shí)只能有一個(gè)線程操作該計(jì)數(shù)器。調(diào)用該類的await()方法則會(huì)一直處于阻塞狀態(tài),直到其他線程調(diào)用countdown()方法,每次調(diào)用countdown()方法會(huì)使得計(jì)數(shù)器的值減1,當(dāng)計(jì)數(shù)器的值減為0時(shí),所有因調(diào)用await方法處于等待狀態(tài)的線程就會(huì)繼續(xù)往下執(zhí)行。這種狀態(tài)只會(huì)出現(xiàn)一次,因?yàn)檫@里的計(jì)數(shù)器是不能被重置的,如果業(yè)務(wù)上需要一個(gè)可以重置計(jì)數(shù)次數(shù)的版本,可以考慮使用cyclicbarrier。
使用場(chǎng)景
在某些業(yè)務(wù)場(chǎng)景中,程序執(zhí)行需要等到某個(gè)條件完成后才能繼續(xù)執(zhí)行后續(xù)的操作,典型的應(yīng)用例如并行計(jì)算:當(dāng)某個(gè)處理的運(yùn)算量很大時(shí),可以將該運(yùn)算任務(wù)拆分多個(gè)子任務(wù),等待所有的子任務(wù)都完成之后,父任務(wù)拿到所有的子任務(wù)的運(yùn)行結(jié)果進(jìn)行匯總。
下面舉例countdownlatch的基本用法:
@Slf4j public class CountDownLatchExample1 {private final static int threadCount = 200;public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();final CountDownLatch countDownLatch = new CountDownLatch(threadCount);for(int i = 0; i< threadCount; i++) {final int threadNum = i;executorService.execute(() ->{try {test(threadNum);} catch (InterruptedException e) {log.error("exception", e);}finally {countDownLatch.countDown();}});}//可以保證之前的線程都執(zhí)行完成countDownLatch.await();log.info("finish");executorService.shutdown();}private static void test(int threadNum) throws InterruptedException {Thread.sleep(100);log.info("{}", threadNum);Thread.sleep(100);} }一個(gè)復(fù)雜的場(chǎng)景:我們開了很多個(gè)線程去完成一個(gè)任務(wù),但是這個(gè)任務(wù)需要在指定的時(shí)間內(nèi)完成,如果超過一定的時(shí)間沒有完成則放棄該任務(wù)。
@Slf4j public class CountDownLatchExample2 {private final static int threadCount = 200;public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();final CountDownLatch countDownLatch = new CountDownLatch(threadCount);for(int i = 0; i< threadCount; i++) {final int threadNum = i;executorService.execute(() ->{try {test(threadNum);} catch (InterruptedException e) {log.error("exception", e);}finally {countDownLatch.countDown();}});}//可以保證之前的線程都執(zhí)行完成countDownLatch.await(10, TimeUnit.MILLISECONDS);log.info("finish");// 第一時(shí)間內(nèi)并不會(huì)把所有線程都銷毀,而是讓當(dāng)前已有線程執(zhí)行完之后在把線程池銷毀。executorService.shutdown();}private static void test(int threadNum) throws InterruptedException {Thread.sleep(100);log.info("{}", threadNum);} }semaphore 信號(hào)量
可以控制某個(gè)資源可被同時(shí)訪問的個(gè)數(shù),與countdownlatch有些類似,提供了兩個(gè)核心方法:aquire和release。aquire表示獲取一個(gè)許可,如果沒有則等待,release表示操作完成后釋放一個(gè)許可。semaphore維護(hù)了當(dāng)前訪問的個(gè)數(shù),提供同步機(jī)制控制訪問的個(gè)數(shù)。
使用場(chǎng)景?
常用于僅能提供有限訪問的資源例如數(shù)據(jù)庫連接數(shù)是有限的,而上層應(yīng)用的并發(fā)數(shù)會(huì)遠(yuǎn)遠(yuǎn)大于連接數(shù),如果同時(shí)對(duì)數(shù)據(jù)庫進(jìn)行操作可能出現(xiàn)因?yàn)闊o法獲取數(shù)據(jù)庫連接而導(dǎo)致異常。這時(shí)可以通過信號(hào)量semaphore來并發(fā)訪問控制。當(dāng)semaphore把并發(fā)數(shù)控制到1時(shí)就跟單線程運(yùn)行很相似了。
舉例如下:
@Slf4j public class SemaphoreExample1 {private final static int threadCount = 20;public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();//允許的并發(fā)數(shù)final Semaphore semaphore = new Semaphore(3);for(int i = 0; i< threadCount; i++) {final int threadNum = i;executorService.execute(() ->{try {// 獲取一個(gè)許可semaphore.acquire();test(threadNum);// 釋放一個(gè)許可semaphore.release();} catch (InterruptedException e) {log.error("exception", e);}});}log.info("finish");executorService.shutdown();}private static void test(int threadNum) throws InterruptedException {log.info("{}", threadNum);Thread.sleep(1000);} }運(yùn)行結(jié)果可以看到同時(shí)3個(gè)線程在執(zhí)行。
也可以獲得多個(gè)許可:
@Slf4j public class SemaphoreExample2 {private final static int threadCount = 20;public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();//允許的并發(fā)數(shù)final Semaphore semaphore = new Semaphore(3);for(int i = 0; i< threadCount; i++) {final int threadNum = i;executorService.execute(() ->{try {// 獲取多個(gè)許可semaphore.acquire(3);test(threadNum);// 釋放多個(gè)許可semaphore.release(3);} catch (InterruptedException e) {log.error("exception", e);}});}log.info("finish");executorService.shutdown();}private static void test(int threadNum) throws InterruptedException {log.info("{}", threadNum);Thread.sleep(1000);} }每一次獲取三個(gè)許可,而同時(shí)只允許3個(gè)并發(fā)數(shù),相當(dāng)于單線程在運(yùn)行。
@Slf4j public class SemaphoreExample3 {private final static int threadCount = 20;public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();//允許的并發(fā)數(shù)final Semaphore semaphore = new Semaphore(3);for (int i = 0; i < threadCount; i++) {final int threadNum = i;executorService.execute(() -> {try {// 嘗試獲取一個(gè)許可if (semaphore.tryAcquire()) {test(threadNum);// 釋放一個(gè)許可semaphore.release();}} catch (InterruptedException e) {log.error("exception", e);}});}log.info("finish");executorService.shutdown();}private static void test(int threadNum) throws InterruptedException {log.info("{}", threadNum);Thread.sleep(1000);} }輸出結(jié)果:
15:24:21.098 [pool-1-thread-1] INFO com.vincent.example.aqs.SemaphoreExample3 - 0 15:24:21.098 [pool-1-thread-2] INFO com.vincent.example.aqs.SemaphoreExample3 - 1 15:24:21.098 [main] INFO com.vincent.example.aqs.SemaphoreExample3 - finish 15:24:21.098 [pool-1-thread-3] INFO com.vincent.example.aqs.SemaphoreExample3 - 2因?yàn)槲覀兺€程池中放了二十個(gè)請(qǐng)求,二十個(gè)請(qǐng)求在同一時(shí)間內(nèi)都會(huì)嘗試去執(zhí)行,semaphore會(huì)嘗試讓每個(gè)線程去獲取許可,而同一時(shí)刻內(nèi)我們的并發(fā)數(shù)是3,也就是只有三個(gè)線程獲取到了許可,而test方法內(nèi)有Thread.sleep(1000),因此其余17個(gè)線程都不能拿到許可,直接結(jié)束。
semaphore.tryAcquire(3, TimeUnit.SECONDS)表示可以等3秒,如果3秒內(nèi)沒拿到許可就結(jié)束。
CyclicBarrier
也是一個(gè)同步輔助類,允許一組線程相互等待,直到到達(dá)某個(gè)公共的屏障點(diǎn)。可以完成多個(gè)線程之間相互等待,只有當(dāng)每個(gè)線程都準(zhǔn)備就緒后,才能各自繼續(xù)往下執(zhí)行謀面的操作。它和countdownlatch有相似的地方,都是通過計(jì)數(shù)器來實(shí)現(xiàn)的,當(dāng)一個(gè)線程調(diào)用await()方法后,該線程就進(jìn)入了等待狀態(tài)。當(dāng)循環(huán)計(jì)數(shù)器的值達(dá)到設(shè)置的初始值之后,進(jìn)入等待狀態(tài)的線程會(huì)被喚醒,繼續(xù)執(zhí)行后續(xù)操作。因?yàn)镃yclicBarrier在釋放等待線程后可以重用,所以稱他為循環(huán)屏障。
CyclicBarrier的使用場(chǎng)景與countdownlatch類似,CyclicBarrier可以用于多線程計(jì)算數(shù)據(jù),最后合并計(jì)算結(jié)果的應(yīng)用場(chǎng)景。
CyclicBarrier與Countdownlatch的區(qū)別:
- countdownlatch的計(jì)數(shù)器只能使用一次,CyclicBarrier可以使用reset方法重復(fù)使用
- countdownlatch主要是實(shí)現(xiàn)一個(gè)或n個(gè)線程需要等待其他線程完成某項(xiàng)操作之后,才能繼續(xù)往下執(zhí)行,他描述的是1個(gè)或n個(gè)線程等待其他線程的關(guān)系。而CyclicBarrier主要實(shí)現(xiàn)了多個(gè)線程之間相互等待,直到所有線程都滿足了條件之后才能繼續(xù)執(zhí)行后續(xù)的操作,它描述的是各個(gè)線程內(nèi)部相互等待的關(guān)系。所以CyclicBarrier可以處理更復(fù)雜的業(yè)務(wù)場(chǎng)景,例如計(jì)數(shù)器發(fā)生錯(cuò)誤可以重置計(jì)數(shù)器,讓線程重新執(zhí)行一次。
總結(jié)
以上是生活随笔為你收集整理的java高并发(十三)并发容器J.U.C--AQS的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java高并发(十二)并发容器J.U.C
- 下一篇: java高并发(十四)ReetrantL