Reputation: 5275
I'm working with Java 8 Streams. I have a custom function, foo(), that yields an object, and I would like to stream the objects it creates in parallel. I know foo() is not thread safe.
If I write Stream.generate(() -> foo()).parallel(), will foo() be called concurrently? That is, will objects be generated serially and handed off to parallel threads, or will multiple threads each generate objects as needed by calling foo()?
Upvotes: 3
Views: 164
Reputation: 298123
While data races are not guaranteed to show up, the following code
System.out.println(
    Stream.generate(new Supplier<Integer>() {
        int i; @Override public Integer get() { return i++; }
    }).parallel()
    .limit(10_000)
    .collect(BitSet::new, BitSet::set, BitSet::or)
    .cardinality()
);
reproducibly prints a number less than 10,000 in my environment, demonstrating that updates can indeed be lost when the supplier is not thread safe.
Note that the supplier may also be queried for more elements than are needed to evaluate the result. For example,
LongAdder adder = new LongAdder();
System.out.println(
    Stream.generate(new Supplier<Integer>() {
        int i; @Override public Integer get() { adder.increment(); return i++; }
    }).parallel()
    .limit(10_000)
    .collect(BitSet::new, BitSet::set, BitSet::or)
    .cardinality()
);
System.out.println("queried "+adder+" times");
usually reports more than 10,000 queries, while the result still contains fewer than 10,000 distinct elements due to the data race.
Making the supplier thread safe changes the result to the correct count of 10,000 distinct elements, but the supplier may still get queried more than 10,000 times. Hence, the result is not guaranteed to contain exactly the numbers from 0 to 9,999: the stream created via generate is unordered, so any 10,000 distinct numbers produced by the supplier may be used.
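A minimal sketch of the thread-safe variant described above, assuming an AtomicInteger counter is an acceptable replacement for the unsynchronized int i field. The class name ThreadSafeGenerate and the method distinctCount are made up for illustration.

```java
import java.util.BitSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class ThreadSafeGenerate {

    // Same pipeline as above, but the supplier hands out values from an
    // AtomicInteger, so concurrent calls can never return the same number twice.
    static int distinctCount(int n) {
        AtomicInteger counter = new AtomicInteger();
        return Stream.generate(counter::getAndIncrement)
                .parallel()
                .limit(n)
                .collect(BitSet::new, BitSet::set, BitSet::or)
                .cardinality();
    }

    public static void main(String[] args) {
        System.out.println(distinctCount(10_000));
    }
}
```

This should print 10000 every time. The set values themselves may still include numbers above 9,999, because the supplier can be queried more often than limit requires.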
Upvotes: 3
Reputation: 28133
The supplier will be called from multiple threads, as you can observe with a quick experiment:
Stream.generate(() -> Thread.currentThread().getId())
    .parallel()
    .limit(100000)
    .distinct()
    .forEach(System.out::println);
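For a non-thread-safe foo() like the one in the question, one option is to serialize the calls. This is a minimal sketch, assuming foo() is safe as long as calls never overlap (it does not help if foo() must always run on one specific thread). Foo, synchronizedSupplier and run are hypothetical names for illustration, not part of any library. The sketch also records the calling threads, showing that calls can still come from several worker threads even though they no longer run concurrently.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Stream;

class SynchronizedGenerate {

    // Hypothetical stand-in for the question's foo(): a plain counter that is
    // not safe for concurrent use on its own.
    static class Foo {
        private int next;
        Integer foo() { return next++; }
    }

    // Hypothetical helper: every call to the delegate runs under one lock, so
    // calls never overlap and each call sees the previous call's writes.
    static <T> Supplier<T> synchronizedSupplier(Supplier<T> delegate) {
        Object lock = new Object();
        return () -> {
            synchronized (lock) {
                return delegate.get();
            }
        };
    }

    // Records the names of the threads that invoked the supplier and returns
    // the number of distinct values that reached the terminal operation.
    static long run(int n, Set<String> callingThreads) {
        Foo foo = new Foo();
        Supplier<Integer> safe = synchronizedSupplier(() -> {
            callingThreads.add(Thread.currentThread().getName());
            return foo.foo();
        });
        return Stream.generate(safe)
                .parallel()
                .limit(n)
                .distinct()
                .count();
    }

    public static void main(String[] args) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        System.out.println("distinct values: " + run(10_000, threads));
        System.out.println("called from " + threads.size() + " thread(s): " + threads);
    }
}
```

The distinct count should be 10,000; how many threads appear depends on the machine and the common pool. Since element production is now serialized, this only pays off when the downstream stages do most of the work.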
Upvotes: 5