Abhishekkumar

Reputation: 1102

Java8 Parallel Stream taking time to sum values

I am practising the Java 8 parallel stream API and wrote a program that sums the numbers from 1 up to the value passed as a parameter.

For example, if I pass 10, it will sum the numbers from 1 to 10 and return the result.

Below is the program:

import java.util.stream.Stream;

public class ParellelStreamExample {

    public static void main(String[] args) {
        System.out.println("Long Range value - " + Long.MIN_VALUE + " to " + Long.MAX_VALUE);

        long startTime = System.nanoTime();
        long sum = sequentailSum(100000000);
        System.out.println(
                "Time in sequential execution " + (System.nanoTime() - startTime) / 1000000 + " msec with sum = " + sum);

        long startTime1 = System.nanoTime();
        long sum1 = parellelSum(100000000);
        System.out.println("Time in parallel execution " + (System.nanoTime() - startTime1) / 1000000
                + " msec with sum = " + sum1);
    }

    private static Long parellelSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);
    }

    private static Long sequentailSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);
    }
}

The output I received is:

Long Range value - -9223372036854775808 to 9223372036854775807
Time in sequential execution 1741 msec with sum = 5000000050000000

Exception in thread "main" java.lang.OutOfMemoryError
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.SliceOps$1.opEvaluateParallelLazy(SliceOps.java:155)
    at java.util.stream.AbstractPipeline.sourceSpliterator(AbstractPipeline.java:431)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:474)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.parellelSum(ParellelStreamExample.java:21)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.main(ParellelStreamExample.java:14)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.Long.valueOf(Long.java:840)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.lambda$0(ParellelStreamExample.java:21)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample$$Lambda$3/250421012.apply(Unknown Source)
    at java.util.stream.Stream$1.next(Stream.java:1033)
    at java.util.Spliterators$IteratorSpliterator.trySplit(Spliterators.java:1784)
    at java.util.stream.AbstractShortCircuitTask.compute(AbstractShortCircuitTask.java:114)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Why does the parallel part of this program fail with a GC overhead error instead of running faster? Since it uses the fork/join framework and processes elements via multiple threads internally, I expected the parallel portion to outperform the sequential one.

What went wrong?

Upvotes: 6

Views: 1443

Answers (2)

wumpz

Reputation: 9131

I will explicitly not discuss the benchmark flaws here ( ;) ). The main problem seems to be a misunderstanding of what specific Stream operations do and how they behave.

Try something like:

LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum)

but to be fair, the sequential one should be adapted as well:

LongStream.rangeClosed(1, n).reduce(0L, Long::sum)

Now I got this runtime behaviour:

Long Range value - -9223372036854775808 to 9223372036854775807
Time in sequential execution 90 msec with sum = 5000000050000000
Time in parallel execution 25 msec with sum = 5000000050000000

I assume that is what you expected.

As with every other API, you have to understand what the specific methods are doing, especially if you want to go parallel. But as you can see, even the sequential processing benefits massively from this different approach.

Look at https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps to get an idea of the different types of stream operations.

For instance, regarding the use of limit():

Similarly, operations that are intrinsically tied to encounter order, such as limit(), may require buffering to ensure proper ordering, undermining the benefit of parallelism.
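Putting both replacements into the shape of the original program, a complete version might look like this (class and method names here are my own, not from the question):

```java
import java.util.stream.LongStream;

public class RangeClosedSum {

    // Sequential sum over a primitive LongStream: no boxing, sized source.
    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Parallel sum: rangeClosed splits cheaply and evenly, so fork/join helps.
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);
    }

    public static void main(String[] args) {
        long n = 100_000_000L;

        long t0 = System.nanoTime();
        long seq = sequentialSum(n);
        System.out.println("sequential: " + (System.nanoTime() - t0) / 1_000_000
                + " msec, sum = " + seq);

        long t1 = System.nanoTime();
        long par = parallelSum(n);
        System.out.println("parallel:   " + (System.nanoTime() - t1) / 1_000_000
                + " msec, sum = " + par);
    }
}
```

The timings will of course vary by machine and JVM warm-up, but both methods return 5000000050000000 for n = 100,000,000.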

Upvotes: 1

Misha

Reputation: 28133

There are several things that went wrong here.

  1. You are trying to benchmark code with System.nanoTime() instead of something like JMH.
  2. You are trying to parallelize a trivial computation (sum) on Long instead of using a LongStream. If the JVM is not able to get rid of the boxing, the overhead of pointer chasing can easily overwhelm the benefits of parallelism.
  3. You are trying to parallelize an inherently sequential stream produced by iterate. The stream framework will try to do what you ask by buffering the stream and dispatching it to multiple threads, which adds a lot of overhead.
  4. You are using limit on an ordered parallel stream. This requires the stream framework to do a great deal of extra synchronization to ensure that exactly the first n elements are used to produce the outcome. You will see that if you put .unordered() in the parallel stream, the execution time will decrease dramatically, but the result will be non-deterministic, as you will get the sum of some n elements rather than necessarily the first n elements.

The right way to do this is to use JMH and replace iterate(...).limit(...) with LongStream.rangeClosed(1, n).
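To see that the replacement is equivalent for small inputs, the two pipelines can be compared directly; this is a minimal sketch (class and method names are illustrative), with the boxed iterate version kept sequential since that is the only mode in which it behaves well:

```java
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class IterateVsRange {

    // Boxed Long pipeline over an unsized, inherently sequential source.
    static long iterateSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);
    }

    // Primitive long pipeline over a sized, cheaply splittable source.
    static long rangeSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        long n = 1_000;
        // Both compute 1 + 2 + ... + 1000.
        System.out.println(iterateSum(n) + " == " + rangeSum(n)); // prints 500500 == 500500
    }
}
```

Both produce the same sum, but only the rangeClosed version can be split into balanced chunks without buffering, which is why it is the one worth parallelizing.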

Upvotes: 6
