Reputation: 3709
I have written two separate implementations of parallel reads from a database.
The first implementation uses an ExecutorService created with Executors.newCachedThreadPool() and Futures: I make a call that returns a Future for each read, and after all the calls have been made I call get() on each of them. This implementation works fine and is fast enough.
The second implementation uses parallel streams. When I submit the parallel stream call to the same ExecutorService pool, it runs almost 5 times slower, and it does not seem to use as many threads as I would hope. When I instead submit it to ForkJoinPool pool = new ForkJoinPool(50), it runs as fast as the first implementation.
My question is:
Why do parallel streams under-utilize threads in the newCachedThreadPool() version?
Here is the code for the second implementation (I am not posting the first implementation, because that one works fine anyway):
private static final ExecutorService pool = Executors.newCachedThreadPool();

final List<AbstractMap.SimpleImmutableEntry<String, String>> simpleImmutableEntryStream =
        personIdList.stream()
                .flatMap(personId -> movieIdList.stream()
                        .map(movieId -> new AbstractMap.SimpleImmutableEntry<>(personId, movieId)))
                .collect(Collectors.toList());

final Future<Map<String, List<Summary>>> futureMovieSummaryForPerson = pool.submit(() -> {
    final Stream<Summary> summaryStream = simpleImmutableEntryStream.parallelStream()
            .map(inputPair -> FeedbackDao.find(inputPair.getKey(), inputPair.getValue()))
            .filter(Objects::nonNull);
    return summaryStream.collect(Collectors.groupingBy(Summary::getPersonId));
});
Upvotes: 2
Views: 293
Reputation: 44985
This is related to how ForkJoinTask.fork() is implemented. If the current thread belongs to a ForkJoinPool, new tasks are submitted to that same pool. Otherwise they go to the common pool, whose size is derived from the number of processors on your machine (by default, one less than the processor count). A thread created by Executors.newCachedThreadPool() is not recognized as coming from a ForkJoinPool, so the parallel stream falls back to the common pool.
Here is how it is implemented, which should help you understand it better:
public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}
A thread created by Executors.newCachedThreadPool() is not of type ForkJoinWorkerThread, so the new tasks are submitted to the common pool, whose size is too small for this kind of blocking workload.
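To see the effect for yourself, here is a minimal sketch that probes which kind of thread each pool runs your task on, and then runs the same parallel stream from both pools. The class PoolDemo and the methods slowLookup, runsOnForkJoinWorker and runParallel are hypothetical names made up for this illustration; slowLookup only sleeps to stand in for a blocking call such as FeedbackDao.find, and the pool size of 50 is taken from the question. Note that a parallel stream using the pool it was started from is observed behaviour of the current implementation rather than something the API documentation promises.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PoolDemo {

    // Hypothetical stand-in for a blocking database read such as FeedbackDao.find(...).
    static int slowLookup(int id) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return id * 2;
    }

    // Submits a probe to the given pool, reports whether it ran on a
    // ForkJoinWorkerThread (the same check fork() performs), then shuts the pool down.
    static boolean runsOnForkJoinWorker(ExecutorService pool) {
        Callable<Boolean> probe = () -> Thread.currentThread() instanceof ForkJoinWorkerThread;
        try {
            return pool.submit(probe).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Runs the lookups as a parallel stream on whatever thread calls this method.
    static List<Integer> runParallel(List<Integer> ids) {
        return ids.parallelStream().map(PoolDemo::slowLookup).collect(Collectors.toList());
    }

    // Submits the parallel stream to the given pool and returns the elapsed milliseconds.
    static long timeIn(ExecutorService pool, List<Integer> ids) throws Exception {
        Callable<List<Integer>> task = () -> runParallel(ids);
        long start = System.nanoTime();
        pool.submit(task).get();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws Exception {
        List<Integer> ids = IntStream.range(0, 200).boxed().collect(Collectors.toList());

        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("cached pool thread is a FJ worker: "
                + runsOnForkJoinWorker(Executors.newCachedThreadPool()));
        System.out.println("ForkJoinPool thread is a FJ worker: "
                + runsOnForkJoinWorker(new ForkJoinPool(50)));

        ExecutorService cached = Executors.newCachedThreadPool();
        ForkJoinPool dedicated = new ForkJoinPool(50);
        try {
            // Timings depend on the machine, so no expected numbers are given here.
            System.out.println("via cached pool: " + timeIn(cached, ids) + " ms");
            System.out.println("via ForkJoinPool(50): " + timeIn(dedicated, ids) + " ms");
        } finally {
            cached.shutdown();
            dedicated.shutdown();
        }
    }
}
```

On a machine with far fewer than 50 cores you would expect the ForkJoinPool(50) run to finish sooner, because the cached-pool run is limited by the common pool's parallelism no matter how many threads the cached pool itself could create.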
Upvotes: 2