Reputation: 80176
For the following flow, I am wondering how I can calculate the time it takes to process all the data in forEach(...).
Observable
    .just(1, 2, 3)
    .flatMap(it -> {})
    .toBlocking()
    .forEach(it -> { /* some parsing logic here */ });
EDIT
After reading this tutorial: Leaving the Monad, I feel the simple solution would be to do the following. Let me know if I missed something.
List items = Observable
    .just(1, 2, 3)
    .flatMap(it -> {})
    .toList()
    .toBlocking()
    .single(); // toList() alone returns an Observable<List>, so block to get the List
long startTime = System.currentTimeMillis();
for (Object it : items) {
    // some parsing here
}
long processingTime = System.currentTimeMillis() - startTime;
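The timing loop itself can be sketched in plain Java, independent of RxJava. This is only a sketch, assuming the items have already been collected into a List; `TimedLoop` and `timeNanos` are hypothetical names. It uses System.nanoTime() rather than System.currentTimeMillis(), because nanoTime is meant for measuring elapsed time and is not affected by wall-clock adjustments.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper, not part of RxJava: times a loop over already-collected items.
final class TimedLoop {

    // Runs `action` on every item and returns the elapsed time in nanoseconds.
    static <T> long timeNanos(List<T> items, Consumer<? super T> action) {
        long start = System.nanoTime();
        for (T item : items) {
            action.accept(item);
        }
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("1", "2", "3");
        // Integer::parseInt stands in for the parsing logic.
        long elapsed = timeNanos(items, Integer::parseInt);
        System.out.println("Processing took " + TimeUnit.NANOSECONDS.toMicros(elapsed) + " us");
    }
}
```

Only the loop is timed here; the time spent producing the items upstream is deliberately excluded.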
Upvotes: 2
Views: 3400
Reputation: 42223
I think this is what you want. Starting from your code, I split the production of values, Observable.range (which should match the Observable.just in your sample), from the pipeline to measure; in this case I added some fake computation.
The idea is to wrap the pipeline you want to measure in an outer flatMap and attach a stopwatch to it there.
Observable.range(1, 10_000)
    .nest()
    .flatMap(
        o -> {
            // Create the Random once; a new Random(73) per item would
            // always return the same first value.
            Random random = new Random(73);
            Observable<Integer> pipelineToMeasure = o.flatMap(i -> {
                try {
                    // Fake computation: sleep for 0 to 4 ms
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(5));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return Observable.just(i);
            });
            Stopwatch measure = Stopwatch.createUnstarted();
            return pipelineToMeasure
                .doOnSubscribe(measure::start)
                .doOnTerminate(() -> {
                    measure.stop();
                    System.out.println(measure);
                });
        }
    )
    .toBlocking()
    .forEach(System.out::println);
To be clear, I used nest so that I did not have to recreate the Observable myself inside the outer flatMap.
I'm also using the Stopwatch class from the Guava library.
For more detail, here is one possible way to measure inside the forEach statement when blocking.
MeasurableAction1<Integer> measuring = MeasurableAction1.measure(System.out::println);
Observable
    .just(1, 2, 3)
    .flatMap(Observable::just)
    .toBlocking()
    .forEach(measuring.start());
measuring.stop().elapsed(TimeUnit.SECONDS);
And the measuring class:
private static class MeasurableAction1<T> implements Action1<T> {
    private Stopwatch measure = Stopwatch.createUnstarted();
    private Action1<? super T> action;

    public MeasurableAction1(Action1<? super T> action) {
        this.action = action;
    }

    @Override
    public void call(T t) {
        action.call(t);
    }

    public MeasurableAction1<T> start() {
        measure.start();
        return this;
    }

    public MeasurableAction1<T> stop() {
        measure.stop();
        return this;
    }

    public long elapsed(TimeUnit desiredUnit) {
        return measure.elapsed(desiredUnit);
    }

    public static <T> MeasurableAction1<T> measure(Action1<? super T> action) {
        return new MeasurableAction1<>(action);
    }
}
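Note that the stopwatch above runs from start() to stop(), so it covers the whole blocking pipeline and not only the body of the action. If only the time spent inside the action is of interest, one option is to accumulate per-call durations. A minimal sketch in plain Java, assuming items are delivered sequentially on one thread: `TimingConsumer` is a hypothetical name, and java.util.function.Consumer stands in for RxJava's Action1 (a method reference such as `timing::accept` could be passed where an Action1 is expected).

```java
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical wrapper: sums the time spent inside the delegate across all calls.
// Not thread-safe; assumes sequential delivery on a single thread.
final class TimingConsumer<T> implements Consumer<T> {
    private final Consumer<? super T> delegate;
    private long totalNanos;
    private int calls;

    TimingConsumer(Consumer<? super T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(T item) {
        long start = System.nanoTime();
        try {
            delegate.accept(item);
        } finally {
            // Count the call even if the delegate throws.
            totalNanos += System.nanoTime() - start;
            calls++;
        }
    }

    // Total time spent in the delegate, converted to the requested unit.
    long elapsed(TimeUnit unit) {
        return unit.convert(totalNanos, TimeUnit.NANOSECONDS);
    }

    int calls() {
        return calls;
    }

    public static void main(String[] args) {
        TimingConsumer<Integer> timing = new TimingConsumer<>(i -> Math.sqrt(i));
        Arrays.asList(1, 2, 3).forEach(timing);
        System.out.println(timing.calls() + " calls took "
                + timing.elapsed(TimeUnit.MICROSECONDS) + " us");
    }
}
```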
Better still, do it without blocking by using a subscriber. Note that .subscribe offers more options than the .forEach alias (whether blocking or not):
Observable
    .just(1, 2, 3)
    .flatMap(Observable::just)
    .subscribe(MeasuringSubscriber.measuringSubscriber(
        System.out::println,
        System.out::println,
        System.out::println
    ));
And the subscriber:
private static class MeasuringSubscriber<T> extends Subscriber<T> {
    private Stopwatch measure = Stopwatch.createUnstarted();
    private Action1<? super T> onNext;
    private final Action1<Throwable> onError;
    private final Action0 onComplete;

    public MeasuringSubscriber(Action1<? super T> onNext, Action1<Throwable> onError, Action0 onComplete) {
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
    }

    @Override
    public void onCompleted() {
        try {
            onComplete.call();
        } finally {
            stopAndPrintMeasure();
        }
    }

    @Override
    public void onError(Throwable e) {
        try {
            onError.call(e);
        } finally {
            stopAndPrintMeasure();
        }
    }

    @Override
    public void onNext(T item) {
        onNext.call(item);
    }

    @Override
    public void onStart() {
        measure.start();
        super.onStart();
    }

    private void stopAndPrintMeasure() {
        measure.stop();
        System.out.println("took " + measure);
    }

    private static <T> MeasuringSubscriber<T> measuringSubscriber(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
        return new MeasuringSubscriber<>(onNext, onError, onComplete);
    }
}
Upvotes: 1
Reputation: 1128
One option is to create an Observable which will output the timings. You can do this by wrapping your computation with Observable#using:
public class TimerExample {
    public static void main(String[] args) {
        final PublishSubject<Long> timings = PublishSubject.create();

        final Observable<List<Integer>> list = Observable
            .just(1, 2, 3)
            .flatMap(TimerExample::longRunningComputation)
            .toList();

        final Observable<List<Integer>> timed
            = Observable.using(() -> new Timer(timings), (t) -> list, Timer::time);

        timings.subscribe(time -> System.out.println("Time: " + time + "ms"));

        List<Integer> ints = timed.toBlocking().last();
        System.out.println("ints: " + Joiner.on(", ").join(ints));

        ints = timed.toBlocking().last();
        System.out.println("ints: " + Joiner.on(", ").join(ints));
    }

    private static Observable<Integer> longRunningComputation(Integer i) {
        return Observable.timer(1, TimeUnit.SECONDS).map(ignored -> i);
    }

    public static class Timer {
        private final long startTime;
        private final Observer<Long> timings;

        public Timer(Observer<Long> timings) {
            this.startTime = System.currentTimeMillis();
            this.timings = timings;
        }

        public void time() {
            timings.onNext(System.currentTimeMillis() - startTime);
        }
    }
}
In this case the timings are printed to the console, but you can do with them as you please:
Time: 1089ms
ints: 2, 1, 3
Time: 1003ms
ints: 1, 3, 2
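The resource-scoped timing that Observable#using gives you has a rough plain-Java analogue in try-with-resources. The following is only a sketch and not RxJava code: `ScopeTimer` is a hypothetical class that reports the elapsed milliseconds to a callback when the block exits, in the same spirit as Timer::time being invoked as the dispose action above.

```java
import java.util.function.LongConsumer;

// Hypothetical scope timer: starts on construction, reports elapsed ms on close().
final class ScopeTimer implements AutoCloseable {
    private final long startNanos = System.nanoTime();
    private final LongConsumer onElapsedMillis;

    ScopeTimer(LongConsumer onElapsedMillis) {
        this.onElapsedMillis = onElapsedMillis;
    }

    @Override
    public void close() {
        // Called automatically when the try block exits, normally or with an exception.
        onElapsedMillis.accept((System.nanoTime() - startNanos) / 1_000_000);
    }

    public static void main(String[] args) throws InterruptedException {
        try (ScopeTimer ignored = new ScopeTimer(ms -> System.out.println("Time: " + ms + "ms"))) {
            Thread.sleep(50); // stand-in for the work being measured
        }
    }
}
```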
Upvotes: 5