日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java并发编程之美-阅读记录10

發布時間:2024/9/15 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java并发编程之美-阅读记录10 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

同步器

10.1CountDownLatch

  在開發過程中經常會遇到在主線程中開啟多個子線程去并行執行任務,并且主線程需要等待子線程執行完畢后在進行匯總。在CountDownLatch出現之前使用線程的join方法,但是join方法不靈活。

  1、案例:

package com.nxz.blog.otherTest;import java.util.concurrent.CountDownLatch;public class TestThread006 {private static CountDownLatch countDownLatch = new CountDownLatch(2);public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {countDownLatch.countDown();}System.out.println("子線程1執行完畢");// 注意下邊代碼try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("不需要子線程執行完畢,只需要調用countdown后,主線程就可以繼續執行");}});Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {countDownLatch.countDown();}System.out.println("子線程2執行完畢");}});t2.start();t1.start();System.out.println("等待子線程執行完畢");countDownLatch.await();System.out.println("全都執行完畢");} }

?

執行結果:

等待子線程執行完畢 子線程1執行完畢 子線程2執行完畢 全都執行完畢 不需要子線程執行完畢,只需要調用countdown后,主線程就可以繼續執行

在上邊代碼中,創建了一個CountDownLatch,構造函數的參數為2,當主線程調用CountDownLatch.await();方法后,主線程會等待兩個子線程執行完畢后,在子線程中必須調用countDownLatch.countDown()方法,來時計數器減1,當CountDownLatch減兩次后,即計數器為0是,阻塞在await方法就可以返回,主線程就可以繼續向下執行了。

上邊的代碼使用ExecutorService改寫:

package com.nxz.blog.otherTest;import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;public class TestThread007 {private static CountDownLatch countDownLatch = new CountDownLatch(2);public static void main(String[] args) throws InterruptedException {ExecutorService executorService =Executors.newFixedThreadPool(2);executorService.execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}System.out.println("子線程1執行完畢");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("當子線程調用countdown之后,主線程await方法就會方法(只要計數器為0)");}});executorService.execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}System.out.println("子線程2執行完畢");}});System.out.println("等待子線程執行");countDownLatch.await();System.out.println("全都執行完畢");executorService.shutdown();//這個方法需要調用,如果不調用的話,主線程會一直運行,因為線程池并未結束(用戶線程)} }

執行結果:

等待子線程執行 子線程2執行完畢 子線程1執行完畢 全都執行完畢 當子線程調用countdown之后,主線程await方法就會方法(只要計數器為0)

join方法和CountDownLatch區別:

  當調用線程的join方法后,主線程會一直等待子線程執行完畢后,主線程才能繼續執行;而CountDownLatch的await方法,則是當子線程調用了countDown方法后(如果計數器降為0),無論子線程中是否執行完,主線程都會繼續向下執行,并不一定等待子線程執行完所有的代碼。

2、CountDownLatch怎么實現的

  類圖:

  有類圖可以看到,CountDownLatch是通過AQS來實現的。
  、

通過構造函數,可以明顯的看到,計數器最終會賦值給state這個內存可見的變量

// 構造,參數:計數器的大小 public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}private static final class Sync extends AbstractQueuedSynchronizer {Sync(int count) {setState(count);}protected int tryAcquireShared(int acquires) {// 判斷當前state(即CountDownLatch中的計數器)的值,如果為0是,返回1,否則-1return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();// 獲取當前的state的值if (c == 0) // 防止state減為負數return false;int nextc = c-1;if (compareAndSetState(c, nextc)) // CAS操作,設置新的statereturn nextc == 0;}}}

3、await方法

// 調用await方法,其實就是調用的AQS中的獲取共享資源的方法(可中斷的)public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}// 這個方法為AQS中的方法,而tryAcquireShared方法則是子類實現的方法,即Sync中的方法public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 這個if條件會判斷AQS中的state的值(即CountDownLatch中的計數器的值),如果state==0,返回1,否則返回-1,如果返回1,則直接返回(即不阻塞主線程),如果返回-1時,則會阻塞線程if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); // 將線程假如阻塞隊列,并且自旋判斷state是否為0,為0是await方法會返回。}

4、countdown方法

public void countDown() {sync.releaseShared(1);}public final boolean releaseShared(int arg) {// 這個tryRealeaseShared也是調用的子類的方法,即上邊Sync中的if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}

?

?10.2CyclicBarrier

  CountDownLatch在一定程度上優化了join方法,但是CountDownLatch的計數器一點變為0,當下次調用await時就不不起作用,即不能重復使用。而CyclicBarrier則優化了這一點,使屏障點可以重復使用,

?  1、案例

  由執行結果可以看出屏障CyclicBarrier可以重復使用,當子線程執行自己的任務后,回調用await方法,此時子線程阻塞,并且計數器減1,當第二個子線程通樣調用await時,進入屏障,此時計數器再減為0,這時候會執行CyclicBarrier構造函數中的任務,執行完畢后,會喚醒第二個線程繼續向下執行,這時候第一個阻塞的線程也會繼續向下執行。

下邊這個例子說明當所有線程到達屏障點后,才能一塊繼續向下執行。同時也表明屏障點是可以重復使用的

package com.nxz.blog.otherTest;import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;public class TestThread008 {// 第一個參數是計數器大小,第二個參數是當計數器為0時,執行的任務private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {@Overridepublic void run() {System.out.println("全部子線程到達屏障點,所有子線程繼續向下執行。。。");}});public static void main(String[] args) {ExecutorService executorService =Executors.newFixedThreadPool(2);executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("子線程1執行--到達屏障點");try {cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}System.out.println("子線程1繼續向下執行");}});executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("子線程2執行--到達屏障點");try {cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}System.out.println("子線程2繼續向下執行");}});//$$$$$$$$$$$$$$$$$$$$$$$$$$$$]executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("子線程3執行--到達屏障點");try {cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}System.out.println("子線程3繼續向下執行");}});executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println("子線程4執行--到達屏障點");try {cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}System.out.println("子線程4繼續向下執行");}});executorService.shutdown();} }執行結果:子線程1執行--到達屏障點 子線程2執行--到達屏障點 全部子線程到達屏障點,所有子線程繼續向下執行。。。 子線程2繼續向下執行 子線程1繼續向下執行 子線程3執行--到達屏障點 子線程4執行--到達屏障點 全部子線程到達屏障點,所有子線程繼續向下執行。。。 子線程4繼續向下執行 子線程3繼續向下執行

2、類圖

  有類圖可以看出,CyclicBarrier是基于獨占鎖實現的,本質還是基于AQS的。parties用來記錄線程數,這里表示多少線程調用了await方法后,所有線程才能從屏障點繼續向下執行。而count一開始等于parties,每當有線程調用await方法就遞減1,當count=0時,就表示所有線程到達屏障點。

  

/** 用來保證操作count的原子性*/private final ReentrantLock lock = new ReentrantLock();/** 條件變量,用于支持線程間的await和signal操作*/private final Condition trip = lock.newCondition();/** 容量,需要到達屏障點的個數*/private final int parties;/* 所有線程都到達屏障點后,需要執行的任務 */private final Runnable barrierCommand;/** 該對象內部維護這一個broken變量,用來表示當前屏障是否被打破*/private Generation generation = new Generation();public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;}

3、await方法

// 當線程調用該方法時,線程會阻塞,當滿足這幾個條件是才返回://1、有parties個線程調用的await方法,也就是線程都到達了屏障點;//2、當其他線程調用的當前線程的interupt方法中斷了當前線程,即破會了屏障,此時屏障會失效3、generation對象中的broken設置為true時,會跑出異常,并返回public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}// 阻塞一定時間后返回public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {return dowait(true, unit.toNanos(timeout));}// 核心方法private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;if (g.broken)throw new BrokenBarrierException();if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// 以下為關鍵代碼int index = --count;// 如果此時count為0,則進入線程喚醒階段if (index == 0) { // trippedboolean ranAction = false;try {// 當前線程直接執行,即使異常了,也會調用breakBarrier方法來喚醒其他等待的線程final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;nextGeneration();// 喚醒 調用signalAllreturn 0;} finally {if (!ranAction)breakBarrier();// 喚醒 調用signalAll}}// loop until tripped, broken, interrupted, or timed out// 阻塞for (;;) {try {// 如果沒有時間限制,即timed=false,則直接調用condition.await方法,釋放鎖,并阻塞線程if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {// 出現異常了,即其他線程調用了interupt中斷方法if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}

?

10.3Semaphore信號量

  Semaphore信號量也是java中的同步器,和CountDownLatch和CyclicBarrier不同的是,它內部設計的計數器是遞增的,并且在一開始初始化Semaphore時可以指定一個初始值,但是它并不需要知道需要同步的線程的個數,而是在需要同步的時候調用acquire方法時指定需要同步的線程個數。

  1、案例

package com.nxz.blog.otherTest;import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore;public class TestThread009 {private static Semaphore semaphore = new Semaphore(0);public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(2);executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread() + "執行完");// 每當調用了一次release方法后,許可就會累加1semaphore.release();}});executorService.execute(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread() + "執行完");semaphore.release();}});// 當將permits設置為比2大的值時,代碼會阻塞在該處,不能夠向下執行(假如是3的話,就代表它需要3個permits許可,而只有兩個線程釋放了兩個許可,main不能繼續執行semaphore.acquire(2);System.out.println("main執行完");executorService.shutdown();} }

?  1、類圖

    由類圖可以看出 ,Semaphore也是基于AQS實現的。Sync只是對AQS的修飾,并且該類有兩個實現類,用來指定獲取信號量的時候是否使用公平策略。

?

  

?

總結

以上是生活随笔為你收集整理的java并发编程之美-阅读记录10的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。