java8使用parallelStream并行流造成数据丢失或下标越界异常解决方案
描述
我們先看一段使用了并行流的代碼
@Test
public void testStream() {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
list.add(i);
}
System.out.println(list.size());
List<Integer> streamList = new ArrayList<>();
list.parallelStream().forEach(streamList::add);
System.out.println(streamList.size());
}
編譯結果:
觀察發現,原來集合中的數據有10000條,但是使用并行流遍歷數據插入到新集合streamList中后,新的集合中只有5746條數據。并且會在多次之后可能會出現數組下標越界異常,顯然這里的代碼是不合邏輯的。
分析
parallelStream中使用的是ForkJobTask。Fork/Join的框架是通過把一個大任務不斷fork成許多子任務,然后多線程執行這些子任務,最后再Join這些子任務得到最終結果。關于分支/合并框架的使用案例可以看我的這篇文章(用分支/合并框架執行并行求和)。從程序上看,就是先將list集合fork成多段,然后多線程添加到streamList的結合中,而streamList是ArrayList類型,它的add方法并不能保證原子性。
ArrayList的add方法源碼如下:
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return <tt>true</tt> (as specified by {@link Collection#add})
*/
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
可以看到add方法可以概括為以下兩個步驟
ensureCapacityInternal(),確認下當前ArrayList中的數組,是否還可以加入新的元素。如果不行,就會再申請一個:int newCapacity = oldCapacity + (oldCapacity >> 1) 大小的數組(這個容量相當于:1 + 1/2 = 1.5倍),然后將數據copy過去。
elementData[size++] = e:添加元素到elementData數組中。
在并發情況下,如果同時有A、B兩個線程同時執行add,在第一步ensureCapacityInternal校驗數組容量時,A、B線程都發現當前容量還可以添加最有一個元素,不需擴容;因此進入第二步,此時,A線程先執行完,數組容量已滿,然后B線程再對elementData賦值時,就會拋出“ArrayIndexOutOfBoundsException”。
解決方案
第一種:將parallelStream改成stream,或者直接使用foreach處理。這可以通過判斷并發處理真實能帶來多大的好處,做取舍。
第二種:使用resultList =new CopyOnWriteArrayList<>();這是個線程安全的類。從源碼上看,CopyOnWriteArrayList在add操作時,通過ReentrantLock進行加鎖,防止并發寫。不給過CopyOnWriteArrayList,每次add操作都是把原數組中的元素拷貝一份到新數組中,然后在新數組中添加新元素,最后再把引用指向新數組。這會導致頻繁的對象創建,況且數組還是需要一塊連續的內存空間,如果有大量add操作,慎用。
第三種:使用包裝類resultList = Collections.synchronizedList(Arrays.asList());
總結
在從stream和parallelStream方法中進行選擇時,我們可以考慮以下幾個問題:
1.是否需要并行?
2.任務之間是否是獨立的?是否會引起任何競態條件?
3.結果是否取決于任務的調用順序?
對于問題1,在回答這個問題之前,你需要弄清楚你要解決的問題是什么,數據量有多大,計算的特點是什么?并不是所有的問題都適合使用并發程序來求解,比如當數據量不大時,順序執行往往比并行執行更快。畢竟,準備線程池和其它相關資源也是需要時間的。但是,當任務涉及到I/O操作并且任務之間不互相依賴時,那么并行化就是一個不錯的選擇。通常而言,將這類程序并行化之后,執行速度會提升好幾個等級。
對于問題2,如果任務之間是獨立的,并且代碼中不涉及到對同一個對象的某個狀態或者某個變量的更新操作,那么就表明代碼是可以被并行化的。
對于問題3,由于在并行環境中任務的執行順序是不確定的,因此對于依賴于順序的任務而言,并行化也許不能給出正確的結果。
總結
以上是生活随笔為你收集整理的java8使用parallelStream并行流造成数据丢失或下标越界异常解决方案的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 已经发车的票还能取出来吗_高铁票在车已经
- 下一篇: python将元祖写入txt文档_pyt