Reputation: 3311
While reading the documentation about streams, I came across the following sentences:
... attempting to access mutable state from behavioral parameters presents you with a bad choice ... if you do not synchronize access to that state, you have a data race and therefore your code is broken ... [1]
If the behavioral parameters do have side-effects ... [there are no] guarantees that different operations on the "same" element within the same stream pipeline are executed in the same thread. [2]
For any given element, the action may be performed at whatever time and in whatever thread the library chooses. [3]
These sentences don't make a distinction between sequential and parallel streams. So my questions are:
Upvotes: 12
Views: 2117
Reputation: 13923
Definition 1.1. A pipeline is a sequence of chained stream method calls.
Definition 1.2. Intermediate operations can appear anywhere in the pipeline except at the end. They return a stream object and do not execute any operation in the pipeline.
Definition 1.3. Terminal operations appear only at the end of the pipeline. They execute the pipeline. They do not return a stream object, so no other intermediate or terminal operations can be added after them.
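To make Definitions 1.2 and 1.3 concrete, here is a minimal sketch. The class and method names (LazyPipelineDemo, countMapCalls) are made up for illustration. It counts how often the map function has run before and after the terminal operation, using an AtomicInteger so the counter itself needs no extra synchronization.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPipelineDemo {

    // Returns {calls before the terminal operation, calls after it}.
    static int[] countMapCalls() {
        AtomicInteger calls = new AtomicInteger();

        // Intermediate operation: returns a new Stream, runs nothing yet.
        Stream<Integer> doubled = Stream.of(1, 2, 3)
                .map(n -> {
                    calls.incrementAndGet(); // only here to make execution visible
                    return n * 2;
                });
        int before = calls.get();

        // Terminal operation: executes the pipeline and returns a non-stream result.
        List<Integer> result = doubled.collect(Collectors.toList());
        int after = calls.get();

        System.out.println(result + " before=" + before + " after=" + after);
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        countMapCalls();
    }
}
```

The counter stays at zero until collect is called, which is the sense in which intermediate operations "do not execute any operation in the pipeline".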
action method inside the forEach terminal operation on each element in the calling stream.
Java 8 introduced the Spliterator interface. It has the capabilities of an Iterator, plus a set of operations that help split a task and perform it in parallel.
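As a rough illustration of the splitting capability, the sketch below splits a list's Spliterator once with trySplit and drains both halves with forEachRemaining. SpliteratorSplitDemo and splitOnce are hypothetical names, and where the split happens is up to the source's Spliterator implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class SpliteratorSplitDemo {

    // Splits the source's Spliterator once and drains both parts into lists.
    static List<List<Integer>> splitOnce(List<Integer> source) {
        Spliterator<Integer> rest = source.spliterator();
        // trySplit() hands off part of the elements, or returns null if it cannot split.
        Spliterator<Integer> prefix = rest.trySplit();

        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        if (prefix != null) {
            prefix.forEachRemaining(first::add);
        }
        rest.forEachRemaining(second::add);
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        System.out.println(splitOnce(new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6))));
    }
}
```

In a parallel stream the library would hand the two parts to different tasks; here both are drained on the calling thread just to show that together they cover the whole source.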
When calling forEach from primitive streams in sequential execution, the calling thread will invoke the Spliterator.forEachRemaining method:
@Override
public void forEach(IntConsumer action) {
    if (!isParallel()) {
        adapt(sourceStageSpliterator()).forEachRemaining(action);
    }
    else {
        super.forEach(action);
    }
}
You can read more on Spliterator in my tutorial: Part 6 - Spliterator
Stream operations like reduce use accumulator and combiner functions when executing parallel streams. The streams documentation discourages mutating shared state from these functions, so you should avoid it.
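For reference, here is a small sketch of the three-argument reduce with an accumulator and a combiner, neither of which touches shared state. ReduceDemo and totalLength are illustrative names, not part of any library.

```java
import java.util.Arrays;
import java.util.List;

class ReduceDemo {

    // Sums the lengths of the words without mutating any shared state.
    static int totalLength(List<String> words, boolean parallel) {
        return (parallel ? words.parallelStream() : words.stream())
                .reduce(0,
                        (sum, word) -> sum + word.length(), // accumulator: folds one element into a partial result
                        Integer::sum);                      // combiner: merges two partial results
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "bb", "ccc");
        System.out.println(totalLength(words, false));
        System.out.println(totalLength(words, true));
    }
}
```

Because both functions are stateless and associative, the result is the same whether the stream runs sequentially or in parallel.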
There are a lot of definitions in concurrent and parallel programming. I will introduce a set of definitions that will serve us best.
Definition 8.1. Concurrent programming is the ability to solve a task using additional synchronization algorithms.
Definition 8.2. Parallel programming is the ability to solve a task without using additional synchronization algorithms.
You can read more about it in my tutorial: Part 7 - Parallel Streams.
Upvotes: 1
Reputation: 21094
This all boils down to what is guaranteed based on the specification, and the fact that a current implementation may have additional behaviors beyond what is guaranteed.
Java Language Architect Brian Goetz made a relevant point regarding specifications in a related question:
Specifications exist to describe the minimal guarantees a caller can depend on, not to describe what the implementation does.
[...]
When a specification says "does not preserve property X", it does not mean that the property X may never be observed; it means the implementation is not obligated to preserve it. [...] (HashSet doesn't promise that iterating its elements preserves the order they were inserted, but that doesn't mean this can't accidentally happen -- you just can't count on it.)
This all means that even if the current implementation happens to have certain behavioral characteristics, they should not be relied upon, nor should it be assumed that they will stay the same in new versions of the library.
In which thread is the pipeline of a sequential stream executed? Is it always the calling thread or is an implementation free to choose any thread?
Current stream implementations may or may not use the calling thread, and may use one or multiple threads. As none of this is specified by the API, this behavior should not be relied on.
forEach execution thread
In which thread is the action parameter of the forEach terminal operation executed if the stream is sequential?
While current implementations use the calling thread, this cannot be relied on, as the documentation states that the choice of thread is up to the implementation. In fact, there is no guarantee that all elements are processed by the same thread, though processing different elements on different threads is not something the current sequential implementation does either.
Per the API:
For any given element, the action may be performed at whatever time and in whatever thread the library chooses.
Note that while the API calls out parallel streams specifically when discussing encounter order, Brian Goetz explained that this was meant to clarify the motivation for the behavior, not to imply that any of the behavior is specific to parallel streams:
The intent of calling out the parallel case explicitly here was pedagogical [...]. However, to a reader who is unaware of parallelism, it would be almost impossible to not assume that forEach would preserve encounter order, so this sentence was added to help clarify the motivation.
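If you want to see what your own JDK does, a sketch like the following records which threads ran the forEach action. ForEachThreadDemo and threadsUsed are hypothetical names. Whatever it prints is an observation about one implementation on one run, not something the specification guarantees.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class ForEachThreadDemo {

    // Collects the names of the threads that executed the forEach action.
    static Set<String> threadsUsed(boolean parallel) {
        // Thread-safe set, so the recording is safe whichever threads the library picks.
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream range = IntStream.range(0, 1_000);
        (parallel ? range.parallel() : range)
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("caller:     " + Thread.currentThread().getName());
        System.out.println("sequential: " + threadsUsed(false));
        System.out.println("parallel:   " + threadsUsed(true));
    }
}
```

Note that the recording set is thread-safe even in the sequential case, in keeping with the advice not to assume a single thread.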
Do I have to use any synchronization when using sequential streams?
Current implementations will likely work, since they use a single thread for a sequential stream's forEach method. However, since this is not guaranteed by the stream specification, it should not be relied on. Therefore, synchronization should be used as though the methods could be called by multiple threads.
That said, the stream documentation specifically recommends against using side-effects that would require synchronization, and suggests using reduction operations instead of mutable accumulators:
Many computations where one might be tempted to use side effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators. [...] A small number of stream operations, such as forEach() and peek(), can operate only via side-effects; these should be used with care.
As an example of how to transform a stream pipeline that inappropriately uses side-effects to one that does not, the following code searches a stream of strings for those matching a given regular expression, and puts the matches in a list.
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
      .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism. Furthermore, using side-effects here is completely unnecessary; the forEach() can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:
List<String> results =
    stream.filter(s -> pattern.matcher(s).matches())
          .collect(Collectors.toList());  // No side-effects!
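The two options discussed in this answer, synchronizing the side effect or avoiding it with a reduction, can be sketched side by side as a self-contained program. SideEffectDemo and its method names are made up for illustration, and Collections.synchronizedList is just one possible way to guard the shared list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class SideEffectDemo {

    // Option 1: keep the side effect, but guard the shared list.
    static List<String> matchesWithSynchronizedList(List<String> input, Pattern pattern) {
        List<String> results = Collections.synchronizedList(new ArrayList<>());
        input.stream()
             .filter(s -> pattern.matcher(s).matches())
             .forEach(results::add); // safe from any thread; forEach does not promise an order
        return results;
    }

    // Option 2: no shared mutable state; the library assembles the result.
    static List<String> matchesWithCollect(List<String> input, Pattern pattern) {
        return input.stream()
                    .filter(s -> pattern.matcher(s).matches())
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("ab", "x", "abb");
        Pattern pattern = Pattern.compile("ab+");
        System.out.println(matchesWithSynchronizedList(input, pattern));
        System.out.println(matchesWithCollect(input, pattern));
    }
}
```

Option 2 is the one the documentation recommends; option 1 is only there to show what "synchronize as though multiple threads were involved" would look like if a side effect cannot be avoided.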
Upvotes: 3