Giulio
Giulio

Reputation: 302

Performance implications of Java parallel streams

What are the best practices around using .stream().parallel()?

For example, if you have a bunch of blocking I/O calls and you want to check if .anyMatch(...), doing this in parallel seems a sensible thing to do.

Example code:

public boolean hasAnyRecentReference(JobId jobid) {
  <...>
  return pendingJobReferences.stream()
     .parallel()
     .anyMatch(pendingRef -> { 
       JobReference readReference = pendingRef.sync();
       Duration referenceAge = timeService.timeSince(readReference.creationTime());
       return referenceAge.lessThan(maxReferenceAge)
     });
}

At first glance this looks sensible because we can perform multiple blocking reads concurrently since we only care about any that match, instead of checking one after the other (so if every read takes 50ms we only have to wait for (50ms * expectedNumberOfNonRecentRefs) / numThreads ).

Could introducing this code in a production environment have any unforeseen performance implications on other parts of the codebase?

Upvotes: 5

Views: 2802

Answers (1)

Giulio
Giulio

Reputation: 302

EDIT: As @edharned points out .parallel() now uses CountedCompleter instead of calling .join(), which has problems of its own as well explained by Ed in http://coopsoft.com/ar/Calamity2Article.html under the What is currently being done? section.

I believe the info below is still useful to get an understanding of why the fork-join framework is tricky and the alternatives proposed to .parallel() among the conclusions are still relevant.


While the spirit of the code is right, the actual code can have system-wide impact on all code that uses .parallel() even though that's not obvious at all.

A while back I found an article that recommended against doing this: https://dzone.com/articles/think-twice-using-java-8, but I never dug deeper until recently.

These are my thoughts after doing a bunch of reading:

  1. .parallel() in Java uses ForkJoinPool.commonPool() which is a singleton ForkJoinPool shared by all streams (ForkJoinPool.commonPool() is a public static method, so in theory other libraries / parts of the code could be using it)
  2. ForkJoinPool implements work-stealing and has per-thread queues in addition to a shared queue

    1. Work stealing means that when a thread is idle, it will look for more work to do
    2. Initially I thought: by that definition, doesn’t a cached thread pool also do work stealing (even though some references call that work sharing for cached thread pools)?
    3. Turns out there seems to be some terminology vagueness when using the word idle:

      1. In a cached threadpool, a thread is idle only after it has completed its task. It does not become idle if it is blocked waiting on a blocking call
      2. In a forkjoin threadpool, a thread is idle either when it has completed its task or when it invokes the .join() method (which is a special blocking call) on a subtask .

        When .join() is called on a subtask, the thread becomes idle while waiting on that subtask to complete. While idle, it will try to execute any other available task, even if it is in another thread’s queue (it steals work).

        [This is the important bit] Once it has found another task to execute, it must complete it before resuming its original execution, even if the subtask it was waiting on completes while the thread is still executing the stolen task.

        [This is also important] This work-stealing behavior only applies to threads that call .join(). If a thread is blocked on something else, like I/O, it become idle (i.e. it won't steal work).

  3. Java streams don’t allow you to provide a custom ForkJoinPool but https://github.com/amaembo/streamex does

It took me a while to understand the implications of 2.3.2, so I’ll give a quick example to help illustrate the issue:

Note: these are dummy examples, but you can get into equivalent situations without realizing it by using streams, which internally do fork join stuff.

Also, I’ll use extremely simplified pseudo code that only serves to illustrate the .parallel() issue, but doesn't necessarily make sense otherwise.

Let’s say we’re implementing merge sort

merge_sort(list):
    left, right = split(list)

    leftTask = mergeSortTask(left).fork()
    rightTask = mergeSortTaks(right).fork()

    return merge(leftTask.join(), rightTask.join())

Let’s now say we have another piece of code that does the following:

dummy_collect_results(queriesIds):
   pending_results = []

   for id in queriesIds: 
     pending_results += longBlockingIOTask(id).fork()

  // do more stuff

What happens here?

When you write the merge sort code, you think sorting calls don’t do any I/O so their performance should be pretty deterministic, right?

Right. The thing you might not expect is that, since the dummy_collect_results method created a bunch of long running and blocking subtasks, when the threads carrying out mergesort tasks block on .join(), awaiting the completion of subtasks, they may start executing one of the long blocking subtasks.

This is bad because, as mentioned above, once the long blocking (on I/O, not a .join() call, so the thread won't become idle again) has been stolen, it must be completed, regardless of whether the sub-task a thread was waiting for via .join() completed while blocking for I/O.

This makes the execution of mergesort tasks no longer deterministic, because the threads carrying those out might end up stealing I/O intensive tasks generated in by code that lives somewhere else entirely.

This is also pretty scary and hard to catch because you could have been using .parallel() throughout your codebase without any issues, and all it takes is one class that introduces long-running tasks while using .parallel() and all of a sudden all other parts of your code base might get inconsistent performance.

So my conclusions are:

  1. Theoretically, .parallel() is fine if you can guarantee that all tasks that are ever going to be created anywhere in your code are short
  2. .parallel() can have system-wide performance implications that are non-obvious unless you know (e.g. if you later on add a single piece of code that uses .parallel() and has long tasks, you might influence the perf of all code that uses .parallel())
  3. Because of 2. you're better off avoiding .parallel() altogether and either use an ExecutorCompletionService or use https://github.com/amaembo/streamex which allows you to provide your own ForkJoinPool (which allows for a little more isolation). Even better, you can use https://github.com/palantir/streams/blob/1.9.1/src/main/java/com/palantir/common/streams/MoreStreams.java#L53, which gives you even finer-grained control over your mechanism of concurrency.

Upvotes: 5

Related Questions