Reputation: 183
Java 8's parallelStream seems to use more threads than the number specified by the system property java.util.concurrent.ForkJoinPool.common.parallelism. These unit tests show that my own ForkJoinPool processes tasks with the desired number of threads, but parallelStream uses more threads than expected.
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertTrue;

public class ParallelStreamTest {

    private static final int TOTAL_TASKS = 1000;

    @Test
    public void testParallelStreamWithParallelism1() throws InterruptedException {
        final Integer maxThreads = 1;
        // Note: the common pool reads this property only once, when ForkJoinPool
        // is initialized, so setting it here has no effect if that already happened
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", maxThreads.toString());
        List<Integer> objects = new ArrayList<>();
        for (int i = 0; i < TOTAL_TASKS; i++) {
            objects.add(i);
        }
        final AtomicInteger concurrentThreads = new AtomicInteger(0);
        final AtomicInteger taskCount = new AtomicInteger(0);
        objects.parallelStream().forEach(i -> {
            processTask(concurrentThreads, maxThreads); // expected to be called one at a time
            taskCount.addAndGet(1);
        });
        assertTrue(taskCount.get() == TOTAL_TASKS);
    }

    @Test
    public void testMyOwnForkJoinPoolWithParallelism1() throws InterruptedException {
        final Integer threads = 1;
        List<Integer> objects = new ArrayList<>();
        for (int i = 0; i < TOTAL_TASKS; i++) {
            objects.add(i);
        }
        ForkJoinPool forkJoinPool = new ForkJoinPool(1);
        final AtomicInteger concurrentThreads = new AtomicInteger(0);
        final AtomicInteger taskCount = new AtomicInteger(0);
        forkJoinPool.submit(() -> objects.parallelStream().forEach(i -> {
            processTask(concurrentThreads, threads); // expected to be called one at a time
            taskCount.addAndGet(1);
        }));
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.MINUTES);
        assertTrue(taskCount.get() == TOTAL_TASKS);
    }

    /**
     * Processes a task, first increasing the concurrentThreads count.
     *
     * @param concurrentThreads Counter for threads processing tasks
     * @param maxThreads        Maximum number of threads that are expected to be used for processing tasks
     */
    private void processTask(AtomicInteger concurrentThreads, int maxThreads) {
        int currentConcurrentThreads = concurrentThreads.addAndGet(1);
        if (currentConcurrentThreads > maxThreads) {
            throw new IllegalStateException("There should be no more than " + maxThreads + " concurrent thread(s) but found " + currentConcurrentThreads);
        }
        // actual processing would go here
        concurrentThreads.decrementAndGet();
    }
}
There should be only one thread processing tasks, since the custom ForkJoinPool has parallelism=1 and java.util.concurrent.ForkJoinPool.common.parallelism=1. Therefore both tests should pass, but testParallelStreamWithParallelism1 fails with:
java.lang.IllegalStateException: There should be no more than 1 concurrent thread(s) but found 2
It seems that setting java.util.concurrent.ForkJoinPool.common.parallelism=1 does not work as expected: more than one task is processed at the same time.
Any ideas?
Upvotes: 4
Views: 8496
Reputation: 2228
Run this example:
import java.util.stream.IntStream;

IntStream.rangeClosed(0, 9).parallel().forEach(i ->
        System.out.println("id - " + Thread.currentThread().getName()));
When you run it with java.util.concurrent.ForkJoinPool.common.parallelism=1, you will see something like:
id - main
id - main
id - ForkJoinPool.commonPool-worker-1
id - main
id - ForkJoinPool.commonPool-worker-1
id - main
id - ForkJoinPool.commonPool-worker-1
id - main
id - ForkJoinPool.commonPool-worker-1
id - ForkJoinPool.commonPool-worker-1
As you can see, streams use the common ForkJoinPool (with parallelism=1) and, in addition, the current (calling) thread.
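To check which threads take part on your own machine, the snippet above can be varied to collect the distinct thread names instead of printing one line per element. This is only a sketch: the class and method names (`StreamThreadNames`, `threadNamesUsed`) are made up for illustration, and which threads show up depends on the JDK, the core count and timing.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class StreamThreadNames {

    // Runs a parallel stream on the common pool and records the names of
    // the threads that actually processed the elements.
    static Set<String> threadNamesUsed(int elements) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        IntStream.range(0, elements).parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        // With -Djava.util.concurrent.ForkJoinPool.common.parallelism=1 this
        // typically lists the caller thread plus one common-pool worker.
        System.out.println("threads used:  " + threadNamesUsed(1_000));
    }
}
```

Run it with `-Djava.util.concurrent.ForkJoinPool.common.parallelism=1` to compare with the output above. A single-element stream, by contrast, can only ever be handled by one thread.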
Upvotes: 1
Reputation: 298263
The parallelism setting of the Fork/Join pool determines the number of pool worker threads, but since the caller thread, e.g. the main thread, will work on the jobs too, there is always one more thread when using the common pool. That’s why the default setting of the common pool is “number of cores minus one” to get an actual number of working threads equal to the number of cores.
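The arithmetic in this answer can be checked directly. The following sketch prints the core count and the common pool's parallelism, and adds one for the calling thread. `CommonPoolSizing` and `expectedWorkingThreads` are hypothetical names, and the "+ 1" follows the reasoning above rather than a documented guarantee; it also ignores any spare threads the pool might add to compensate for blocked workers.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolSizing {

    // Estimated number of threads working on a parallel stream that is started
    // from a thread outside the common pool: the pool's workers plus the caller.
    static int expectedWorkingThreads() {
        return ForkJoinPool.getCommonPoolParallelism() + 1;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        System.out.println("available processors:     " + cores);
        System.out.println("common pool parallelism:  " + parallelism);
        // Without an explicit parallelism property, the default is usually
        // cores - 1 (at least 1), so workers + caller roughly match the cores.
        System.out.println("expected working threads: " + expectedWorkingThreads());
    }
}
```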
With your custom Fork/Join pool, the caller thread of the stream operation is already a worker thread of the pool, hence, utilizing it for processing jobs doesn’t increase the actual number of working threads.
It must be emphasized that the interaction between the Stream implementation and the Fork/Join pool is entirely unspecified, as the fact that streams use the Fork/Join framework under the hood is an implementation detail. There is no guarantee that changing the default pool’s properties has any effect on streams, nor that calling stream operations from within a custom Fork/Join pool’s task will use that custom pool.
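Because this behavior is unspecified, it can be worth checking what a particular JDK actually does. The sketch below runs a parallel stream from inside a task submitted to a dedicated ForkJoinPool and reports whether every element was processed by a worker of that pool. `CustomPoolThreads`, `threadsUsed` and `allWorkersOf` are hypothetical names, and a `true` result on one JDK says nothing about other versions.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

public class CustomPoolThreads {

    // Runs a parallel stream from inside a task of the given pool and records
    // the threads that processed the elements.
    static Set<Thread> threadsUsed(ForkJoinPool pool, int elements) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        try {
            pool.submit(() -> IntStream.range(0, elements).parallel()
                    .forEach(i -> threads.add(Thread.currentThread())))
                .get(); // wait for the whole stream; rethrows its failure, if any
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return threads;
    }

    // True if every recorded thread is a worker thread of the given pool.
    static boolean allWorkersOf(ForkJoinPool pool, Set<Thread> threads) {
        return threads.stream().allMatch(t ->
                t instanceof ForkJoinWorkerThread
                        && ((ForkJoinWorkerThread) t).getPool() == pool);
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            Set<Thread> threads = threadsUsed(pool, 1_000);
            System.out.println("threads used: " + threads);
            System.out.println("all on the custom pool: " + allWorkersOf(pool, threads));
        } finally {
            pool.shutdown();
        }
    }
}
```

Waiting with `get()` instead of `shutdown()` plus `awaitTermination()` (as in the question) has the side benefit that an exception thrown inside the stream is rethrown, wrapped in an `ExecutionException`, rather than going unnoticed.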
Upvotes: 5
Reputation: 2605
Set this parameter as well:
System.setProperty("java.util.concurrent.ForkJoinPool.common.maximumSpares", "0");
This worked for me. Apparently (although this is not well documented), the default ForkJoinPool may use 'spare' threads to pick up work.
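If you go this route, the properties have to be set before the common pool is created, because `ForkJoinPool` reads them once when the class is initialized; passing them as `-D` options on the `java` command line avoids the ordering problem altogether. A minimal sketch, where `CommonPoolConfig` and `configureCommonPool` are hypothetical names and support for `maximumSpares` depends on the JDK version (check the `ForkJoinPool` documentation for yours):

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolConfig {

    static final String PARALLELISM = "java.util.concurrent.ForkJoinPool.common.parallelism";
    static final String MAX_SPARES = "java.util.concurrent.ForkJoinPool.common.maximumSpares";

    // Must run before anything touches ForkJoinPool: the common pool reads
    // these properties once, when the ForkJoinPool class is initialized.
    static void configureCommonPool(int parallelism) {
        System.setProperty(PARALLELISM, Integer.toString(parallelism));
        System.setProperty(MAX_SPARES, "0");
    }

    public static void main(String[] args) {
        configureCommonPool(1); // first statement, before any parallel stream
        // If the pool had already been initialized, this would still show the old value.
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```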
Upvotes: 3
Reputation: 3433
You deleted the correct answer from your first posting of this question, so I'll expound and expand on it. Your problem is here:
int currentConcurrentThreads = concurrentThreads.addAndGet(1);
and here:
objects.parallelStream().forEach(i -> {
    processTask(concurrentThreads, maxThreads); // expected to be called one at a time
    taskCount.addAndGet(1);
});
Each thread in the parallel stream invokes processTask. Each therefore increments concurrentThreads (but, for some reason, not with incrementAndGet: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/AtomicInteger.html#incrementAndGet--). Since they run in parallel, they all increment concurrentThreads before any of them can decrement it. So of course you exceed the number of threads you expect.
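Following this answer's suggestion, here is a sketch of a counter that uses `incrementAndGet`/`decrementAndGet` and records the highest concurrency observed instead of throwing on the first overlap. `PeakConcurrency` and `measurePeak` are hypothetical names. With a sequential stream the peak is always 1; with a parallel stream, a value above 1 simply reflects how many threads (including the caller) were inside the lambda at the same moment.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class PeakConcurrency {

    // Returns the highest number of elements that were being processed at the
    // same moment while running the given number of tasks.
    static int measurePeak(int tasks, boolean parallel) {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        IntStream range = IntStream.range(0, tasks);
        (parallel ? range.parallel() : range).forEach(i -> {
            int now = active.incrementAndGet();    // this thread enters
            peak.accumulateAndGet(now, Math::max); // keep the highest value seen
            // actual processing would go here
            active.decrementAndGet();              // this thread leaves
        });
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("sequential peak: " + measurePeak(1_000, false));
        System.out.println("parallel peak:   " + measurePeak(1_000, true));
    }
}
```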
Upvotes: 0