日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java 中的并发工具类

發布時間:2023/12/9 java 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java 中的并发工具类 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

From: https://blog.wuwii.com/juc-utils.html

java.util.concurrent 下提供了一些輔助類來幫助我們在并發編程的設計。

學習了 AQS 后再了解這些工具類,就非常簡單了。

jdk 1.8

等待多線程完成的CountDownLatch

在 concurrent 包下面提供了 CountDownLatch 類,它提供了計數器的功能,能夠實現讓一個線程等待其他線程執行完畢才能進入運行狀態。

源碼分析

  • 首先看下最關鍵的地方它的自定義同步器的實現,非常簡單:

    • private static final class Sync extends AbstractQueuedSynchronizer {

      private static final long serialVersionUID = 4982264981922014374L;

      // 1. 初始化 state 資源變量

      Sync(int count) {

      setState(count);

      }

      int getCount() {

      return getState();

      }

      // 嘗試獲取貢獻模式下的資源,

      // 定義返回值小于 0 的時候獲取資源失敗

      protected int tryAcquireShared(int acquires) {

      return (getState() == 0) ? 1 : -1;

      }

      protected boolean tryReleaseShared(int releases) {

      // Decrement count; signal when transition to zero

      // 自旋。

      for (;;) {

      int c = getState();

      if (c == 0)

      return false;

      int nextc = c - 1; // 每次釋放資源,硬編碼減一個資源

      if (compareAndSetState(c, nextc))

      return nextc == 0; // 知道為 0 的時候才釋放成功,也就是所有線程必須都執行釋放操作說明才釋放成功。

      }

      }

      }

      在這里查看構造器的源碼得知,CountDownLatch 內部使用的是 內部類Sync 繼承了 AQS ,將我們傳入進來的 count 數值當作 AQS state。感覺這個是不是和可重入鎖實現是一樣的,只不過開始指定了線程獲取的鎖的次數。

      在上面我也發現了幾個特點,第一次看這個代碼其實還是不好理解,因為它相對前面的 AQS 和 TwinsLock 就是一個反著設計的代碼:

    • 首先獲取資源的時候,線程全部都是先進入等待隊列,而且在這一步驟中,不改變 state 資源的數量;
    • 釋放資源的時候,每次固定減少一個資源,直到資源為 0 的時候才表示釋放資源成功,所以加入我們有 5 個資源,但是只有四個線程執行,如果只釋放四次(總共執行 countDown 四次),就永遠也釋放不成功,await 一直在阻塞。
    • 經過上面的分析,發現了 state 的資源數量每次進行 countDown 都去減少一個,沒有方法去增加數量,所以它是不可逆的,它的計數器是不可以重復使用的。
    • 看下 await 的實現,發現它最終實現的是 doAcquireSharedInterruptibly :

  • // 仔細看這個代碼,和前面的共享模式中的 doAcquireShared 方法基本一摸一樣,只不過是當它遇到線程中斷信號的時候,立刻拋出中斷異常,仔細想想也是的,比如,自己在這里等別人吃飯,不想等了,也懶得管別人做什么了,剩下的吃飯的事情也沒必要繼續下去了。

    private void doAcquireSharedInterruptibly(int arg)

    throws InterruptedException {

    final Node node = addWaiter(Node.SHARED);

    boolean failed = true;

    try {

    for (;;) {

    final Node p = node.predecessor();

    if (p == head) {

    int r = tryAcquireShared(arg);

    if (r >= 0) {

    setHeadAndPropagate(node, r);

    p.next = null; // help GC

    failed = false;

    return;

    }

    }

    if (shouldParkAfterFailedAcquire(p, node) &&

    parkAndCheckInterrupt())

    throw new InterruptedException();

    }

    } finally {

    if (failed)

    cancelAcquire(node);

    }

    }

    // 需要注意的是它重寫了嘗試獲取資源的方法,當資源全部消耗完,才能夠讓你去獲取資源,現在才豁然開朗,await 阻塞的線程就是這么被喚醒的。

    protected int tryAcquireShared(int acquires) {

    return (getState() == 0) ? 1 : -1;

    }

  • 使用場景

    CountDownLatch允許一個或多個線程等待其他線程完成操作。

    比如經典問題:

    有Thread1、Thread2、Thread3、Thread4四條線程分別統計C、D、E、F四個盤的大小,所有線程都統計完畢交給Thread5線程去做匯總,應當如何實現?

    這個問題關鍵就是要知道四條線程何時執行完。

    下面是我的解決思路:

    /**

    * 如有Thread1、Thread2、Thread3、Thread4四條線程分別統計C、D、E、F四個盤的大小,

    * 所有線程都統計完畢交給Thread5線程去做匯總,應當如何實現?

    *

    * Created by KronChan on 2018/5/14 17:00.

    */

    public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {

    // 初始化計數器,設置總量i,調用一次countDown()方法后i的值會減1。

    // 在一個線程中如果調用了await()方法,這個線程就會進入到等待的狀態,當參數i為0的時候這個線程才繼續執行。

    final CountDownLatch countDownLatch = new CountDownLatch(4);

    Runnable thread1 = () -> {

    try {

    TimeUnit.SECONDS.sleep(2);

    System.out.println("統計 C 盤大小");

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    // 統計完成計數器 -1

    countDownLatch.countDown();

    };

    Runnable thread2 = () -> {

    try {

    TimeUnit.SECONDS.sleep(2);

    System.out.println("統計 D 盤大小");

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    countDownLatch.countDown();

    };

    Runnable thread3 = () -> {

    try {

    TimeUnit.SECONDS.sleep(2);

    System.out.println("統計 E 盤大小");

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    countDownLatch.countDown();

    };

    Runnable thread4 = () -> {

    try {

    TimeUnit.SECONDS.sleep(2);

    System.out.println("統計 F 盤大小");

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    countDownLatch.countDown();

    };

    ExecutorService pool = Executors.newFixedThreadPool(4);

    pool.execute(thread1);

    pool.execute(thread2);

    pool.execute(thread3);

    pool.execute(thread4);

    // 等待 i 值為 0 ,等待四條線程執行完畢。

    countDownLatch.await();

    System.out.println("統計完成");

    pool.shutdown();

    }

    }

    同步屏障CyclicBarrier

    CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運行。

    源碼分析

    屬性

    private final ReentrantLock lock = new ReentrantLock();

    // 線程協作

    private final Condition trip = lock.newCondition();

    // 必須同時到達barrier的線程個數。

    private final int parties;

    // parties個線程到達barrier時,會執行的動作,會讓到達屏障中的任意一個線程去執行這個動作。

    private final Runnable barrierCommand;

    // 控制屏障的循環使用,它是可重復使用的,每次使用CyclicBarrier,本次所有線程同屬于一代,即同一個Generation

    private Generation generation = new Generation();

    // 處在等待狀態的線程個數。

    private int count;

    主要的方法

    構造函數

    public CyclicBarrier(int parties) {

    this(parties, null);

    }

    // 構造函數主要實現了,設置一組線程的數量,到達屏障時候的臨界點,可以設置到達屏障的時候需要處理的動作,后面屏障允許它們通過。

    public CyclicBarrier(int parties, Runnable barrierAction) {

    if (parties <= 0) throw new IllegalArgumentException();

    this.parties = parties;

    this.count = parties;

    this.barrierCommand = barrierAction;

    }

    await

    public int await() throws InterruptedException, BrokenBarrierException {

    try {

    return dowait(false, 0L);

    } catch (TimeoutException toe) {

    throw new Error(toe); // cannot happen;

    }

    }

    private int dowait(boolean timed, long nanos)

    throws InterruptedException, BrokenBarrierException,

    TimeoutException {

    final ReentrantLock lock = this.lock;

    // 獨占鎖

    lock.lock();

    try {

    // 保存當前的generation

    final Generation g = generation;

    // generation broken,不允許使用,則拋出異常。

    if (g.broken)

    throw new BrokenBarrierException();

    // 如果當前線程被中斷,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線程。

    if (Thread.interrupted()) {

    breakBarrier();

    throw new InterruptedException();

    }

    // 等待的計數器減一

    int index = --count;

    // 如果計數器的 count 正好為0, 說明已經有parties個線程到達barrier了。執行預定的Runnable任務后,更新換代,準備下一次使用。

    if (index == 0) { // tripped

    boolean ranAction = false;

    try {

    // 如果barrierCommand不為null,則執行該動作。

    final Runnable command = barrierCommand;

    if (command != null)

    command.run();

    ranAction = true;

    // 喚醒所有等待線程,并更新generation,準備下一次使用

    nextGeneration();

    return 0;

    } finally {

    if (!ranAction)

    breakBarrier();

    }

    }

    // 當前線程一直阻塞,

    // 1. 有parties個線程到達barrier

    // 2. 當前線程被中斷

    // 3. 超時

    // 直到上面三者之一發生,就喚醒所有線程繼續執行下去

    // 自旋

    for (;;) {

    try {

    // 如果不是超時等待,則調用awati()進行等待;否則,調用awaitNanos()進行等待。

    if (!timed)

    trip.await();

    else if (nanos > 0L)

    nanos = trip.awaitNanos(nanos);

    } catch (InterruptedException ie) {

    // 如果等待過程中,線程被中斷,通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線

    if (g == generation && ! g.broken) {

    breakBarrier();

    throw ie;

    } else {

    Thread.currentThread().interrupt();

    }

    }

    // borken

    if (g.broken)

    throw new BrokenBarrierException();

    // 如果generation已經換代,則返回index。

    if (g != generation)

    return index;

    // 超時,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線程。

    if (timed && nanos <= 0L) {

    breakBarrier();

    throw new TimeoutException();

    }

    }

    } finally {

    lock.unlock(); // 釋放獨占鎖

    }

    }

    // barrier被broken后,調用breakBarrier方法,將generation.broken設置為true,并使用signalAll通知所有等待的線程。

    private void breakBarrier() {

    generation.broken = true;

    count = parties;

    trip.signalAll();

    }

    使用場景

    CyclicBarrier可以用于多線程計算數據,最后合并計算結果的場景,然后四條線程又可以分別去干自己的事情了。

    現在我將上面的統計磁盤的任務 CountDownLatch 中改下,統計完統計最終后,每個線程要發出退出信號。

    下面是我的實現代碼:

    public class CyclicBarrierDemo {

    public static void main(String[] args) {

    String[] drivers = {"C", "D", "E", "F"};

    int length = drivers.length;

    ExecutorService pool = Executors.newFixedThreadPool(length);

    // 如果線程都到達barrier狀態后,會從四個線程中選擇一個線程去執行Runnable。

    CyclicBarrier cyclicBarrier = new CyclicBarrier(length, () -> {

    System.out.printf("%s 線程告訴你,統計完畢,待繼續執行%n", Thread.currentThread().getName());

    });

    Stream.of(drivers).forEach((d) -> {

    pool.execute(new StatisticsDemo(d, cyclicBarrier));

    });

    pool.shutdown();

    }

    static class StatisticsDemo implements Runnable {

    private String driveName;

    private CyclicBarrier cyclicBarrier;

    public StatisticsDemo(String driveName, CyclicBarrier cyclicBarrier) {

    this.driveName = driveName;

    this.cyclicBarrier = cyclicBarrier;

    }

    @Override

    public void run() {

    try {

    TimeUnit.SECONDS.sleep((int) (Math.random() * 10));

    System.out.printf("%s 線程統計 %s 盤大小%n", Thread.currentThread().getName(), driveName);

    cyclicBarrier.await();

    } catch (InterruptedException | BrokenBarrierException e) {

    e.printStackTrace();

    }

    System.out.printf("%s 準備退出%n", driveName);

    }

    }

    }

    執行結果:

    pool-1-thread-1 線程統計 C 盤大小

    pool-1-thread-2 線程統計 D 盤大小

    pool-1-thread-3 線程統計 E 盤大小

    pool-1-thread-4 線程統計 F 盤大小

    pool-1-thread-4 線程告訴你,統計完畢,待繼續執行

    F 準備退出

    E 準備退出

    D 準備退出

    C 準備退出

    控制并發線程數的Semaphore

    Semaphore(信號量)是用來控制同時訪問特定資源的線程數量(許可證數),它通過協調各個線程,以保證合理的使用公共資源。

    源碼分析

    構造函數

    public Semaphore(int permits) {

    sync = new NonfairSync(permits);

    }

    public Semaphore(int permits, boolean fair) {

    sync = fair ? new FairSync(permits) : new NonfairSync(permits);

    }

    具有公平鎖的特性,permits 指定許可數量,就是資源數量 state。

    同步器的實現

    abstract static class Sync extends AbstractQueuedSynchronizer {

    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {

    setState(permits);

    }

    final int getPermits() {

    return getState();

    }

    // 非公平的方式獲取共享鎖

    final int nonfairTryAcquireShared(int acquires) {

    for (;;) {

    // 獲取資源數量

    int available = getState();

    int remaining = available - acquires; // 本次請求獲取鎖需要的資源的數量

    if (remaining < 0 ||

    compareAndSetState(available, remaining)) // 如果資源足夠,嘗試 CAS 獲取鎖

    return remaining;

    }

    }

    // 釋放鎖

    protected final boolean tryReleaseShared(int releases) {

    for (;;) {

    int current = getState();

    int next = current + releases; // 釋放鎖的時候,返還資源

    if (next < current) // overflow

    throw new Error("Maximum permit count exceeded");

    if (compareAndSetState(current, next)) // CAS 操作,避免其他的線程也在釋放資源

    return true;

    }

    }

    // 減少資源數量

    final void reducePermits(int reductions) {

    for (;;) {

    int current = getState();

    int next = current - reductions;

    if (next > current) // underflow

    throw new Error("Permit count underflow");

    if (compareAndSetState(current, next))

    return;

    }

    }

    // 清空資源,返回歷史資源數量

    final int drainPermits() {

    for (;;) {

    int current = getState();

    if (current == 0 || compareAndSetState(current, 0))

    return current;

    }

    }

    }

    /**

    * NonFair version

    */

    static final class NonfairSync extends Sync {

    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {

    super(permits);

    }

    protected int tryAcquireShared(int acquires) {

    return nonfairTryAcquireShared(acquires);

    }

    }

    /**

    * Fair version

    */

    static final class FairSync extends Sync {

    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {

    super(permits);

    }

    protected int tryAcquireShared(int acquires) {

    for (;;) {

    // 同樣的公平鎖情況下,判斷該線程前面有沒有線程等待獲取鎖

    if (hasQueuedPredecessors())

    return -1;

    int available = getState();

    int remaining = available - acquires;

    if (remaining < 0 ||

    compareAndSetState(available, remaining))

    return remaining;

    }

    }

    }

    提供其他的方法

    • availablePermits:獲取此信號量中當前可用的許可證數(還能有多少個線程執行);
    • drainPermits:立刻使用完所有可用的許可證;
    • reducePermits:減少相應數量的許可證,是一個 protected 方法;
    • isFair:是否是公平狀態;
    • hasQueuedThreads:等待隊列中是否有線程,等待獲取許可證;
    • getQueueLength:等待隊列中等待獲取許可證的線程數量;
    • getQueuedThreads:protected 方法,獲取等待隊列中的線程。

    使用場景

    Semaphore可以用于做流量控制,特別是公用資源有限的應用場景,比如我們有五臺機器,有十名工人,每個工人需要一臺機器才能工作,一名工人工作完了就可以休息了,機器讓其他沒工作過的工人使用。

    下面是我的實現代碼:

    public class SemaphoreDemo {

    public static void main(String[] args) {

    int num = 10;

    Semaphore machines = new Semaphore(5);

    for (int i = 0; i < num; i++) {

    new Thread(new Worker(i, machines)).start();

    }

    }

    static class Worker extends Thread {

    private Semaphore machines;

    private int worker;

    Worker(int worker, Semaphore semaphore) {

    this.worker = worker;

    this.machines = semaphore;

    }

    @Override

    public void run() {

    try {

    machines.acquire();

    System.out.printf("工人 %d 開始使用機器工作了 %n", worker);

    TimeUnit.SECONDS.sleep((int) (Math.random() * 10));

    System.out.printf("工人 %d 干完活了,讓出機器了%n", worker);

    machines.release();

    } catch (Exception e) {

    e.printStackTrace();

    }

    }

    }

    }

    執行一下結果:

    工人 0 開始使用機器工作了

    工人 4 開始使用機器工作了

    工人 3 開始使用機器工作了

    工人 2 開始使用機器工作了

    工人 1 開始使用機器工作了

    工人 1 干完活了,讓出機器了

    工人 5 開始使用機器工作了

    工人 5 干完活了,讓出機器了

    工人 6 開始使用機器工作了

    工人 2 干完活了,讓出機器了

    工人 7 開始使用機器工作了

    工人 4 干完活了,讓出機器了

    工人 8 開始使用機器工作了

    工人 0 干完活了,讓出機器了

    工人 9 開始使用機器工作了

    工人 8 干完活了,讓出機器了

    工人 6 干完活了,讓出機器了

    工人 3 干完活了,讓出機器了

    工人 9 干完活了,讓出機器了

    工人 7 干完活了,讓出機器了

    雖然上面有 10 個工人(線程)一起并發,但是,它同時只有五個工人能夠是執行的。

    線程間交換數據的Exchanger

    Exchanger(交換者)是一個用于線程間協作的工具類。Exchanger 用于兩個工作線程間的數據交換。

    具體上來說,Exchanger類允許在兩個線程之間定義同步點。當兩個線程都到達同步點時,他們交換數據結構,因此第一個線程的數據進入到第二個線程中,第二個線程的數據進入到第一個線程中,這要就完成了一個“交易”的環節。

    源碼分析

    源碼很難看懂,主要還是

    【死磕Java并發】—–J.U.C之并發工具類:Exchanger

    使用場景

    Exchanger 可以用于遺傳算法。遺傳算法里需要選出兩個人作為交配對象,這時候會交換兩人的數據。

    下面做一個賣書買書的例子:

    public class ExchangerDemo {

    private static final Exchanger<String> EXCHANGER = new Exchanger<>();

    private static final ExecutorService POOLS = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws InterruptedException {

    POOLS.execute(() -> {

    String bookName = "浮生六記";

    System.out.printf("飯飯要賣一本%s。%n", bookName);

    try {

    String pay = EXCHANGER.exchange(bookName);

    System.out.printf("飯飯賣出一本%s賺了%s¥。%n", bookName, pay);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    });

    TimeUnit.SECONDS.sleep(5);

    System.out.println("》》》》》》》》飯飯先到了交易地點 睡了 5 s,七巧來了");

    System.out.println("》》》》》》》》準備交易");

    POOLS.execute(() -> {

    String pay = "23";

    try {

    String bookName = EXCHANGER.exchange(pay);

    System.out.printf("七巧付了%s¥買了一本%s。%n", pay, bookName);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    });

    POOLS.shutdown();

    for (; ; ) {

    if (POOLS.isTerminated()) {

    System.out.println("交易結束!");

    return;

    }

    }

    }

    }

    執行結果:

    飯飯要賣一本浮生六記。

    》》》》》》》》飯飯先到了交易地點 睡了 5 s,七巧來了

    》》》》》》》》準備交易

    七巧付了23¥買了一本浮生六記。

    飯飯賣出一本浮生六記賺了23¥。

    交易結束!

    總結

    Exchanger主要完成的是兩個工作線程之間的數據交換,如果有一個線程沒有執行 exchange()方法,則會一直等待。還可以設置最大等待時間exchange(V v, TimeUnit unit)

    CyclicBarrier和CountDownLatch的區別

    CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置。所以CyclicBarrier能處理更為復雜的業務場景。例如,如果計算發生錯誤,可以重置計數器,并讓線程重新執行一次。

    CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得 CyclicBarrier阻塞的線程數量。isBroken()方法用來了解阻塞的線程是否被中斷。

    參考文章

    • 《Java 并發編程的藝術》

    總結

    以上是生活随笔為你收集整理的Java 中的并发工具类的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。