【java8】并行流Stream
流在處理數據進行一些迭代操作的時候確認很方便,但是在執行一些耗時或是占用資源很高的任務時候,串行化的流無法帶來速度/性能上的提升,并不能滿足我們的需要。
通常我們會使用多線程來并行或是分片分解執行任務,而在Stream中也提供了這樣的并行方法,下面將會一一介紹這些方法。
將順序流轉為并行流
使用parallelStream()方法或者是使用stream().parallel()來轉化為并行流。
但是只是可能會返回一個并行的流,流是否能并行執行還受到其他一些條件的約束(如是否有序,是否支持并行)。
對順序流調用parallel方法并不意味著流本身有任何實際的變化。它在內部實際上就是設了一個boolean標志,表示你想讓調用parallel之后進行的所有操作都并行執行。類似地,你只需要對并行流調用sequential方法就可以把它變成順序流。如果對這個方法調用了多次,將以最后一次執行為準。
package com.morris.java8.parallel;import java.util.concurrent.TimeUnit; import java.util.stream.IntStream;public class ParallerDemo {public static void main(String[] args) {IntStream list = IntStream.range(0, 6);//開始并行執行list.parallel().forEach(i -> {Thread thread = Thread.currentThread();System.err.println("integer:" + i + "," + "currentThread:" + thread.getName());try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}});} }運行結果如下:
integer:3,currentThread:main integer:4,currentThread:ForkJoinPool.commonPool-worker-3 integer:5,currentThread:ForkJoinPool.commonPool-worker-2 integer:1,currentThread:ForkJoinPool.commonPool-worker-1 integer:2,currentThread:ForkJoinPool.commonPool-worker-1 integer:0,currentThread:ForkJoinPool.commonPool-worker-3從運行結果里面我們可以很清楚的看到parallelStream同時使用了主線程和ForkJoinPool.commonPool創建的線程。 值得說明的是這個運行結果并不是唯一的,實際運行的時候可能會得到多個結果。
看看流的parallel方法,你可能會想,并行流用的線程是從哪兒來的?有多少個?怎么自定義這個過程呢?
并行流內部使用了默認的ForkJoinPool,它默認的線程數量就是你的處理器數量,這個值是由Runtime.getRuntime().availableProcessors()得到的。
但是你可以通過系統屬性java.util.concurrent.ForkJoinPool.common.parallelism來改變線程池大小,如下所示:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");這是一個全局設置,因此它將影響代碼中所有的并行流。反過來說,目前還無法專為某個并行流指定這個值。一般而言,讓ForkJoinPool的大小等于處理器數量是個不錯的默認值,除非你有很好的理由,否則我們強烈建議你不要修改它。
// 設置全局并行流并發線程數 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12"); System.out.println(ForkJoinPool.getCommonPoolParallelism());// 輸出 12 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); System.out.println(ForkJoinPool.getCommonPoolParallelism());// 輸出 12為什么兩次的運行結果是一樣的呢?上面剛剛說過了這是一個全局設置,java.util.concurrent.ForkJoinPool.common.parallelism是final類型的,整個JVM中只允許設置一次。既然默認的并發線程數不能反復修改,那怎么進行不同線程數量的并發測試呢?答案是:引入ForkJoinPool。
IntStream range = IntStream.range(1, 100000); // 傳入parallelism new ForkJoinPool(parallelism).submit(() -> range.parallel().forEach(System.out::println)).get();因此,使用parallelStream時需要注意的一點是,多個parallelStream之間默認使用的是同一個線程池,所以IO操作盡量不要放進parallelStream中,否則會阻塞其他parallelStream。
// 獲取當前機器CPU處理器的數量 System.out.println(Runtime.getRuntime().availableProcessors());// 輸出 4 // parallelStream默認的并發線程數 System.out.println(ForkJoinPool.getCommonPoolParallelism());// 輸出 3為什么parallelStream默認的并發線程數要比CPU處理器的數量少1個?因為最優的策略是每個CPU處理器分配一個線程,然而主線程也算一個線程,所以要占一個名額。 這一點可以從源碼中看出來:
static final int MAX_CAP = 0x7fff; // max #workers - 1 // 無參構造函數 public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false); }測試流的性能
下面通過幾種方式計算數據的和來測試流的性能。
package com.morris.java8.parallel;import java.util.function.Function; import java.util.stream.LongStream; import java.util.stream.Stream;public class ParallerStreamExample {public static void main(String[] args) {long n = 100_000_000;System.out.println("normal:" + recordTime(ParallerStreamExample::normal, n) + " MS");System.out.println("iterator:" + recordTime(ParallerStreamExample::iterator, n) + " MS");// 太耗時,暫時注釋// System.out.println("iteratorParallel:" + recordTime(ParallerStreamExample::iteratorParallel, n) + " MS");System.out.println("longStream:" + recordTime(ParallerStreamExample::longStream, n) + " MS");System.out.println("longStreamParallel:" + recordTime(ParallerStreamExample::longStreamParallel, n) + " MS");}public static long recordTime(Function<Long, Long> function, long n) {long lowestCostTime = Long.MAX_VALUE;for (int i = 0; i < 10; i++) {long startTime = System.currentTimeMillis();function.apply(n);long costTime = System.currentTimeMillis() - startTime;if(costTime < lowestCostTime) {lowestCostTime = costTime;}}return lowestCostTime;}/*** 正常for循環* @param n* @return*/public static long normal(long n) {long result = 0;for(long i = 1; i <= n; i++) {result += i;}return result;}/*** iterate順序流* @param n* @return*/public static long iterator(long n) {return Stream.iterate(1L, t -> t + 1).limit(n).reduce(0L, Long::sum);}/*** iterate并行流* @param n* @return*/public static long iteratorParallel(long n) {return Stream.iterate(1L, t -> t + 1).parallel().limit(n).reduce(0L, Long::sum);}/*** rangeClosed順序流* @param n* @return*/public static long longStream(long n) {return LongStream.rangeClosed(1, n).sum();}/*** rangeClosed并行流* @param n* @return*/public static long longStreamParallel(long n) {return LongStream.rangeClosed(1, n).parallel().sum();} }運行結果如下:
normal:33 MS iterator:990 MS longStream:44 MS longStreamParallel:16 MS結論:
-
Stream串行性能明顯差于for循環迭代,因為Stream串行還有流水線成本在里面。
-
并行的Stream API能夠發揮多核特性,但是有時候不如串行流(比如后面的計算依賴前面的計算結果就不適宜用并行流)
高效使用并行流
下面是一些使用并行流需要思考的方面:
-
留意裝箱。自動裝箱和拆箱操作會大大降低性能。Java 8中有原始類型流(IntStream、LongStream、DoubleStream)來避免這種操作,但凡有可能都應該用這些流。
-
有些操作本身在并行流上的性能就比順序流差,比如后面的計算依賴前面的計算結果。
-
還要考慮流的操作流水線的總計算成本。設N是要處理的元素的總數,Q是一個元素通過流水線的大致處理成本,則N*Q就是這個對成本的一個粗略的定性估計。Q值較高就意味著使用并行流時性能好的可能性比較大。
-
對于較小的數據量,選擇并行流幾乎從來都不是一個好的決定。并行處理少數幾個元素的好處還抵不上并行化造成的額外開銷。
-
要考慮流背后的數據結構是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因為前者用不著遍歷就可以平均拆分,而后者則必須遍歷。
-
流自身的特點,以及流水線中的中間操作修改流的方式,都可能會改變分解過程的性能。例如,一個SIZED流可以分成大小相等的兩部分,這樣每個部分都可以比較高效地并行處理,但篩選操作可能丟棄的元素個數卻無法預測,導致流本身的大小未知。
-
還要考慮終端操作中合并步驟的代價是大是小(例如Collector中的combiner方法)。如果這一步代價很大,那么組合每個子流產生的部分結果所付出的代價就可能會超出通過并行流得到的性能提升。
總結
以上是生活随笔為你收集整理的【java8】并行流Stream的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: html隐藏visibility,CSS
- 下一篇: 讲真,这两款idea插件,能治愈你英语不