日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

Java并行任务框架Fork/Join

發布時間:2023/10/11 综合教程 94 老码农
生活随笔 收集整理的這篇文章主要介紹了 Java并行任务框架Fork/Join 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Fork/Join是什么?

Fork意思是分叉,Join為合并。Fork/Join是一個將任務分割并行運行,然后將最終結果合并成為大任務的結果的框架,父任務可以分割成若干個子任務,子任務可以繼續分割,提供我們一種方便的并行任務功能,滿足實際場景的業務需求,思想類似于MapReduce。任務的分割必須保證子任務獨立,不會相互依賴結果。

 
 

從哪里開始?

Fork/Join框架主要有如下接口和類:

  • ForkJoinPool:一個線程池,用于執行調度分割的任務,實現了ExecutorService接口。提供三種執行任務的方式:

1、execute:最原生的執行方式,以異步執行,并且無返回結果。

2、submit:異步執行,有返回結果,返回結果是封裝后的Future對象。

3、invoke和invokeAll:異步執行,有返回結果,會等待所有任務執行執行完成,返回的結果為無封裝的泛型T。

  • ForkJoinTask:抽象的分割任務,提供以分叉的方式執行,以及合并執行結果。
  • RecursiveAction:異步任務,無返回結果。通常自定義的任務要繼承,并重寫compute方法,任務執行的就是compute方法。
  • RecursiveTask:異步任務,有返回結果。通常自定義的任務要繼承,并重寫compute方法,任務執行的就是compute方法。

核心類圖

 
 

從核心類圖看出,要想開始一個分割的并行任務,可以創建一個ForkJoinPool線程池,同時創建無返回結果的任務RecursiveAction或有返回結果的任務RecursiveTask,最后調用線程池ForkJoinPool的execute或submit或invoke方法執行任務,完成后合并結果。

實例

我們以一個有返回結果的并行任務實例進行測試。計算從起始值到結束值得連續數的累加結果,利用Fork/Join框架。并對比普通計算和并行計算的耗時差異。

package com.misout.forkjoin;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask; /**
* 計算從起始值到結束值得連續數的累加結果,利用Fork/Join框架
* @author Misout
* @date 2018-01-13 16:06:44
*/
public class SumTask extends RecursiveTask<Long> { private static final long serialVersionUID = 4828818665955149519L; /** 每個任務最多允許計算的數字個數閾值,超過這個閾值,任務進行拆分 */
private static final long THRESHOLD = 1000L; /** 起始值 */
private Long startNumber; /** 結束值 */
private Long endNumber; public SumTask(Long startNumber, Long endNumber) {
this.startNumber = startNumber;
this.endNumber = endNumber;
} /**
* 累加數的個數超過閾值1000個,拆分成2個子任務執行。子任務繼續作拆分。計算完,合并結果。
*/
@Override
protected Long compute() {
if(startNumber > endNumber) {
System.out.println("start number should be smaller than end number");
return 0L;
}
if(endNumber - startNumber < THRESHOLD) {
return this.getCount(startNumber, endNumber);
} else {
Long mid = (startNumber + endNumber) / 2;
RecursiveTask<Long> subTask1 = new SumTask(startNumber, mid);
RecursiveTask<Long> subTask2 = new SumTask(mid + 1, endNumber);
subTask1.fork();
subTask2.fork(); return subTask1.join() + subTask2.join();
}
} /**
* 普通累加執行方法
* @param start 起始數
* @param end 結束數
* @return 累加和
*/
protected Long getCount(Long start, Long end) {
Long sum = 0L;
for(long i = start; i <= end; i++) {
sum += i;
} return sum;
} public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Long start = 5L;
Long end = 3463434L;
SumTask task = new SumTask(start, end); Long startTime = System.currentTimeMillis();
Long sum = forkJoinPool.invoke(task);
Long endTime = System.currentTimeMillis();
System.out.println("fork/join : sum = " + sum + ", cost time = " + (endTime - startTime) + "ms"); startTime = System.currentTimeMillis();
Long sum2 = task.getCount(start, end);
endTime = System.currentTimeMillis();
System.out.println("normal : sum = " + sum2 + ", cost time = " + (endTime - startTime) + "ms");
}
}

說明:SumTask繼承RecursiveTask,并實現了compute方法。在compute方法中會進行任務分割,并繼續生成子任務,子任務仍然以分割的方式運行。

運行結果對比:

fork/join : sum = 5997689267885, cost time = 290ms
normal : sum = 5997689267885, cost time = 41ms

注意事項:任務拆分的深度最好不要太多,否則很容易因創建的線程過多影響系統性能。

work-stealing規則

在Java的API說明中提到,ForkJoinPool線程池與ThreadPoolExecutor線程池不同的地方在于,ForkJoinPool善于利用竊取工作執行加快任務的總體執行速度。實際上,在ForkJoinPool線程池中,若一個工作線程的任務隊列為空沒有任務執行時,便從其他工作線程中獲取任務主動執行。為了實現工作竊取,在工作線程中維護了雙端隊列,竊取任務線程從隊尾獲取任務,被竊取任務線程從隊頭獲取任務。這種機制充分利用線程進行并行計算,減少了線程競爭。但是當隊列中只存在一個任務了時,兩個線程去取反而會造成資源浪費。

 
                           ForkJoinPool工作竊取圖
 

總結

以上是生活随笔為你收集整理的Java并行任务框架Fork/Join的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。