Java并行任务框架Fork/Join
Fork/Join是什么?
Fork意思是分叉,Join為合并。Fork/Join是一個將任務分割并行運行,然后將最終結果合并成為大任務的結果的框架,父任務可以分割成若干個子任務,子任務可以繼續分割,提供我們一種方便的并行任務功能,滿足實際場景的業務需求,思想類似于MapReduce。任務的分割必須保證子任務獨立,不會相互依賴結果。
從哪里開始?
Fork/Join框架主要有如下接口和類:
- ForkJoinPool:一個線程池,用于執行調度分割的任務,實現了ExecutorService接口。提供三種執行任務的方式:
1、execute:最原生的執行方式,以異步執行,并且無返回結果。
2、submit:異步執行,有返回結果,返回結果是封裝后的Future對象。
3、invoke和invokeAll:異步執行,有返回結果,會等待所有任務執行執行完成,返回的結果為無封裝的泛型T。
- ForkJoinTask:抽象的分割任務,提供以分叉的方式執行,以及合并執行結果。
- RecursiveAction:異步任務,無返回結果。通常自定義的任務要繼承,并重寫compute方法,任務執行的就是compute方法。
- RecursiveTask:異步任務,有返回結果。通常自定義的任務要繼承,并重寫compute方法,任務執行的就是compute方法。
核心類圖
從核心類圖看出,要想開始一個分割的并行任務,可以創建一個ForkJoinPool線程池,同時創建無返回結果的任務RecursiveAction或有返回結果的任務RecursiveTask,最后調用線程池ForkJoinPool的execute或submit或invoke方法執行任務,完成后合并結果。
實例
我們以一個有返回結果的并行任務實例進行測試。計算從起始值到結束值得連續數的累加結果,利用Fork/Join框架。并對比普通計算和并行計算的耗時差異。
package com.misout.forkjoin;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* 計算從起始值到結束值得連續數的累加結果,利用Fork/Join框架
* @author Misout
* @date 2018-01-13 16:06:44
*/
public class SumTask extends RecursiveTask<Long> {
private static final long serialVersionUID = 4828818665955149519L;
/** 每個任務最多允許計算的數字個數閾值,超過這個閾值,任務進行拆分 */
private static final long THRESHOLD = 1000L;
/** 起始值 */
private Long startNumber;
/** 結束值 */
private Long endNumber;
public SumTask(Long startNumber, Long endNumber) {
this.startNumber = startNumber;
this.endNumber = endNumber;
}
/**
* 累加數的個數超過閾值1000個,拆分成2個子任務執行。子任務繼續作拆分。計算完,合并結果。
*/
@Override
protected Long compute() {
if(startNumber > endNumber) {
System.out.println("start number should be smaller than end number");
return 0L;
}
if(endNumber - startNumber < THRESHOLD) {
return this.getCount(startNumber, endNumber);
} else {
Long mid = (startNumber + endNumber) / 2;
RecursiveTask<Long> subTask1 = new SumTask(startNumber, mid);
RecursiveTask<Long> subTask2 = new SumTask(mid + 1, endNumber);
subTask1.fork();
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
/**
* 普通累加執行方法
* @param start 起始數
* @param end 結束數
* @return 累加和
*/
protected Long getCount(Long start, Long end) {
Long sum = 0L;
for(long i = start; i <= end; i++) {
sum += i;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Long start = 5L;
Long end = 3463434L;
SumTask task = new SumTask(start, end);
Long startTime = System.currentTimeMillis();
Long sum = forkJoinPool.invoke(task);
Long endTime = System.currentTimeMillis();
System.out.println("fork/join : sum = " + sum + ", cost time = " + (endTime - startTime) + "ms");
startTime = System.currentTimeMillis();
Long sum2 = task.getCount(start, end);
endTime = System.currentTimeMillis();
System.out.println("normal : sum = " + sum2 + ", cost time = " + (endTime - startTime) + "ms");
}
}
說明:SumTask繼承RecursiveTask,并實現了compute方法。在compute方法中會進行任務分割,并繼續生成子任務,子任務仍然以分割的方式運行。
運行結果對比:
fork/join : sum = 5997689267885, cost time = 290ms
normal : sum = 5997689267885, cost time = 41ms
注意事項:任務拆分的深度最好不要太多,否則很容易因創建的線程過多影響系統性能。
work-stealing規則
在Java的API說明中提到,ForkJoinPool線程池與ThreadPoolExecutor線程池不同的地方在于,ForkJoinPool善于利用竊取工作執行加快任務的總體執行速度。實際上,在ForkJoinPool線程池中,若一個工作線程的任務隊列為空沒有任務執行時,便從其他工作線程中獲取任務主動執行。為了實現工作竊取,在工作線程中維護了雙端隊列,竊取任務線程從隊尾獲取任務,被竊取任務線程從隊頭獲取任務。這種機制充分利用線程進行并行計算,減少了線程競爭。但是當隊列中只存在一個任務了時,兩個線程去取反而會造成資源浪費。
總結
以上是生活随笔為你收集整理的Java并行任务框架Fork/Join的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: QT中的对象模型――QPointer
- 下一篇: Odoo event