Reputation: 67
I am trying to demonstrate the performance of RxJava compared to sequential (what I assumed to be) blocking calculations.
I was looking at this post and this SO question. From experience, benchmarking using System.currentTimeMillis() and Thread.sleep() does not yield consistent results when dealing with calculations instead of I/O so I tried setting up a simple JMH benchmark instead.
My benchmark calculates two ints and adds them up:
public class MyBenchmark {
private Worker workerSequential;
private Worker workerParallel;
private int semiIntenseCalculation(int i) {
Double d = Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(i)))))))))))))))));
return d.intValue() + i;
}
private int nonIntenseCalculation(int i) {
Double d = Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(i)))))));
return d.intValue() + i;
}
private Observable<Object> intensiveObservable() {
return Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
int randomNumforSemi = ThreadLocalRandom.current().nextInt(0, 101);
Integer i = semiIntenseCalculation(randomNumforSemi);
int randomNumforNon = ThreadLocalRandom.current().nextInt(0, 101);
Integer j = nonIntenseCalculation(randomNumforNon);
return i+j;
}
});
};
private Observable<Object> semiIntensiveObservable() {
return Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
int randomNumforSemi = ThreadLocalRandom.current().nextInt(0, 101);
return semiIntenseCalculation(randomNumforSemi);
}
});
};
private Observable<Object> nonIntensiveObservable() {
return Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
int randomNumforNon = ThreadLocalRandom.current().nextInt(0, 101);
return nonIntenseCalculation(randomNumforNon);
}
});
};
public interface Worker {
void work();
}
@Setup
public void setup(final Blackhole bh) {
workerSequential = new Worker() {
@Override
public void work() {
Observable.just(intensiveObservable())
.subscribe(new Subscriber<Object>() {
@Override
public void onError(Throwable error) {
}
@Override
public void onCompleted() {
}
@Override
public void onNext(Object arg) {
bh.consume(arg);
}
});
}
};
workerParallel = new Worker() {
@Override
public void work() {
Observable.zip(semiIntensiveObservable().subscribeOn(Schedulers.computation()),
nonIntensiveObservable().subscribeOn(Schedulers.computation()),
new Func2<Object, Object, Object>() {
@Override
public Object call(Object semiIntensive, Object nonIntensive) {
return (Integer)semiIntensive + (Integer)nonIntensive;
}
}).subscribe(bh::consume);
}
};
}
@Benchmark
public void calculateSequential() {
workerSequential.work();
}
@Benchmark
public void calculateParallel() {
workerParallel.work();
}
}
I am puzzled by the result:
# Run complete. Total time: 00:00:21
Benchmark Mode Cnt Score Error Units
MyBenchmark.calculateParallel avgt 5 15602,176 ± 1663,650 ns/op
MyBenchmark.calculateSequential avgt 5 288,128 ± 6,982 ns/op
Obviously I was expecting the parallel calculation to be quicker. Is RxJava only good for parallel I/O or why am I getting these results?
Upvotes: 0
Views: 196
Reputation: 69997
You are doing the benchmark wrong. You should be waiting for the parallel work to finish otherwise (via blockingSubscribe
) you'll start quite a lot of them which adds significant GC overhead and inflates the internal queues of the executors as well.
Here is the reference benchmark for measuring various parallel work. Note that dispatching work has overhead on its own and unless you have 500+ cycles per work item in a parallel setting, you may not see any improvements in such fork-join type parallel workload.
Upvotes: 1