Cfx

Reputation: 2312

How to prevent heap space error when using large parallel Java 8 stream

How do I effectively parallelize my computation of pi (just as an example)?

This works (and takes about 15 seconds on my machine):

Stream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).mapToDouble(d->4.0d/d).sum()

But all of the following parallel variants run into an OutOfMemoryError:

DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).parallel().limit(999999999L).map(d->4.0d/d).sum();
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).parallel().map(d->4.0d/d).sum();
DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).limit(999999999L).map(d->4.0d/d).parallel().sum();

So, what do I need to do to get parallel processing of this (large) stream? I already checked whether autoboxing is causing the memory consumption; it is not. This also works:

DoubleStream.iterate(1, d->-(d+Math.abs(2*d)/d)).boxed().limit(999999999L).mapToDouble(d->4/d).sum()

Upvotes: 0

Views: 2746

Answers (2)

Holger

Reputation: 298143

The problem is that you are using constructs which are hard to parallelize.

First, Stream.iterate(…) creates a sequence of numbers where each calculation depends on the previous value; hence, it offers no room for parallel computation. Even worse, it creates an infinite stream, which the implementation handles like a stream of unknown size. To split such a stream, the values have to be collected into arrays before they can be handed over to other computation threads.

Second, providing a limit(…) doesn’t improve the situation; it makes it even worse. Applying a limit removes the size information the implementation has just gathered for the array fragments. The reason is that the stream is ordered, so a thread processing an array fragment doesn’t know whether it may process all of its elements, as that depends on how many previous elements other threads are processing. This is documented:

“… it can be quite expensive on ordered parallel pipelines, especially for large values of maxSize, since limit(n) is constrained to return not just any n elements, but the first n elements in the encounter order.”
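The effect is easy to observe (a quick check added here, not part of the original answer): even with the limit applied, the pipeline does not report the SIZED characteristic to the framework:

Spliterator.OfDouble sp = DoubleStream.iterate(1d, d -> d + 1).limit(10).spliterator();
System.out.println(sp.hasCharacteristics(Spliterator.SIZED)); // false
System.out.println(sp.getExactSizeIfKnown());                 // -1, i.e. size unknown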

That’s a pity, as we know perfectly well that the combination of an infinite sequence returned by iterate with a limit(…) actually has an exactly known size. But the implementation doesn’t, and the API provides no way to create an efficient combination of the two. We can, however, do it ourselves:

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.DoubleConsumer;
import java.util.function.DoubleUnaryOperator;
import java.util.stream.DoubleStream;
import java.util.stream.StreamSupport;

static DoubleStream iterate(double seed, DoubleUnaryOperator f, long limit) {
  // Report the exact size up front, so the framework can split efficiently.
  return StreamSupport.doubleStream(new Spliterators.AbstractDoubleSpliterator(limit,
     Spliterator.ORDERED|Spliterator.SIZED|Spliterator.IMMUTABLE|Spliterator.NONNULL) {
       long remaining = limit;
       double value = seed;
       @Override
       public boolean tryAdvance(DoubleConsumer action) {
           if(remaining == 0) return false;
           double d = value;
           // skip the function evaluation for the last element
           if(--remaining > 0) value = f.applyAsDouble(d);
           action.accept(d);
           return true;
       }
    }, false);
}
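As a quick sanity check (my addition, not in the original answer), a stream created this way now reports its exact size to the framework:

Spliterator.OfDouble sp = iterate(1d, d -> d + 1, 10).spliterator();
System.out.println(sp.hasCharacteristics(Spliterator.SIZED)); // true
System.out.println(sp.getExactSizeIfKnown());                 // 10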

Once we have such an iterate-with-limit method, we can use it like this:

iterate(1d, d -> -(d+2*(Math.abs(d)/d)), 999999999L).parallel().map(d->4.0d/d).sum()

This still doesn’t benefit much from parallel execution due to the sequential nature of the stream source, but it works. On my four-core machine it managed to get roughly a 20% gain.
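As an aside (my addition, not part of the original answer): when, as here, the n-th term can be computed directly from its index, a sized range source sidesteps the problem entirely and splits perfectly. A minimal sketch using the Leibniz series 4/1 - 4/3 + 4/5 - …:

double pi = LongStream.range(0, 999999999L)
        .parallel()
        // compute each term from its index; no dependency on previous elements
        .mapToDouble(n -> (n % 2 == 0 ? 4.0 : -4.0) / (2 * n + 1))
        .sum();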

Upvotes: 3

Sanket Sarang

Reputation: 127

This is because the default ForkJoinPool implementation used by the parallel() method does not limit the number of threads that get created. The solution is to provide a custom ForkJoinPool that is limited in the number of threads it executes in parallel. This can be achieved as shown below:

ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
double pi = forkJoinPool.submit(() ->
        DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).parallel().limit(999999999L).map(d->4.0d/d).sum()
    ).join(); // wait for the task to finish and retrieve the result
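For reference (my addition, not part of the original answer): the parallelism of the common pool that parallel streams use by default can be inspected with

System.out.println(ForkJoinPool.commonPool().getParallelism());

Note that running a parallel stream inside a custom ForkJoinPool relies on an implementation detail of the stream framework rather than a documented guarantee.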

Upvotes: -1
