Reputation: 2312
How do I effectively parallelize my computation of pi (just as an example)?
This works (and takes about 15 seconds on my machine):
Stream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).mapToDouble(d->4.0d/d).sum()
But all of the following parallel variants run into an OutOfMemoryError:
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).parallel().limit(999999999L).map(d->4.0d/d).sum();
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).parallel().map(d->4.0d/d).sum();
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).map(d->4.0d/d).parallel().sum();
So, what do I need to do to get parallel processing of this (large) stream? I already checked whether autoboxing is causing the memory consumption, but it is not. This also works:
DoubleStream.iterate(1, d->-(d+Math.abs(2*d)/d)).boxed().limit(999999999L).mapToDouble(d->4/d).sum()
Upvotes: 0
Views: 2746
Reputation: 298143
The problem is that you are using constructs which are hard to parallelize.
First, Stream.iterate(…) creates a sequence of numbers where each calculation depends on the previous value; hence, it offers no room for parallel computation. Even worse, it creates an infinite stream, which the implementation handles like a stream of unknown size. To split the stream, the values have to be collected into arrays before they can be handed over to other computation threads.
Second, providing a limit(…) doesn’t improve the situation; it makes it even worse. Applying a limit removes the size information that the implementation had just gathered for the array fragments. The reason is that the stream is ordered, so a thread processing an array fragment doesn’t know whether it may process all of its elements, as that depends on how many preceding elements other threads are processing. This is documented:
“… it can be quite expensive on ordered parallel pipelines, especially for large values of maxSize, since limit(n) is constrained to return not just any n elements, but the first n elements in the encounter order.”
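The missing size information can be inspected directly through the stream's spliterator. The following is a minimal sketch, not part of the original answer; the class name SizeInfoDemo and its method names are made up for illustration. It asks three sources for their exact size via Spliterator.getExactSizeIfKnown(), which returns -1 when the size is unknown. The iterate-based sources should report an unknown size, with or without limit(…), while a range knows its length up front.

```java
import java.util.stream.DoubleStream;
import java.util.stream.LongStream;

// Hypothetical demo class: queries the exact size a stream reports, or -1 if unknown.
final class SizeInfoDemo {

    // Infinite iterate(...) source: no size information available.
    static long iterateSize() {
        return DoubleStream.iterate(1d, d -> -(d + 2 * (Math.abs(d) / d)))
                .spliterator()
                .getExactSizeIfKnown();
    }

    // iterate(...) followed by limit(n): the pipeline still cannot report a size.
    static long iterateWithLimitSize(long n) {
        return DoubleStream.iterate(1d, d -> -(d + 2 * (Math.abs(d) / d)))
                .limit(n)
                .spliterator()
                .getExactSizeIfKnown();
    }

    // A range is a sized source and reports its length.
    static long rangeSize(long n) {
        return LongStream.range(0, n)
                .spliterator()
                .getExactSizeIfKnown();
    }

    public static void main(String[] args) {
        System.out.println("iterate:         " + iterateSize());
        System.out.println("iterate + limit: " + iterateWithLimitSize(1_000));
        System.out.println("range:           " + rangeSize(1_000));
    }
}
```

Obtaining the spliterator does not traverse the stream, so the infinite source is not a problem here.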
That’s a pity, as we know perfectly well that the combination of an infinite sequence returned by iterate with a limit(…) actually has an exactly known size. But the implementation doesn’t know that, and the API doesn’t provide a way to create an efficient combination of the two. But we can do it ourselves:
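import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.DoubleConsumer;
import java.util.function.DoubleUnaryOperator;
import java.util.stream.DoubleStream;
import java.util.stream.StreamSupport;

// Like DoubleStream.iterate(seed, f).limit(limit), but the source reports its exact size.
static DoubleStream iterate(double seed, DoubleUnaryOperator f, long limit) {
    return StreamSupport.doubleStream(new Spliterators.AbstractDoubleSpliterator(limit,
            Spliterator.ORDERED|Spliterator.SIZED|Spliterator.IMMUTABLE|Spliterator.NONNULL) {
        long remaining=limit;
        double value=seed;
        public boolean tryAdvance(DoubleConsumer action) {
            if(remaining==0) return false;
            double d=value;
            // Compute the successor only if another element will be requested.
            if(--remaining>0) value=f.applyAsDouble(d);
            action.accept(d);
            return true;
        }
    }, false);
}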
Once we have such an iterate-with-limit method, we can use it like this:
iterate(1d, d -> -(d+2*(Math.abs(d)/d)), 999999999L).parallel().map(d->4.0d/d).sum()
This still doesn’t benefit much from parallel execution due to the sequential nature of the source, but it works. On my four-core machine it achieved a gain of roughly 20%.
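If the sequential dependency between elements is the bottleneck, one alternative (not part of the approach above, so treat it as an assumption-laden sketch) is to derive each term from its index instead of from its predecessor. For this particular series, term k is 4/(2k+1) with alternating sign, so a LongStream.range can serve as a sized, splittable source. The class name PiByIndex and the method pi are made up for illustration.

```java
import java.util.stream.LongStream;

// Hypothetical sketch: each term depends only on its index k, not on the previous term.
final class PiByIndex {

    // Sums the first `terms` terms of 4/1 - 4/3 + 4/5 - 4/7 + ...
    static double pi(long terms) {
        return LongStream.range(0, terms)
                .parallel()
                // Even k gives a positive term, odd k a negative one; denominator is 2k + 1.
                .mapToDouble(k -> (k % 2 == 0 ? 4.0 : -4.0) / (2 * k + 1))
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(pi(999_999_999L));
    }
}
```

This produces the same sequence of terms as the iterate-based version, but whether and by how much it is faster depends on the machine and was not measured here.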
Upvotes: 3
Reputation: 127
This is because the default ForkJoinPool implementation used by the parallel() method does not limit the number of threads that get created. The solution is to provide a custom ForkJoinPool that limits the number of threads running in parallel. This can be achieved as shown below:
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
forkJoinPool.submit(() -> DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).parallel().limit(999999999L).map(d->4.0d/d).sum());
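Note that submit(…) returns a ForkJoinTask, so the sum still has to be read from it. Below is a minimal sketch of how that could look; the class name CustomPoolDemo and the method piInPool are made up for illustration, and a small element count is used on purpose. Running a parallel stream inside a custom pool relies on an implementation detail of the stream library rather than documented behavior, and this sketch does not verify whether it avoids the OutOfMemoryError for the original 999999999 elements. The parallelism of the default pool can be checked with ForkJoinPool.getCommonPoolParallelism().

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.DoubleStream;

// Hypothetical sketch: run the pipeline in a dedicated pool and wait for its result.
final class CustomPoolDemo {

    static double piInPool(int parallelism, long terms) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() ->
                    DoubleStream.iterate(1d, d -> -(d + 2 * (Math.abs(d) / d)))
                            .limit(terms)
                            .parallel()
                            .map(d -> 4.0d / d)
                            .sum())
                    .join(); // blocks until the task completes and returns the sum
        } finally {
            pool.shutdown(); // release the pool's worker threads
        }
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println(piInPool(Runtime.getRuntime().availableProcessors(), 1_000_000L));
    }
}
```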
Upvotes: -1