Reputation: 777
I recently implemented an algorithm with a single consumer and multiple producers, using a blocking queue to share state. While implementing a simplified variant, I thought I could perhaps get it to work with a basic Java Stream implementation.
But it did not work as expected. An example reproducing the unexpected behaviour:
import java.util.Random;
import java.util.stream.LongStream;

LongStream.range(0, 32)
        .unordered()
        .parallel()
        .map(value -> {
            // 'do some work', which can vary in duration
            try {
                Thread.sleep(new Random(value).nextLong(1000));
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            System.out.println("map " + value);
            return value;
        })
        .forEachOrdered(value -> {
            // actual implementation writes to an output stream
            System.out.println(">>> " + value);
            System.out.flush();
        });
My initial idea was that as soon as each map finished, its result could (roughly speaking) be consumed by the terminal operation, and that forEachOrdered would process one item at a time (so no manual synchronization would be necessary, compared to using forEach). But when I run the example code, I get the following output:
map 7
map 29
map 8
map 11
map 28
map 21
map 20
map 13
map 6
map 10
map 1
map 15
map 5
map 24
map 26
map 14
map 9
map 2
map 31
map 17
map 22
map 16
map 3
map 23
map 12
map 27
map 0
>>> 0
>>> 1
>>> 2
>>> 3
map 4
>>> 4
>>> 5
>>> 6
>>> 7
>>> 8
>>> 9
>>> 10
>>> 11
>>> 12
>>> 13
>>> 14
>>> 15
>>> 16
>>> 17
map 25
map 30
map 18
>>> 18
map 19
>>> 19
>>> 20
>>> 21
>>> 22
>>> 23
>>> 24
>>> 25
>>> 26
>>> 27
>>> 28
>>> 29
>>> 30
>>> 31
As you can see, the elements are mapped out of order, but the terminal operation is called in the order of the stream source. You can exacerbate the problem by sleeping for a very long time when encountering value 0 and not sleeping at all for the other values. The order itself, however, does not matter to me.
This confuses me, because I declare the stream to be unordered. And as I understand it, forEachOrdered doesn't necessarily process in order when used with an unordered stream.
Of course, that doesn't mean it can't, so as far as I can tell the current behaviour is perfectly fine. It's just not usable in my case.
Am I misunderstanding something? Is it to be expected, and if so, what causes it and why? Is it something where the Streams implementation could be improved?
Update: In response to the questions, a little more detail:
In my original implementation I have a queue containing tasks; producer threads take tasks from this queue and push the results onto a single consumer queue. Each producer thread processes many tasks, so it also produces multiple values. The consumer thread processes one result at a time, which is important because it writes to an output stream, so the calls must be serialized/synchronized.
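A minimal sketch of that design, assuming simple long-valued tasks (the queue sizes and the squaring are placeholders for the real work):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

BlockingQueue<Long> tasks = new ArrayBlockingQueue<>(32);
BlockingQueue<Long> results = new ArrayBlockingQueue<>(32);

// Each producer thread repeatedly takes a task and pushes its result.
Runnable producer = () -> {
    try {
        while (!Thread.currentThread().isInterrupted()) {
            long task = tasks.take();    // blocks until a task is available
            results.put(task * task);    // placeholder for the actual work
        }
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
    }
};

// The single consumer drains results one at a time, so writing to the
// output stream needs no further synchronization.
Runnable consumer = () -> {
    try {
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println(">>> " + results.take());
        }
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
    }
};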
For the stream implementation, my idea was to stream the tasks, let the producers be the map operations, and write to the output in the terminal operation. The reason I used forEachOrdered is that it promises to process one element at a time. But it does not promise order when the stream has no defined encounter order, which is fine. This last bit, together with declaring the stream unordered and parallel, made me think it should allow for unordered parallel processing. But it doesn't.
Maybe it just happens to be an implementation detail why it currently doesn't. Or maybe I'm misunderstanding. Note that if I change forEachOrdered to forEach and synchronize inside the forEach, it works as expected.
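For reference, a sketch of that workaround (the lock object and the trivial map body are mine):

import java.util.stream.LongStream;

final Object lock = new Object();
LongStream.range(0, 32)
        .unordered()
        .parallel()
        .map(value -> {
            // 'do some work' as in the example above
            return value;
        })
        .forEach(value -> {
            // forEach makes no one-at-a-time promise,
            // so the writes are serialized manually:
            synchronized (lock) {
                System.out.println(">>> " + value);
                System.out.flush();
            }
        });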
Upvotes: 8
Views: 382
Reputation: 7130
As you can see, the elements are mapped out of order, but the terminal operation is called in order of the stream source. [...] This confuses me, because I declare the stream to be unordered. And as I understand it, forEachOrdered doesn't necessarily process in order when used with an unordered stream. [...] Am I misunderstanding something? Is it to be expected, and if so, what causes it and why? Is it something where the Streams implementation could be improved?
When dealing with streams, debugging helps to better understand what happens under the hood.
The stream is created via the static method LongStream.range(), which returns a long stream backed by a Streams.RangeLongSpliterator. Here, we're not creating a stream holding the actual long values 0, 1, ..., 31, but rather one over a range that is iterated via a Spliterator.OfLong. Once the stream is created, the actual implementation returned by LongStream.range() is an instance of the static nested class LongPipeline.Head.
The further calls to unordered() and parallel() only set the corresponding flags and wrap the stream into a StatelessOp. The interesting part actually happens in the terminal operation forEachOrdered(), and that's where things get a little convoluted. I'll try to break it down as much as possible to follow each step, but it's probably best to read the following part while debugging your code.
forEachOrdered() first creates a terminal operation from the action to perform with ForEachOps.makeLong(action, true), and then passes it to the method evaluate() inherited from AbstractPipeline. At this point, the method evaluateParallel() of the given TerminalOp is invoked, creating an instance of ForEachOrderedTask with the Spliterator created at the beginning. The class ForEachOrderedTask indirectly inherits from ForkJoinTask (it extends CountedCompleter), and its documentation states:
Our goal is to ensure that the elements associated with a task are processed according to an in-order traversal of the computation tree. We use completion counts for representing these dependencies, so that a task does not complete until all the tasks preceding it in this order complete.
This means that each element of the range is consumed only after the previous one has been consumed. Regardless of how long each thread takes to complete its task, the terminal operation still relies on the initial Spliterator to go through the elements of the range, processing them in the given order. Note that this doesn't happen because the Spliterator is defined on a range; it would be equally true if the stream were defined over a list of values, like LongStream.of(5, 25, 13, 31). The terminal operation would still be created with the initial Spliterator, consuming the elements in the given order.
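A quick way to see this for yourself (my own test; if the behaviour described above holds, it prints the values in source order despite unordered() and parallel()):

import java.util.stream.LongStream;

LongStream.of(5, 25, 13, 31)
        .unordered()
        .parallel()
        .forEachOrdered(System.out::println);  // expected: 5, 25, 13, 31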
Upvotes: 0
Reputation: 5135
Streams are not intended for parallel concurrent processing, but for applying functional-style operations to a stream of items/values/objects (name them how you like).
The package documentation of java.util.stream lists some differences from collections, one of which makes streams rather unsuitable for this kind of parallel processing: "Laziness-seeking". This means that the terminal operation of a chain of stream operations may optimize the chain and skip unnecessary intermediate operations. For the scenario you described, I could not see that such an optimization would be useful.
In short, I think concurrent operation of a chain of stream operations might be possible, but it is totally dependent on the implementation: specifying parallel operations just means that Java is allowed to execute them in parallel if that optimizes the complete operation; it does not enforce it.
The description linked above also mentions:
Streams facilitate parallel execution by reframing the computation as a pipeline of aggregate operations, rather than as imperative operations on each individual element.
This is not the same as what your original implementation does.
If you want to optimize your code, instead of collecting the results in a single queue you could also use the OutputStream directly and steer the concurrency with synchronized access or a ReentrantLock.
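A minimal sketch of the lock-based variant, assuming the same range source as in the question (the payload written to the stream is a placeholder):

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.LongStream;

ReentrantLock lock = new ReentrantLock();
OutputStream out = System.out;  // stands in for the real output stream

LongStream.range(0, 32)
        .parallel()
        .forEach(value -> {
            lock.lock();  // serialize access to the shared stream
            try {
                out.write((">>> " + value + "\n").getBytes());
            } catch (final IOException e) {
                throw new UncheckedIOException(e);
            } finally {
                lock.unlock();
            }
        });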
You could also use a ThreadPoolExecutor to which you submit tasks as Runnable instances (this covers the parallel part, not the serialized writing to the output, and might be more or less suited to your problem).
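Roughly like this (the pool size and the task body are placeholders of mine; the results would still have to be handed to a single consumer for the serialized writing):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

ExecutorService pool = Executors.newFixedThreadPool(4);
for (long value = 0; value < 32; value++) {
    final long v = value;
    pool.submit(() -> {
        // 'do some work' in parallel; hand the result to the consumer
        System.out.println("map " + v);
    });
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.MINUTES);  // declares InterruptedException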
Beyond these sketches, I won't go into the details of the alternatives, since that wasn't the core part of your question.
Upvotes: 0