Tofig Hasanov
Tofig Hasanov

Reputation: 3709

Java parallelstream not using optimal number of threads when using newCachedThreadPool()

I have made two separate implementations of parallel reads from database. First implementation is using ExecutorService with newCachedThreadPool() constructor and Futures: I simply make a call that returns a future for each read case and then after I make all the calls I call get() on them. This implementation works OK and is fast enough.

The second implementation is using parallel streams. When I put parallel stream call into the same ExecutorService pool it works almost 5 times slower and it seems that it is not using as many threads as I would hope. When I instead put it into ForkJoinPool pool = new ForkJoinPool(50) then it works as fast as the previous implementation.

My question is:

Why do parallel streams under-utilize threads in newCachedThreadPool version?

Here is the code for the second implementation (I am not posting the first implementation, cause that one works OK anyway):

private static final ExecutorService pool = Executors.newCachedThreadPool();

final List<AbstractMap.SimpleImmutableEntry<String, String>> simpleImmutableEntryStream =
                personIdList.stream().flatMap(
                        personId -> movieIdList.stream().map(
                                movieId -> new AbstractMap.SimpleImmutableEntry<>(personId, movieId))).collect(Collectors.toList());

final Future<Map<String, List<Summary>>> futureMovieSummaryForPerson = pool.submit(() -> {
      final Stream<Summary> summaryStream = simpleImmutableEntryStream.parallelStream().map(
            inputPair -> {
                   return FeedbackDao.find(inputPair.getKey(), inputPair.getValue());
            }).filter(Objects::nonNull);
return summaryStream.collect(Collectors.groupingBy(Summary::getPersonId));
});

Upvotes: 2

Views: 293

Answers (1)

Nicolas Filotto
Nicolas Filotto

Reputation: 44985

This is related to how ForkJoinTask.fork is implemented, if the current thread comes from a ForkJoinPool it will use the same pool to submit the new tasks but if not it will use the common pool with the total amount of processors in your local machine and here when you create your pool with Executors.newCachedThreadPool(), the thread created by this pool is not recognized as coming from a ForkJoinPool such that it uses the common pool.

Here is how it is implemented, it should help you to better understand:

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

The thread created by the pool Executors.newCachedThreadPool() will not be of type ForkJoinWorkerThread such that it will use the common pool with an under optimized pool size to submit the new tasks.

Upvotes: 2

Related Questions