日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

基于CompletableFuture并发任务编排实现

發布時間:2024/8/23 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 基于CompletableFuture并发任务编排实现 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 并發任務編排實現
    • 不帶返回值/參數傳遞任務
      • 串行執行
      • 并行執行
      • 并行執行-自定義線程池
      • 阻塞等待:多并行任務執行完再執行
      • 任意一個任務并發執行完就執行下個任務
      • 串并行任務依賴場景
    • 帶返回值/參數傳遞任務
      • 帶返回值實現
      • 串行執行
    • 多線程任務串行執行
      • 對任務并行執行,返回值combine
    • 寫在最后

并發任務編排實現

其實Java8中提供了并發編程框架CompletableFuture,以下結合不同場景進行使用。

不帶返回值/參數傳遞任務

模擬任務代碼:

DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");class TaskA implements Runnable{@SneakyThrows@Overridepublic void run() {Thread.sleep(2000);System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務A", LocalDateTime.now().format(formatter)));}}class TaskB implements Runnable{@SneakyThrows@Overridepublic void run() {Thread.sleep(1000);System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務B", LocalDateTime.now().format(formatter)));}}class TaskC implements Runnable{@SneakyThrows@Overridepublic void run() {Thread.sleep(50);System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務C", LocalDateTime.now().format(formatter)));}}

串行執行

A、B、C任務串行執行

CompletableFuture,runAsync():異步執行
thenRun():上個任務結束再執行(不帶上一個返回值結果)下一個任務
get():阻塞等待任務執行完成

實現方式:

@Testvoid thenRunTest() throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.runAsync(new TaskA()).thenRun(new TaskB()).thenRun(new TaskC());future.get();}

輸出:

threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務A] time:[2021-06-01 22:56:51] threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務B] time:[2021-06-01 22:56:52] threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務C] time:[2021-06-01 22:56:53]

從日志就能看出串行執行就是通過單線程執行多個任務。

并行執行

A、B、C任務并行執行

CompletableFuture.allOf():等待所有的CompletableFuture執行完成,無返回值

代碼實現:

/*** 并發執行ABC任務*/@SneakyThrows@Testvoid SeqTest(){String start = LocalDateTime.now().format(formatter);System.out.println(String.format("start task [%s]", start));CompletableFuture[] futures = new CompletableFuture[3];futures[0] = CompletableFuture.runAsync(new TaskA());futures[1] = CompletableFuture.runAsync(new TaskB());futures[2] = CompletableFuture.runAsync(new TaskC());CompletableFuture.allOf(futures).get();String end = LocalDateTime.now().format(formatter);System.out.println(String.format("end task [%s]", end));}

輸出:

start task [2021-06-01 23:03:49] threadName: [ForkJoinPool.commonPool-worker-3] taskName:[任務C] time:[2021-06-01 23:03:49] threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任務B] time:[2021-06-01 23:03:50] threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務A] time:[2021-06-01 23:03:51] end task [2021-06-01 23:03:51]

上述這種方式執行可以看出CompletableFuture默認使用的是ForkJoinPool.commonPool線程池,居然用的默認線程池那線程數是如何配置的呢?后來找到源碼發現commonPool線程池配置代碼如下

  • 先去看看java環境變量有沒有制定線程數(如果沒有特殊制定默認沒有)
  • 如果沒有配置則通過操作系統的核心數減一來設置線程數(我理解的減一應該是為了給main thread執行)
  • 這種默認配置方式適合用于CPU密集型任務,如果IO型需要我們自己去配置線程池
  • 并行執行-自定義線程池

    不是所有任務都是CPU密集型,為了解決上述問題,尤其是IO場景,我們需要根據業務場景配置合理線程數充分使其利用cpu資源。
    如何合理配置線程數可以參考我之前文章

    @SneakyThrows@Testvoid ParTestWithThreadPool(){String start = LocalDateTime.now().format(formatter);System.out.println(String.format("start task [%s]", start));ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(24, 32, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));CompletableFuture[] futures = new CompletableFuture[3];futures[0] = CompletableFuture.runAsync(new TaskA(), customThreadPool);futures[1] = CompletableFuture.runAsync(new TaskB(), customThreadPool);futures[2] = CompletableFuture.runAsync(new TaskC(), customThreadPool);CompletableFuture.allOf(futures).get();String end = LocalDateTime.now().format(formatter);System.out.println(String.format("end task [%s]", end));}

    輸出:

    start task [2021-06-02 00:00:05] threadName: [pool-1-thread-3] taskName:[任務C] time:[2021-06-02 00:00:05] threadName: [pool-1-thread-2] taskName:[任務B] time:[2021-06-02 00:00:06] threadName: [pool-1-thread-1] taskName:[任務A] time:[2021-06-02 00:00:07] end task [2021-06-02 00:00:07]

    阻塞等待:多并行任務執行完再執行

    A、B并行都執行完后再執行C任務

    @AfterTestvoid after(){String end = LocalDateTime.now().format(formatter);System.out.println(String.format("end task [%s]", end));}@Testvoid SeqAndParTest() throws ExecutionException, InterruptedException {ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));CompletableFuture[] futures = new CompletableFuture[2];futures[0] = CompletableFuture.runAsync(new TaskA(), customThreadPool);futures[1] = CompletableFuture.runAsync(new TaskB(), customThreadPool);CompletableFuture.allOf(futures).get();CompletableFuture.runAsync(new TaskC(), customThreadPool).get();}

    輸出:

    start task [2021-06-02 16:56:42] threadName: [pool-1-thread-2] taskName:[任務B] time:[2021-06-02 16:56:43] threadName: [pool-1-thread-1] taskName:[任務A] time:[2021-06-02 16:56:44] threadName: [pool-1-thread-3] taskName:[任務C] time:[2021-06-02 16:56:44] end task [2021-06-02 16:56:44]

    從輸出中能看出B、A任務并發執行完成以后再執行C任務

    任意一個任務并發執行完就執行下個任務

    A、B并發執行,只要有一個執行完就執行C任務

    anyOf:只要有任意一個CompletableFuture結束,就可以做接下來的事情,而無須像AllOf那樣,等待所有的CompletableFuture結束

    @Testvoid anyOf() throws ExecutionException, InterruptedException {ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));CompletableFuture[] futures = new CompletableFuture[2];futures[0] = CompletableFuture.runAsync(new TaskA(), customThreadPool);futures[1] = CompletableFuture.runAsync(new TaskB(), customThreadPool);CompletableFuture.anyOf(futures).get();CompletableFuture.runAsync(new TaskC(), customThreadPool).get();}

    輸出:

    start task [2021-06-02 17:43:42] threadName: [pool-1-thread-2] taskName:[任務B] time:[2021-06-02 17:43:43] threadName: [pool-1-thread-3] taskName:[任務C] time:[2021-06-02 17:43:43] ----------- end task [2021-06-02 17:43:43]

    串并行任務依賴場景

    @Testvoid multiSeqAndParTest() throws ExecutionException, InterruptedException {ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));CompletableFuture.runAsync(new TaskA(), customThreadPool).get();CompletableFuture[] futures = new CompletableFuture[2];futures[0] = CompletableFuture.runAsync(new TaskB(), customThreadPool).thenRun(new TaskC());futures[1] = CompletableFuture.runAsync(new TaskD(), customThreadPool).thenRun(new TaskE());CompletableFuture.allOf(futures).get();CompletableFuture.runAsync(new TaskF(), customThreadPool).get();}

    輸出:

    start task [2021-06-02 17:33:35] threadName: [pool-1-thread-1] taskName:[任務A] time:[2021-06-02 17:33:37] ----------- threadName: [pool-1-thread-3] taskName:[任務D] time:[2021-06-02 17:33:37] threadName: [pool-1-thread-3] taskName:[任務E] time:[2021-06-02 17:33:37] ----------- threadName: [pool-1-thread-2] taskName:[任務B] time:[2021-06-02 17:33:38] threadName: [pool-1-thread-2] taskName:[任務C] time:[2021-06-02 17:33:38] ----------- threadName: [pool-1-thread-4] taskName:[任務F] time:[2021-06-02 17:33:38] end task [2021-06-02 17:33:38]

    帶返回值/參數傳遞任務

    模擬任務

    String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務A", LocalDateTime.now().format(formatter));return v;}String taskB(){try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務B", LocalDateTime.now().format(formatter));return v;}String taskC(){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務C", LocalDateTime.now().format(formatter));return v;}

    帶返回值實現

    supplyAsync():異步執行并帶返回值

    @Testvoid supplyAsync() throws ExecutionException, InterruptedException {CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> taskA());String result = stringCompletableFuture.get();System.out.println(result);}String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return "hello";}

    串行執行

    thenApply(): 后面跟的是一個有參數、有返回值的方法,稱為Function。返回值是CompletableFuture類型。
    thenAccept():上個任務結束再執行(前面任務的結果作為下一個任務的入參)下一個任務

    String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務A", LocalDateTime.now().format(formatter));return v;}void taskC(String param){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務C", LocalDateTime.now().format(formatter));System.out.println(param + "\n ->" + v);}@Testvoid seqTest1() throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> taskA()).thenApply(param -> {String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務B", LocalDateTime.now().format(formatter));return param + "\n ->" + v;}).thenAccept(param -> taskC(param));completableFuture.get();}

    輸出:

    start task [2021-06-03 11:14:27] threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務A] time:[2021-06-03 11:14:30]->threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務B] time:[2021-06-03 11:14:30]->threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務C] time:[2021-06-03 11:14:31] end task [2021-06-03 11:14:31]

    多線程任務串行執行

    A、B、C任務在多個線程環境下執行,但是執行需要帶要帶參數傳遞A->B->C,感覺這種使用場景比較少

    thenCompose():第1個參數是一個CompletableFuture類型,第2個參數是一個方法,并且是一個BiFunction,也就是該方法有2個輸入參數,1個返回值。從該接口的定義可以大致推測,它是要在2個 CompletableFuture 完成之后,把2個CompletableFuture的返回值傳進去,再額外做一些事情。

    模擬任務:

    String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務A", LocalDateTime.now().format(formatter));return v;}String taskB(String param){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務B", LocalDateTime.now().format(formatter));return param + "\n ->" + v;}String taskC2(String param){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務C", LocalDateTime.now().format(formatter));return param + "\n ->" + v;}

    實現一:

    @Testvoid multiCompletableFutureSeqTest() throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> taskA()).thenCompose(firstTaskReturn -> CompletableFuture.supplyAsync(() -> taskB(firstTaskReturn))).thenCompose(secondTaskReturn -> CompletableFuture.supplyAsync(() -> taskC2(secondTaskReturn)));System.out.println(future.get());}

    輸出:

    start task [2021-06-03 15:04:45] threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務A] time:[2021-06-03 15:04:48]->threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任務B] time:[2021-06-03 15:04:51]->threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任務C] time:[2021-06-03 15:04:54] end task [2021-06-03 15:04:54]

    對任務并行執行,返回值combine

    如果希望返回值是一個非嵌套的CompletableFuture,可以使用thenCompose

    @SneakyThrows@Testvoid multiCombineTest(){CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> taskA()).thenCombine(CompletableFuture.supplyAsync(() -> taskB2()), (s1, s2) -> s1 + "\n" + s2 + "\n" + "combine: " + Thread.currentThread().getName()).thenCombine(CompletableFuture.supplyAsync(() -> taskC2()), (s1, s2) -> s1 + "\n" + s2 + "\n" + "combine: " + Thread.currentThread().getName());System.out.println(future.get());}

    寫在最后

    推薦一個大佬的并發編程框架,文章思路是照著他的readme去寫的

    總結

    以上是生活随笔為你收集整理的基于CompletableFuture并发任务编排实现的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。