日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Parallel Stream 的错误实践

發(fā)布時間:2023/12/18 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Parallel Stream 的错误实践 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、前言

Java8 Stream 流的出現(xiàn),極大的簡化了業(yè)務(wù)需求中對集合數(shù)據(jù)的加工處理操作。雖然好用,但是一旦使用不當(dāng),也會帶來意想不到的結(jié)果,本文記錄使用 Parallel Stream 的錯誤實(shí)踐。

List<Object> sourceList = ...; List<Object> list = new ArrayList();sourceList.stream.map(...).foreach(list::add);

偽代碼如上所示,對 sourceList 進(jìn)行源數(shù)據(jù)加工,加工完畢后 add 進(jìn)結(jié)果 list 中。運(yùn)行過程中,發(fā)現(xiàn)其中存在 null 元素。

二、實(shí)驗(yàn)

寫一個簡單 Case 測試下,如下所示:

public class StreamTest {public static void main(String[] args) {List<Integer> list = new ArrayList<>();IntStream.range(0, 50).parallel().map(e -> e * 2).forEach(list::add);System.out.println("size = " + list.size() + "\n" + list);} }

多次執(zhí)行,發(fā)現(xiàn)結(jié)果集元素個數(shù)不等于期望元素個數(shù),且其中存在 null 元素,而且有幾率出現(xiàn)數(shù)組下標(biāo)越界錯誤。

size = 44 [30, 12, 32, 14, 34, 16, 42, 44, 46, 48, 24, 36, 20, 38, 40, null, 22, 6, 8, 10, 0, 2, 4, 56, 88, 82, 60, 84, 90, 92, 74, 94, 76, null, 50, 52, 98, 54, 62, 64, 66, 68, 70, 72] Exception in thread "main" java.lang.ArrayIndexOutOfBoundsExceptionat sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)at java.lang.reflect.Constructor.newInstance(Constructor.java:423)at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)at java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:189)at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)at java.util.stream.IntPipeline.forEach(IntPipeline.java:404)at jit.wxs.disruptor.stream.StreamTest.main(StreamTest.java:15) Caused by: java.lang.ArrayIndexOutOfBoundsException: 15at java.util.ArrayList.add(ArrayList.java:463)at java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:205)at java.util.stream.IntPipeline$3$1.accept(IntPipeline.java:233)at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.execLocalTasks(ForkJoinPool.java:1040)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1058)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

三、分析

問題原因也很簡單,了解過 Parallel Stream 的同學(xué)知道,其內(nèi)部采用 ForkJoinPool 線程池進(jìn)行執(zhí)行,也就是說存在線程安全問題,而 ArrayList 是線程不安全的。下面依次來分析產(chǎn)生各種異常情況的原因。

3.1 元素?cái)?shù)量丟失

// java.util.ArrayList#add(E) public boolean add(E e) {ensureCapacityInternal(size + 1); // Increments modCount!!elementData[size++] = e;return true; }

導(dǎo)致數(shù)組下標(biāo)越界的原因是 ArrayList 的 add() 方法中的 elementData[size++] = e,這行代碼不是原子操作,可以拆解為:

  • 讀取 size 值
  • 將 e 添加到 size 的位置,即 elementData[size] = e
  • size++
  • 這里存在內(nèi)存可見性問題,當(dāng)線程 A 從內(nèi)存讀取 size 后,設(shè)置 e 值,將 size 加 1,然后寫入內(nèi)存。過程中可能有線程 B 也修改了 size 并寫入內(nèi)存,那么線程 A 寫入內(nèi)存的值就會丟失線程 B 的更新。這解釋了會出現(xiàn)數(shù)組長度比原始數(shù)組要小(元素丟失)的情況。

    3.2 null 元素

    null 元素產(chǎn)生跟元素?cái)?shù)據(jù)丟失類似,也是由于 elementData[size++] = e 不是原子操作導(dǎo)致的。假設(shè)存在三個線程,線程 1、線程 2、線程 3。三個線程同時開始執(zhí)行,初始 size 值為 1。

    • 線程 1 全部執(zhí)行完畢,此時 size 被更新為 2。

    • 線程 2 一開始讀取 size 值 = 1、將 e 添加到 size 位置后時間片就用完了,輪到執(zhí)行第三步 size++ 讀取到了線程 1 的更新,size 直接被更新成了 3。【注:此處線程 2 的 e 值也丟失了,被線程 1 覆蓋】

    • 線程3 一開始讀取 size 值 = 1 后時間片就用完了,輪到執(zhí)行第二步將 e 添加到 size 位置讀取到了線程 2 的更新,size 變成了 3。size = 2 的位置就被跳過了,因此 elementData[2] 為 null 了。

    3.3 數(shù)組下標(biāo)越界

    數(shù)組越界異常則主要發(fā)生在數(shù)組擴(kuò)容前的臨界點(diǎn)。假設(shè)當(dāng)前數(shù)組剛好只能添加一個元素,兩個線程同時準(zhǔn)備執(zhí)行ensureCapacityInternal(size + 1),同時讀取的 size 值,加 1 后進(jìn)入ensureCapacityInternal都不會導(dǎo)致擴(kuò)容。

    退出 ensureCapacityInternal 后,兩個線程同時執(zhí)行 elementData[size] = e,線程 B 的 size++ 先完成,假設(shè)此刻線程 A 讀取到了線程 B 的更新,線程 A 再執(zhí)行 size++,此時 size 的實(shí)際值就會大于數(shù)組的容量,這樣就會發(fā)生數(shù)組越界異常。

    四、解決

    解決問題也很簡單,分兩種,一種是把結(jié)果集合變成線程安全的即可。

    List<Integer> list = new CopyOnWriteArrayList<>(); // or List<Integer> list = Collections.synchronizedList(new ArrayList<>());

    第二種不使用 forEach 自己 add,改用 Stream 的 collect:

    public class StreamTest {public static void main(String[] args) {List<Integer> list = IntStream.range(0, 50).parallel().map(e -> e * 2).boxed().collect(Collectors.toList());System.out.println("size = " + list.size() + "\n" + list);} }

    五、參考資料

    • JAVA使用并行流(ParallelStream)時要注意的一些問題
    • 記一次java8 parallelStream使用不當(dāng)引發(fā)的血案

    總結(jié)

    以上是生活随笔為你收集整理的Parallel Stream 的错误实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。