日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

7. Java8新特性-并行数据处理(parallel)

發(fā)布時間:2023/12/18 java 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 7. Java8新特性-并行数据处理(parallel) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

在JDK7之前,并行處理數(shù)據(jù)集合非常麻煩。首先需要自己明確的把包含數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)分成若干個子部分,第二需要給每個子部分分配一個獨立的線程;第三需要在恰當?shù)臅r候?qū)λ鼈冞M行同步來避免不希望出現(xiàn)的競爭條件,等待所有線程完成,最后把這些部分合并起來。

Doug Lea 在JDK7中引入了fork/join框架,讓這些操作更穩(wěn)定,更不易出錯。

本節(jié)主要內(nèi)容:
1. 用并行流并行處理數(shù)據(jù)
2. 并行流的性能分析
3. fork/join框架
4. 使用Spliterator分割流

學完本節(jié)期望能達到:
1. 熟練使用并行流,來加速業(yè)務(wù)性能
2. 了解流內(nèi)部的工作原理,以防止誤用的情況
3. 通過Spliterator控制數(shù)據(jù)塊的劃分方式

并行流

可以通過對數(shù)據(jù)源調(diào)用parallelStream方法來將源轉(zhuǎn)換為并行流。并行流就是一個把內(nèi)容分成多個數(shù)據(jù)塊,并用不同的線程分別處理每個數(shù)據(jù)塊的流。這樣可以自動將工作負荷轉(zhuǎn)到多核中并行處理。

考慮下面一個實現(xiàn):給定正整數(shù)n,計算 1 + 2 + … n的和。
使用stream的實現(xiàn):

private static long sequentialSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum); }

將上面的順序流轉(zhuǎn)換為并行流,實現(xiàn)如下:

private static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum); }

即通過調(diào)用方法parallel可將順序流轉(zhuǎn)換為并行流。

但需要注意的是流僅在終端操作時才開始執(zhí)行,所以當前流是順序流還是并行流以最靠近終端操作的流類型為準,示例:

list.stream().parallel().filter(e -> e.age > 20).sequential().map(...).parallel().collect(...);

此種情況并不會按預(yù)想的先使用并行流執(zhí)行過濾,再按順序流執(zhí)行映射轉(zhuǎn)換。而是整個流水線操作都按并行流執(zhí)行。

配置并行流使用的線程池

并行流內(nèi)部使用了默認的ForkJoinPool, 它默認的線程數(shù)量就是處理器的數(shù)量(Runtime.getRuntime().availableProcessors())。也可以通過設(shè)置系統(tǒng)屬性來改變它(System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “12”))。但它是一個全局設(shè)置,會影響所有的并行流,一般而言線程數(shù)等于處理器數(shù)量是一個合理的數(shù)值,不需要修改。

測試流性能

一般而言,同一個功能給我們的感覺是并行流性能會比順序流性能更好。然而在軟件工程中,優(yōu)化性能的黃金準則是:測量。我們開發(fā)了程序,用來測量4種寫法的累加,看看性能如何:

@Slf4j public class SumSample {/*** 順序流、并行流性能測試* 實現(xiàn)1~1億整型數(shù)字累加**/public static void main(String[] args) {CostUtil.cost(() -> log.info("==> for: 1 + ... + 100_000_000, result: {}", forSum(100_000_000)));log.info("================================================================================");CostUtil.cost(() -> log.info("==> sequential: 1 + ... + 100_000_000, result: {}", sequentialSum(100_000_000)));log.info("================================================================================");CostUtil.cost(() -> log.info("==> parallel: 1 + ... + 100_000_000, result: {}", parallelSum(100_000_000)));log.info("================================================================================");CostUtil.cost(() -> log.info("==> longParallel: 1 + ... + 100_000_000, result: {}", longParallelSum(100_000_000)));}/*** 內(nèi)部迭代方式實現(xiàn)累加*/private static long forSum(long n) {long result = 0;for (int i = 1; i <= n; i ++) {result += i;}return result;}/*** 順序流實現(xiàn)累加*/private static long sequentialSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);}/*** 并行流實現(xiàn)累加*/private static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);}/*** long原生流范圍實現(xiàn)累加*/private static long longParallelSum(long n) {return LongStream.rangeClosed(1L, n).parallel().reduce(0L, Long::sum);} } // result: 2022-01-18 10:53:59.035 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> for: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:53:59.039 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 58 2022-01-18 10:53:59.039 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================ 2022-01-18 10:54:00.459 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> sequential: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:54:00.459 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 1420 2022-01-18 10:54:00.459 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================ 2022-01-18 10:54:04.627 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> parallel: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:54:04.628 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 4167 2022-01-18 10:54:04.628 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================ 2022-01-18 10:54:04.688 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> longParallel: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:54:04.688 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 60

使用四種方法實現(xiàn)1~1億個數(shù)的累加,這是在i7 2.4GHz 6core/12threads CPU的執(zhí)行結(jié)果。讓人很意外,并非是并行流性能最好,反而是最差的,最樸實的for循環(huán)單線程性能最佳。

原因:

  • iterate生成的是裝箱對象,必須拆箱成數(shù)字才能求和。 這一點好理解因為iterate生成的流元素是Long類型,進行累加計算下一個流元素需要先拆箱,計算完再裝箱。
  • iterate很難分成多個獨立的塊來并行執(zhí)行。原因是應(yīng)用這個函數(shù)都要依賴前一次應(yīng)用的結(jié)果,即本質(zhì)上iterate需要順序執(zhí)行。雖然標記了流是并行流,但并不意味著一定能并行執(zhí)行,反而增加了額外開銷,影響了性能。
  • 通過上面的比較需要意識到:并行編程比較復(fù)雜,有時候甚至違反直覺。如果用的不對(如本例,采用了一個不易并行化的操作iterate),甚至會讓性能更差。所以了解parallel方法背后的執(zhí)行細節(jié)非常必要。

    LongStream.rangeClosed 代替 iterate

    僅高效求和的示例,可用LongStream.rangeClosed高效替代iterate實現(xiàn)并行計算。它的優(yōu)點是:

  • LongStream.rangeClosed直接產(chǎn)生原始類型的long數(shù)字,沒有裝箱拆箱的開銷
  • LongStream.rangeClosed會生成數(shù)字范圍,很容易拆分為獨立的小塊。
  • 通過示例演示它的并行執(zhí)行性能比同樣是并行流的iterate版本要快了70倍。可見它有效利用了并行。

    為什么并行流還是比for慢?

    上面的執(zhí)行結(jié)果可以看出LongStream.rangeClosed的性能還是比for略慢一點,原因是:

    并行化是有代價的,并行過程中需要對流做遞歸劃分,把流的歸納操作分配到不同的線程,最后合并。且多個核心之間移動數(shù)據(jù)的代價也很大。

    正確使用并行流

    使用并行流加速性能需要確保用對,如果計算結(jié)果是錯誤的,再快也沒意義。
    誤用并行流而產(chǎn)生錯誤的首要原因是使用的算法改變了某些共享狀態(tài)。 如下面示例:

    class Accumulator {public long total = 0;public void add(long value) { total += value; } }public static long sideEffectSum(long n) {Accumulator accumulator = new Accumulator();LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);return accumulator.total;}//result: 2022-01-18 11:40:16.943 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> sideEffectSum: 1 + ... + 100_000_000, result: 1037016191509285 2022-01-18 11:40:16.944 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 40

    從上面示例看出雖然很快,但結(jié)果是錯誤的。 原因是total += value非原子操作,出現(xiàn)了競態(tài)條件。如果使用同步來修復(fù),就失去了并行的意義。 所以寫并行流時一定要考慮多個線程是否會修改共享對象的可變狀態(tài)。

    高效使用并行流

    一些高效使用并行流的建議:

  • 如果有疑問,進行測試。并行流并不總是更快,且有時候跟直覺不一致。 使用適當?shù)幕鶞蔬M行測試來檢查其性能。
  • 留意自動拆裝箱。 頻繁的自動拆裝箱非常損耗性能。此種情況時盡量使用原始數(shù)據(jù)流來應(yīng)對: IntStream, LongStream, DoubleStream。
  • 有些操作天生并行流的性能就比順序流差,如依賴元素順序的操作:limit(), findFirst()等。
  • 需要考慮流操作流水線的總計算成本。 設(shè)N為元素的總數(shù),Q是一個元素通過流水線的大致處理成本,則N * Q 是對總成本的粗略估計。 Q值越高意味著使用并行流時的性能更好的可能性更大。 (使用for循環(huán)計算1 … N比并行流塊原因就是Q太小,雖然N已經(jīng)夠大了)
  • 對于較小的數(shù)據(jù)量,選擇并行流幾乎從來都不是最優(yōu)的。因為并行本身開銷就大,如果元素不多無法覆蓋并行本身的開銷。
  • 需要考慮背后的數(shù)據(jù)結(jié)構(gòu)是否易于分解。如用range工廠方法創(chuàng)建的原始流可以快速分解。 后面可以自定義Spliterator來完全掌控分解過程。
  • 還要考慮終端操作中合并步驟的代價大小,如果這一步代價很大,那么組合每個子流產(chǎn)生的部分結(jié)果所付出的代價會超過通過并行得到的性能提升。
  • 一些常見的數(shù)據(jù)源的可分解性匯總:

    Fork/Join框架

    想要正確的使用并行流,了解它背后的實現(xiàn)原理至關(guān)重要。 并行流背后就是采用的Fork/Join框架。
    // TODO: 待補充

    Spliterator

    // TODO: 待補充

    小結(jié)

  • 內(nèi)部迭代讓你可以并行處理一個流,而無需在代碼中顯式使用和協(xié)調(diào)不同的線程。
  • 雖然并行處理一個流很容易,卻不能保證程序在所有情況下都運行得更快。并行軟件的
    行為和性能有時是違反直覺的,因此一定要測量,確保你并沒有把程序拖得更慢。
  • 像并行流那樣對一個數(shù)據(jù)集并行執(zhí)行操作可以提升性能,特別是要處理的元素數(shù)量龐大,
    或處理單個元素特別耗時的時候。
  • 從性能角度來看,使用正確的數(shù)據(jù)結(jié)構(gòu),如盡可能利用原始流而不是一般化的流,幾乎
    總是比嘗試并行化某些操作更為重要。
  • 分支/合并框架讓你得以用遞歸方式將可以并行的任務(wù)拆分成更小的任務(wù),在不同的線程
    上執(zhí)行,然后將各個子任務(wù)的結(jié)果合并起來生成整體結(jié)果。
  • Spliterator定義了并行流如何拆分它要遍歷的數(shù)據(jù)。
  • 總結(jié)

    以上是生活随笔為你收集整理的7. Java8新特性-并行数据处理(parallel)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。