ForkJoin编程教学
一、概念
1什么是fork/join
fork/join 秉承分而治之思想,從JDK1.7開始,Java提供Fork/Join框架用于并行執行任務。它的思想就是講一個大任務分割成若干小任務,最終匯總每個小任務的結果得到這個大任務的結果。如下圖:
2.什么是分而治之
可以簡單的理解為:將規模為N的問題,當N<閾值,直接解決;當N>閾值,將N分解為K個小規模子問題,子問題互相對立,與原問題形式相同,將子問題的解合并得到原問題的解。
3.fork/join的工作竊取思想
假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭,于是把這些子任務分別放到不同的隊列里,并為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。如果現場A的任務執行完成后,于是它就去其他線程B的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行,也就是線程B從頭部拿取,而線程A從尾部獲取任務開始執行。
工作竊取算法的優點是充分利用線程進行并行計算,并減少了線程間的競爭,其缺點是在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時。并且消耗了更多的系統資源,比如創建多個線程和多個雙端隊列。
?
?二、實現教程
1.涉及到的相關類
//有返回值? ?public abstract class RecursiveTask<V> extends ForkJoinTask<V>;
//沒有返回值 public abstract class RecursiveAction extends ForkJoinTask<Void>;
2.沒有返回值實現例子
package cn.hsa.iep.usc.emc.job;import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.RecursiveTask;/*** @author lsj* @date 2022/5/7 14:09*/ public class NoRetureValueTest extends RecursiveAction {//每個線程執行的閾值private static final int THRESHOLD_NUM = 10000;//需要處理的數據從構造函數傳入List<Integer> sumList;public NoRetureValueTest(List<Integer> sumList) {this.sumList = sumList;}@Overrideprotected void compute() {if (sumList.size() > THRESHOLD_NUM) {System.out.println("大于閾值重新劃分sumList大小"+sumList.size());//大于閾值重新劃分List<Integer> leftList = sumList.subList(0, THRESHOLD_NUM);List<Integer> rightList = sumList.subList(THRESHOLD_NUM, sumList.size());NoRetureValueTest leftTask = new NoRetureValueTest(leftList);NoRetureValueTest rightTask = new NoRetureValueTest(rightList);//執行任務invokeAll(leftTask, rightTask);}//如果小于閾值直接執行任務taskExcute(sumList);}private Integer taskExcute(List<Integer> sumList) {int sum = 0;for (Integer num :sumList) {sum += num;}System.out.println(Thread.currentThread().getName()+"執行完畢,sum="+sum);return sum;}public static void main(String[] args) throws InterruptedException {// 創建包含Runtime.getRuntime().availableProcessors()返回值作為個數的并行線程的ForkJoinPoolForkJoinPool forkjoinPool = new ForkJoinPool();//任務數據List<Integer> sumList = new ArrayList<>();for (int i = 0 ;i<100000; i++){sumList.add(i);}//創建初始任務NoRetureValueTest retureValueTest = new NoRetureValueTest(sumList);forkjoinPool.execute(retureValueTest);//異步執行任務,不需要等待所有線程都執行完//關閉線程池forkjoinPool.shutdown();} }3.有返回值的例子?
package cn.hsa.iep.usc.emc.job;import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask;/*** @author lsj* @date 2022/5/7 14:09*/ public class RetureValueTest extends RecursiveTask<Integer> {//每個線程執行的閾值private static final int THRESHOLD_NUM = 10000;//需要處理的數據從構造函數傳入List<Integer> sumList;public RetureValueTest(List<Integer> sumList) {this.sumList = sumList;}@Overrideprotected Integer compute() {if (sumList.size() > THRESHOLD_NUM) {System.out.println("大于閾值重新劃分sumList大小"+sumList.size());//大于閾值重新劃分List<Integer> leftList = sumList.subList(0, THRESHOLD_NUM);List<Integer> rightList = sumList.subList(THRESHOLD_NUM, sumList.size());RetureValueTest leftTask = new RetureValueTest(leftList);RetureValueTest rightTask = new RetureValueTest(rightList);//執行任務invokeAll(leftTask, rightTask);//獲取結果Integer leftResult = leftTask.join();Integer rightResult = rightTask.join();return leftResult + rightResult;}//如果小于閾值直接執行任務return taskExcute(sumList);}private Integer taskExcute(List<Integer> sumList) {int sum = 0;for (Integer num :sumList) {sum += num;}try {Thread.sleep(3000L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(1/0);System.out.println(Thread.currentThread().getName()+"執行完畢,sum="+sum);return sum;}public static void main(String[] args) throws InterruptedException {// 創建包含Runtime.getRuntime().availableProcessors()返回值作為個數的并行線程的ForkJoinPoolForkJoinPool forkjoinPool = new ForkJoinPool();//任務數據List<Integer> sumList = new ArrayList<>();for (int i = 0 ;i<100000; i++){sumList.add(i);}//創建初始任務RetureValueTest retureValueTest = new RetureValueTest(sumList);Integer invoke = forkjoinPool.invoke(retureValueTest);//執行任務 // forkjoinPool.awaitTermination(10, TimeUnit.SECONDS);//阻塞當前線程直到 ForkJoinPool 中所有的任務都執行結束 // if(retureValueTest.isCompletedAbnormally()){ // System.out.println("異常:"+ retureValueTest.getException()); // }System.out.println("結果1:"+ retureValueTest.join());System.out.println("結果2:"+ invoke);//關閉線程池forkjoinPool.shutdown();} }三、備注
1 .在有大量計算任務時,此框架方法可進行并行計算效率高,以上示例,可以根據具體的業務需求更改屬性及相關方法用于匹配自己的業務邏輯
2 .JDK1.8后由于加入Stream流的操作,集合框架可以使用Collection<E> default Stream<E> parallelStream()的方法轉換成并行流進行計算,此時效果與Fork/Join任務同效
3 .ForkJoinPool中的多種方法
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task);//等待獲取結果public void execute(ForkJoinTask<?> task);//異步執行public <T> T invoke(ForkJoinTask<T> task);//執行,獲取Future4.ForkJoinTask在執行的時候可能會拋出異常,但是沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經被取消了,并且可以通過ForkJoinTask的getException方法獲取異常。getException方 ? ? ? ? ? ? 法返回Throwable對象,如果任務被取消了則返回CancellationException。如果任務沒有完成或者沒有拋出異常則返回null。
if(task.isCompletedAbnormally()) {System.out.println(task.getException()); }?
總結
以上是生活随笔為你收集整理的ForkJoin编程教学的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 精彩!Facebook开源RAG,绕开重
- 下一篇: 对相同字母组合的单词进行归类