Reputation: 1102
I am practising the Java 8 parallel streams part and wrote a program that sums the numbers from 1 up to the number passed as a parameter.
For example, if I pass 10, it sums the numbers from 1 to 10 and returns the result.
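The expected result can be checked against the closed-form formula n(n+1)/2. Below is a minimal sketch; the class and method names (ExpectedSum, closedFormSum) are hypothetical and not part of the original program.

```java
// Hypothetical helper (not part of the original program): closed-form value
// of 1 + 2 + ... + n, useful for checking what the stream pipelines return.
public class ExpectedSum {
    static long closedFormSum(long n) {
        // n * (n + 1) is about 1e16 for n = 100,000,000, well within long range
        return n * (n + 1) / 2;
    }

    public static void main(String[] args) {
        System.out.println(closedFormSum(10));          // 55
        System.out.println(closedFormSum(100_000_000)); // 5000000050000000
    }
}
```

For n = 100,000,000 this gives 5000000050000000, the value the sequential run prints below.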
Below is the program:
import java.util.stream.Stream;

public class ParellelStreamExample {

    public static void main(String[] args) {
        System.out.println("Long Range value - " + Long.MIN_VALUE + " to " + Long.MAX_VALUE);

        // Time the sequential version
        long startTime = System.nanoTime();
        long sum = sequentailSum(100000000);
        System.out.println(
                "Time in sequential execution " + (System.nanoTime() - startTime) / 1000000 + " msec with sum = " + sum);

        // Time the parallel version
        long startTime1 = System.nanoTime();
        long sum1 = parellelSum(100000000);
        System.out.println("Time in parallel execution " + (System.nanoTime() - startTime1) / 1000000
                + " msec with sum = " + sum1);
    }

    // Sums 1..n using a boxed Stream<Long> built with iterate + limit, evaluated in parallel
    private static Long parellelSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);
    }

    // Sums 1..n using the same pipeline, evaluated sequentially
    private static Long sequentailSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);
    }
}
The output I received is:
Long Range value - -9223372036854775808 to 9223372036854775807
Time in sequential execution 1741 msec with sum = 5000000050000000
Exception in thread "main" java.lang.OutOfMemoryError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.SliceOps$1.opEvaluateParallelLazy(SliceOps.java:155)
at java.util.stream.AbstractPipeline.sourceSpliterator(AbstractPipeline.java:431)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:474)
at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.parellelSum(ParellelStreamExample.java:21)
at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.main(ParellelStreamExample.java:14)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Long.valueOf(Long.java:840)
at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.lambda$0(ParellelStreamExample.java:21)
at com.abhishek.javainaction.stream.parellel.ParellelStreamExample$$Lambda$3/250421012.apply(Unknown Source)
at java.util.stream.Stream$1.next(Stream.java:1033)
at java.util.Spliterators$IteratorSpliterator.trySplit(Spliterators.java:1784)
at java.util.stream.AbstractShortCircuitTask.compute(AbstractShortCircuitTask.java:114)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Why does the parallel part of this program not complete, and why does the GC overhead error occur? I expected the parallel portion to run faster, since it uses the fork/join framework and processes the elements on multiple threads internally.
What went wrong?
Upvotes: 6
Views: 1443
Reputation: 9131
I will explicitly not discuss the benchmark's flaws ;). The main problem here seems to be a misunderstanding of how specific Stream methods behave.
Try something like:
LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum)
But to be fair, the sequential version should be adapted as well:
LongStream.rangeClosed(1, n).reduce(0L, Long::sum)
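A self-contained sketch of the question's program rewritten with the two pipelines above. The class and method names (RangeSumExample, sequentialSum, parallelSum) are illustrative, not from the original, and the printed timings are only rough because System.nanoTime() is not a proper benchmark harness.

```java
import java.util.stream.LongStream;

public class RangeSumExample {
    // LongStream.rangeClosed yields primitive longs (no boxing) and its
    // spliterator knows its size, so it can be split evenly across threads.
    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).reduce(0L, Long::sum);
    }

    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);
    }

    public static void main(String[] args) {
        long n = 100_000_000L;

        long start = System.nanoTime();
        long sum = sequentialSum(n);
        System.out.println("Time in sequential execution " + (System.nanoTime() - start) / 1_000_000
                + " msec with sum = " + sum);

        start = System.nanoTime();
        long sum1 = parallelSum(n);
        System.out.println("Time in parallel execution " + (System.nanoTime() - start) / 1_000_000
                + " msec with sum = " + sum1);
    }
}
```

Both methods return 5000000050000000 for n = 100,000,000; the actual timings depend on the machine and JVM.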
With these changes I got the following run times:
Long Range value - -9223372036854775808 to 9223372036854775807
Time in sequential execution 90 msec with sum = 5000000050000000
Time in parallel execution 25 msec with sum = 5000000050000000
I assume that is what you expected.
As with any other API, you have to understand what the specific methods do, especially if you want to go parallel. But as you can see, even the sequential version benefits massively from this different approach.
Look at https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps to get an idea of the different types of stream operations.
For instance, on the use of limit():
Similarly, operations that are intrinsically tied to encounter order, such as limit(), may require buffering to ensure proper ordering, undermining the benefit of parallelism.
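To illustrate the quoted point: on an ordered parallel stream, limit(n) must still deliver exactly the first n elements in encounter order, even though worker threads process buffered chunks of the source independently. A small sketch (class and method names are hypothetical):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OrderedLimitDemo {
    // Stream.iterate is ORDERED, so even in parallel, limit(n) keeps exactly
    // the first n elements, and toList() preserves their encounter order.
    static List<Long> firstN(long n) {
        return Stream.iterate(1L, i -> i + 1)
                .parallel()
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstN(10)); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

The result is always the same, but guaranteeing it is what costs the extra buffering and coordination.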
Upvotes: 1
Reputation: 28133
There are several things that went wrong here.
1. Benchmarking with System.nanoTime() instead of something like JMH.
2. Doing a trivial computation (sum) on boxed Long values instead of using a LongStream. If the JVM is not able to get rid of the boxing, the overhead of pointer chasing can easily overwhelm the benefits of parallelism.
3. Parallelizing an inherently sequential stream produced by iterate. The stream framework will try to do what you ask by buffering the stream and dispatching it to multiple threads, which adds a lot of overhead.
4. Using limit on an ordered parallel stream. This requires the stream framework to do a great deal of extra synchronization to ensure that exactly the first n elements are used to produce the outcome. If you put .unordered() in the parallel stream, the execution time will decrease dramatically, but the result will be non-deterministic, as you will get the sum of some n elements rather than necessarily the first n elements.

The right way to do this is to use JMH and replace iterate(...).limit(...) with LongStream.rangeClosed(1, n).
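A minimal sketch of the .unordered() experiment from point 4, assuming a small n so it runs quickly (class and method names are hypothetical). The kept elements are always n distinct positive integers, so their sum is at least n(n+1)/2, but which elements survive depends on thread scheduling.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnorderedLimitDemo {
    // With unordered(), limit(n) may keep ANY n elements the workers produced,
    // not necessarily the first n, so the sum can vary between runs.
    static List<Long> someN(long n) {
        return Stream.iterate(1L, i -> i + 1)
                .parallel()
                .unordered()
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long n = 1_000_000L;
        long sum = someN(n).stream().mapToLong(Long::longValue).sum();
        // The sum of the first n elements is n(n+1)/2; the unordered sum is >= that
        System.out.println("sum = " + sum + ", sum of first n = " + n * (n + 1) / 2);
    }
}
```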
Upvotes: 6