Alex R

Reputation: 3311

Java, in which thread are sequential streams executed?

While reading the documentation about streams, I came across the following sentences:

These sentences don't make a distinction between sequential and parallel streams. So my questions are:

  1. In which thread is the pipeline of a sequential stream executed? Is it always the calling thread or is an implementation free to choose any thread?
  2. In which thread is the action parameter of the forEach terminal operation executed if the stream is sequential?
  3. Do I have to use any synchronization when using sequential streams?

Upvotes: 12

Views: 2117

Answers (2)

Stav Alfi

Reputation: 13923

  1. A stream's terminal operations are blocking operations. When there is no parallel execution, the thread that invokes the terminal operation runs all the operations in the pipeline.

Definition 1.1. A pipeline is a sequence of chained method calls on a stream.

Definition 1.2. Intermediate operations can appear anywhere in the pipeline except at the end. They return a stream object and do not execute any operation in the pipeline.

Definition 1.3. Terminal operations appear only at the end of the pipeline. They execute the pipeline. They do not return a stream object, so no other intermediate or terminal operations can be added after them.
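To make definitions 1.2 and 1.3 concrete, here is a minimal sketch. The class name LazyPipelineDemo and the log list are made up for illustration, and the side effect inside map is there only to make the timing visible; it is not a recommended pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: records when each stage of the pipeline actually runs.
class LazyPipelineDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Intermediate operation: only describes the work, nothing is processed yet.
        Stream<String> pipeline = Stream.of("a", "b")
                .map(s -> {
                    log.add("map " + s); // side effect for demonstration only
                    return s.toUpperCase();
                });
        log.add("pipeline built");

        // Terminal operation: this call triggers the traversal of the source.
        List<String> result = pipeline.collect(Collectors.toList());
        log.add("result " + result);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The "pipeline built" entry is logged before any "map ..." entry, because map does no work until collect is called.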

  2. From the first answer we can conclude that the calling thread executes the action passed to the forEach terminal operation for each element of the stream.

Java 8 introduced the Spliterator interface. It has the capabilities of an Iterator, plus a set of operations that help split a task and perform it in parallel.

When forEach is called on a primitive stream in sequential execution, the calling thread invokes the Spliterator.forEachRemaining method:

@Override
public void forEach(IntConsumer action) {
    if (!isParallel()) {
        adapt(sourceStageSpliterator()).forEachRemaining(action);
    }
    else {
        super.forEach(action);
    }
}

You can read more on Spliterator in my tutorial: Part 6 - Spliterator
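As a rough illustration of what forEachRemaining does when it is called directly, the sketch below traverses a list's Spliterator and tags each element with the name of the thread that ran the action. SpliteratorDemo and traverse are hypothetical names, not part of the JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

// Hypothetical demo class: calls Spliterator.forEachRemaining directly.
class SpliteratorDemo {

    static List<String> traverse(List<Integer> source) {
        List<String> seen = new ArrayList<>();
        Spliterator<Integer> spliterator = source.spliterator();

        // Spliterator.forEachRemaining is documented to run the action
        // sequentially in the current thread for every remaining element.
        spliterator.forEachRemaining(
                n -> seen.add(n + "@" + Thread.currentThread().getName()));
        return seen;
    }

    public static void main(String[] args) {
        traverse(Arrays.asList(1, 2, 3)).forEach(System.out::println);
    }
}
```

Note that this only shows the behavior of the Spliterator method itself. Whether a stream's forEach delegates to it in the calling thread is an implementation detail of the stream library.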

  3. As long as none of the stream operations mutates state shared between multiple threads (which is discouraged anyway, as explained below), you do not need any additional synchronization tool or algorithm when you run parallel streams.

Stream operations such as reduce take accumulator and combiner functions so that they can be executed in parallel. The streams documentation requires these functions to be non-interfering and discourages side effects, so you should avoid mutating shared state.
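A small sketch of the three-argument reduce may help here. ReduceDemo and totalLength are hypothetical names; the point is only that the accumulator and combiner touch no shared mutable state, so the same code gives the same result sequentially and in parallel without extra synchronization.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class: sums string lengths with reduce(identity, accumulator, combiner).
class ReduceDemo {

    static int totalLength(List<String> words, boolean parallel) {
        Stream<String> stream = parallel ? words.parallelStream() : words.stream();
        return stream.reduce(
                0,                                // identity value
                (sum, word) -> sum + word.length(), // accumulator: folds one element into a partial sum
                Integer::sum);                    // combiner: merges two partial sums
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "bb", "ccc");
        System.out.println(totalLength(words, false));
        System.out.println(totalLength(words, true));
    }
}
```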

There are a lot of definitions in concurrent and parallel programming. I will introduce a set of definitions that will serve us best.

Definition 8.1. Concurrent programming is the ability to solve a task using additional synchronization algorithms.

Definition 8.2. Parallel programming is the ability to solve a task without using additional synchronization algorithms.

You can read more about it in my tutorial: Part 7 - Parallel Streams.

Upvotes: 1

M. Justin

Reputation: 21094

This all boils down to what is guaranteed based on the specification, and the fact that a current implementation may have additional behaviors beyond what is guaranteed.

Java Language Architect Brian Goetz made a relevant point regarding specifications in a related question:

Specifications exist to describe the minimal guarantees a caller can depend on, not to describe what the implementation does.

[...]

When a specification says "does not preserve property X", it does not mean that the property X may never be observed; it means the implementation is not obligated to preserve it. [...] (HashSet doesn't promise that iterating its elements preserves the order they were inserted, but that doesn't mean this can't accidentally happen -- you just can't count on it.)

This means that even if the current implementation happens to have certain behavioral characteristics, they should not be relied upon, nor should it be assumed that they will not change in new versions of the library.

Sequential stream pipeline thread

In which thread is the pipeline of a sequential stream executed? Is it always the calling thread or is an implementation free to choose any thread?

Current stream implementations may or may not use the calling thread, and may use one or multiple threads. As none of this is specified by the API, this behavior should not be relied on.

forEach execution thread

In which thread is the action parameter of the forEach terminal operation executed if the stream is sequential?

While current implementations use the existing thread, this cannot be relied on, as the documentation states that the choice of thread is up to the implementation. In fact, there are no guarantees that the elements aren't processed by different threads for different elements, though that is not something the current stream implementation does either.

Per the API:

For any given element, the action may be performed at whatever time and in whatever thread the library chooses.
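If you want to see what your own JDK does, a quick experiment like the following records which threads ran the forEach action. ForEachThreadDemo and threadsUsed are hypothetical names, and whatever the experiment prints is an observation about one implementation, not something the specification promises.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

// Hypothetical demo class: collects the names of the threads that executed the forEach action.
class ForEachThreadDemo {

    static Set<String> threadsUsed(boolean parallel) {
        // Thread-safe set, because the specification allows the action to run in any thread.
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream range = IntStream.range(0, 1_000);
        IntStream stream = parallel ? range.parallel() : range;
        stream.forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("calling thread: " + Thread.currentThread().getName());
        System.out.println("sequential:     " + threadsUsed(false));
        System.out.println("parallel:       " + threadsUsed(true));
    }
}
```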

Note that while the API calls out parallel streams specifically when discussing encounter order, Brian Goetz has explained that this was meant to clarify the motivation for the behavior, not to imply that any of the behavior is specific to parallel streams:

The intent of calling out the parallel case explicitly here was pedagogical [...]. However, to a reader who is unaware of parallelism, it would be almost impossible to not assume that forEach would preserve encounter order, so this sentence was added to help clarify the motivation.

Synchronization using sequential streams

Do I have to use any synchronization when using sequential streams?

Current implementations will likely work since they use a single thread for the sequential stream's forEach method. However, as it is not guaranteed by the stream specification, it should not be relied on. Therefore, synchronization should be used as though the methods could be called by multiple threads.
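As a sketch of what "synchronize as though multiple threads could call you" might look like when a side effect in forEach really cannot be avoided, the example below writes into a synchronized list. SynchronizedSideEffectDemo and collectEvens are hypothetical names, and the parallel() call is there only to make the multi-threaded case actually happen.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical demo class: a forEach side effect that stays correct under any thread choice.
class SynchronizedSideEffectDemo {

    static List<Integer> collectEvens(int limit) {
        // The synchronized wrapper makes add() safe no matter which thread calls it.
        List<Integer> evens = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, limit)
                .parallel()
                .filter(i -> i % 2 == 0)
                .forEach(i -> evens.add(i)); // encounter order is not preserved here
        return evens;
    }

    public static void main(String[] args) {
        System.out.println(collectEvens(1_000).size());
    }
}
```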

That said, the stream documentation specifically recommends against using side-effects that would require synchronization, and suggests using reduction operations instead of mutable accumulators:

Many computations where one might be tempted to use side effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators. [...] A small number of stream operations, such as forEach() and peek(), can operate only via side-effects; these should be used with care.

As an example of how to transform a stream pipeline that inappropriately uses side-effects to one that does not, the following code searches a stream of strings for those matching a given regular expression, and puts the matches in a list.

     ArrayList<String> results = new ArrayList<>();
     stream.filter(s -> pattern.matcher(s).matches())
           .forEach(s -> results.add(s));  // Unnecessary use of side-effects!

This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism. Furthermore, using side-effects here is completely unnecessary; the forEach() can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:

     List<String> results =
         stream.filter(s -> pattern.matcher(s).matches())
               .collect(Collectors.toList());  // No side-effects!
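For anyone who wants to run the side-effect-free version from the documentation, here is a self-contained variant. CollectDemo, matching, and the sample pattern are made up for illustration; the documentation snippet itself leaves stream and pattern undefined.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical demo class: filters strings by a regular expression without side effects.
class CollectDemo {

    static List<String> matching(List<String> input, Pattern pattern) {
        return input.stream()
                .filter(s -> pattern.matcher(s).matches())
                .collect(Collectors.toList()); // the collector builds the list, no shared state needed
    }

    public static void main(String[] args) {
        Pattern letterThenDigits = Pattern.compile("[a-z]\\d+");
        System.out.println(matching(Arrays.asList("a1", "b", "c22"), letterThenDigits));
    }
}
```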

Upvotes: 3
