日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

java用不用stream_Java parallelStream不使用预期的线程数

發(fā)布時間:2025/3/19 java 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java用不用stream_Java parallelStream不使用预期的线程数 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Java 8 parallelStream似乎使用的線程數(shù)多于系統(tǒng)屬性java.util.concurrent.ForkJoinPool.common.parallelism指定的線程數(shù).這些單元測試顯示我使用自己的ForkJoinPool使用所需數(shù)量的線程處理任務(wù),但是當(dāng)使用parallelStream時,線程數(shù)高于預(yù)期.

import org.junit.Test;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertTrue;

public class ParallelStreamTest {

private static final int TOTAL_TASKS = 1000;

@Test

public void testParallelStreamWithParallelism1() throws InterruptedException {

final Integer maxThreads = 1;

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", maxThreads.toString());

List objects = new ArrayList<>();

for (int i = 0; i < 1000; i++) {

objects.add(i);

}

final AtomicInteger concurrentThreads = new AtomicInteger(0);

final AtomicInteger taskCount = new AtomicInteger(0);

objects.parallelStream().forEach(i -> {

processTask(concurrentThreads, maxThreads); //expected to be called one at the time

taskCount.addAndGet(1);

});

assertTrue(taskCount.get() == TOTAL_TASKS);

}

@Test

public void testMyOwnForkJoinPoolWithParallelism1() throws InterruptedException {

final Integer threads = 1;

List objects = new ArrayList<>();

for (int i = 0; i < TOTAL_TASKS; i++) {

objects.add(i);

}

ForkJoinPool forkJoinPool = new ForkJoinPool(1);

final AtomicInteger concurrentThreads = new AtomicInteger(0);

final AtomicInteger taskCount = new AtomicInteger(0);

forkJoinPool.submit(() -> objects.parallelStream().forEach(i -> {

processTask(concurrentThreads, threads); //expected to be called one at the time

taskCount.addAndGet(1);

}));

forkJoinPool.shutdown();

forkJoinPool.awaitTermination(1, TimeUnit.MINUTES);

assertTrue(taskCount.get() == TOTAL_TASKS);

}

/**

* It simply processes a task increasing first the concurrentThreads count

*

* @param concurrentThreads Counter for threads processing tasks

* @param maxThreads Maximum number of threads that are expected to be used for processing tasks

*/

private void processTask(AtomicInteger concurrentThreads, int maxThreads) {

int currentConcurrentThreads = concurrentThreads.addAndGet(1);

if (currentConcurrentThreads > maxThreads) {

throw new IllegalStateException("There should be no more than " + maxThreads + " concurrent thread(s) but found " + currentConcurrentThreads);

}

// actual processing would go here

concurrentThreads.decrementAndGet();

}

}

應(yīng)該只有一個線程用于處理任務(wù),因為ForkJoinPool具有parallelism = 1和java.util.concurrent.ForkJoinPool.common.parallelism = 1.因此,兩個測試都應(yīng)該通過,但testParallelStreamWithParallelism1失敗:

java.lang.IllegalStateException: There should be no more than 1 concurrent thread(s) but found 2

似乎設(shè)置java.util.concurrent.ForkJoinPool.common.parallelism = 1沒有按預(yù)期工作,并且同時處理了多個并發(fā)任務(wù).

有任何想法嗎?

解決方法:

Fork / Join池的并行性設(shè)置確定了池工作線程的數(shù)量,但是從調(diào)用者線程開始,例如,主線程也將在作業(yè)上工作,使用公共池時總會有一個線程.這就是default setting of the common pool is “number of cores minus one”讓實際工作線程數(shù)等于核心數(shù)的原因.

使用自定義Fork / Join池,流操作的調(diào)用者線程已經(jīng)是池的工作線程,因此,利用它來處理作業(yè)不會增加實際工作線程數(shù).

必須強調(diào)的是,Stream實現(xiàn)和Fork / Join池之間的交互完全沒有指定,因為流使用Fork / Join框架的事實是一個實現(xiàn)細節(jié).無法保證更改默認池的屬性對流有任何影響,也不保證在自定義Fork / Join池的任務(wù)中調(diào)用流操作將使用該自定義池.

標簽:java,java-8,java-stream,multithreading

來源: https://codeday.me/bug/20190828/1753500.html

總結(jié)

以上是生活随笔為你收集整理的java用不用stream_Java parallelStream不使用预期的线程数的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。