Reputation: 752
I wonder if the following works:
class SomeCalc {
    AtomicLong someResultA;
    AtomicDouble someResultB;

    public SomeCalc(List<Something> someList) {
        final Stream<Something> someStream = someList.stream();
        new Thread(() -> {
            long result = someStream.mapToLong(something -> something.getSomeLong()).sum();
            someResultA.set(result);
        }).start();
        new Thread(() -> {
            double result = someStream.mapToDouble(something -> (double) something.getSomeLong() / something.getAnotherLong()).sum();
            someResultB.set(result);
        }).start();
    }
}
As long as the results are atomic, as in this example, would this work? Or will it throw a ConcurrentModificationException along the way, or will something else go wrong?
Upvotes: 3
Views: 1245
Reputation: 19926
It won't work. You'll get an:
IllegalStateException: Stream has already been operated upon or closed
This happens because a stream can be traversed only once. The Javadoc of java.util.stream.Stream<T> says the same:
A stream should be operated on [...] only once. This rules out, for example, "forked" streams, where the same source feeds two or more pipelines, or multiple traversals of the same stream. A stream implementation may throw IllegalStateException if it detects that the stream is being reused. However, since some stream operations may return their receiver rather than a new stream object, it may not be possible to detect reuse in all cases.
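To see the restriction in isolation, here is a minimal single-threaded sketch. The class and method names are made up for illustration, and it relies on the standard JDK stream implementation detecting the reuse (the Javadoc only says an implementation "may" throw):

```java
import java.util.Arrays;
import java.util.stream.Stream;

class StreamReuseDemo {

    // Returns true if operating on the same stream object a second time is rejected.
    static boolean secondTraversalFails() {
        Stream<Integer> stream = Arrays.asList(1, 2, 3).stream();
        stream.mapToLong(Integer::longValue).sum(); // first pipeline: fine
        try {
            stream.mapToLong(Integer::longValue).sum(); // same stream object again
            return false;
        } catch (IllegalStateException e) {
            return true; // the stream was already linked or consumed
        }
    }

    public static void main(String[] args) {
        System.out.println("second traversal rejected: " + secondTraversalFails());
    }
}
```

No threads are involved here, so the failure in the question has nothing to do with concurrency as such.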
I assume your example is simplified: you prepare the stream once and then let the two threads do additional work on it.
If you want to keep that structure, you can create a supplier of the stream and let each thread get its own instance:
final Supplier<Stream<Something>> provider = () -> someList.stream(); // and maybe more operations
new Thread(() -> {
    long result = provider.get() // get an instance
            .mapToLong(something -> something.getSomeLong())
            .sum();
    someResultA.set(result);
}).start();
new Thread(() -> {
    double result = provider.get() // get another instance
            .mapToDouble(something -> (double) something.getSomeLong() / something.getAnotherLong())
            .sum();
    someResultB.set(result);
}).start();
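For anyone who wants to run this, here is a self-contained sketch of the same idea. The Something class is a hypothetical stand-in for the type in the question, DoubleAdder from the JDK is swapped in for AtomicDouble (which is not part of the JDK), and the join() calls are my addition so the results are only read once both threads have finished:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.function.Supplier;
import java.util.stream.Stream;

class SupplierPerThreadDemo {

    // Hypothetical stand-in for the question's Something type.
    static final class Something {
        private final long someLong;
        private final long anotherLong;

        Something(long someLong, long anotherLong) {
            this.someLong = someLong;
            this.anotherLong = anotherLong;
        }

        long getSomeLong() { return someLong; }
        long getAnotherLong() { return anotherLong; }
    }

    // Returns {sum of someLong, sum of someLong / anotherLong}.
    static double[] compute(List<Something> someList) throws InterruptedException {
        AtomicLong someResultA = new AtomicLong();
        DoubleAdder someResultB = new DoubleAdder(); // JDK class used in place of AtomicDouble
        Supplier<Stream<Something>> provider = someList::stream;

        Thread a = new Thread(() ->
                someResultA.set(provider.get().mapToLong(Something::getSomeLong).sum()));
        Thread b = new Thread(() ->
                someResultB.add(provider.get()
                        .mapToDouble(s -> (double) s.getSomeLong() / s.getAnotherLong())
                        .sum()));
        a.start();
        b.start();
        a.join(); // wait for both threads before reading the results
        b.join();
        return new double[] { someResultA.get(), someResultB.sum() };
    }

    public static void main(String[] args) throws InterruptedException {
        List<Something> data = Arrays.asList(new Something(2, 4), new Something(6, 3));
        System.out.println(Arrays.toString(compute(data)));
    }
}
```

Each call to provider.get() builds a fresh stream over the same list, so neither thread touches a stream the other has already used. Without the joins, a caller could read the results before either thread has written them.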
Upvotes: 7
Reputation: 45309
Leaving aside the possibility of concurrent modification, this code won't work because it calls multiple terminal operations on the same stream.
The two threads will each end up calling sum on the same stream, and that is not allowed. There's no need to worry about concurrently modifying an AtomicLong, though, as it's designed to handle updates from multiple threads.
As you're already using atomic data structures, you can change it to something like this to make it work in one traversal:
someList.stream()
    .forEach(something -> {
        someResultA.addAndGet(something.getSomeLong());
        // I assume the call below exists (Guava's AtomicDouble has addAndGet(double))...
        someResultB.addAndGet((double) something.getSomeLong() / something.getAnotherLong());
    });
If the reason for using different threads is parallelism, then you can make this a parallel stream:
someList.stream().parallel()
.map...
Because of contention between threads, updating the atomic values for every element may end up slower than summing first and setting the result once, but it gives you a single traversal of a parallel stream.
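As a rough sketch of the single-traversal variant that runs on a plain JDK, the accumulators below are LongAdder and DoubleAdder rather than AtomicLong and AtomicDouble. That swap is my choice, not something from the question; both adders are built for many threads adding to the same total. Something is again a hypothetical stand-in for the question's type:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;

class SinglePassDemo {

    // Hypothetical stand-in for the question's Something type.
    static final class Something {
        private final long someLong;
        private final long anotherLong;

        Something(long someLong, long anotherLong) {
            this.someLong = someLong;
            this.anotherLong = anotherLong;
        }

        long getSomeLong() { return someLong; }
        long getAnotherLong() { return anotherLong; }
    }

    // Returns {sum of someLong, sum of someLong / anotherLong} from one parallel pass.
    static double[] sums(List<Something> someList) {
        LongAdder sumA = new LongAdder();
        DoubleAdder sumB = new DoubleAdder();
        someList.parallelStream().forEach(s -> {
            sumA.add(s.getSomeLong());
            sumB.add((double) s.getSomeLong() / s.getAnotherLong());
        });
        // forEach has returned, so every element has been added by now.
        return new double[] { sumA.sum(), sumB.sum() };
    }

    public static void main(String[] args) {
        List<Something> data = Arrays.asList(new Something(2, 4), new Something(6, 3));
        System.out.println(Arrays.toString(sums(data)));
    }
}
```

Whether this beats two separate traversals depends on the list size and on how expensive the per-element work is, so it is worth measuring before committing to it.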
Upvotes: 2