Aravind Yarram

Reputation: 80176

How to calculate the processing time in rx

For the following flow, I am wondering how I can calculate the time it takes to process all the data in forEach(...).

Observable
  .just(1, 2, 3)
  .flatMap(it -> {})
  .toBlocking()
  .forEach(it -> { /* some parsing logic here */ });

EDIT

After reading this tutorial: Leaving the Monad, I feel the simple solution would be to do the following. Let me know if I missed something.

List items = Observable
      .just(1, 2, 3)
      .flatMap(it -> {})
      .toList()
      .toBlocking()
      .single();

long startTime = System.currentTimeMillis();

for(Object it : items)
{
  //some parsing here
}

long processingTime = System.currentTimeMillis() - startTime;

Upvotes: 2

Views: 3400

Answers (2)

bric3

Reputation: 42223

I think this is what you want. Starting from your code, I split the production of values (Observable.range, which stands in for the Observable.just in your sample) from the pipeline to measure; in this case the pipeline does some fake computation.

The idea is to wrap the pipeline you want to measure in a single flatMap and attach a stopwatch to it there.

Observable.range(1, 10_000)
        .nest()
        .flatMap(
                o -> {
                    Observable<Integer> pipelineToMeasure = o.flatMap(i -> {
                        Random random = new Random(73);
                        try {
                            TimeUnit.MILLISECONDS.sleep(random.nextInt(5));
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        return Observable.just(i);
                    });

                    Stopwatch measure = Stopwatch.createUnstarted();
                    return pipelineToMeasure
                            .doOnSubscribe(measure::start)
                            .doOnTerminate(() -> {
                                measure.stop();
                                System.out.println(measure);
                            });
                }
        )
        .toBlocking()
        .forEach(System.out::println);

To avoid confusion: I used nest() so that I would not have to recreate the Observable myself in the outer flatMap. The Stopwatch comes from the Guava library.


To give more information, here is one possible way to measure the forEach statement when blocking.

MeasurableAction1<Integer> measuring = MeasurableAction1.measure(System.out::println);
Observable
        .just(1, 2, 3)
        .flatMap(Observable::just)
        .toBlocking()
        .forEach(measuring.start());

long elapsedSeconds = measuring.stop().elapsed(TimeUnit.SECONDS);

And the measuring class:

private static class MeasurableAction1<T> implements Action1<T> {
    private Stopwatch measure = Stopwatch.createUnstarted();
    private Action1<? super T> action;

    public MeasurableAction1(Action1<? super T> action) {
        this.action = action;
    }

    @Override
    public void call(T t) {
        action.call(t);
    }

    public MeasurableAction1<T> start() {
        measure.start();
        return this;
    }

    public MeasurableAction1<T> stop() {
        measure.stop();
        return this;
    }

    public long elapsed(TimeUnit desiredUnit) {
        return measure.elapsed(desiredUnit);
    }

    public static <T> MeasurableAction1<T> measure(Action1<? super T> action) {
        return new MeasurableAction1<>(action);
    }
}
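
Note that the stopwatch above runs from start() to stop(), so it also covers the time the source spends producing items. If only the time spent inside the callback matters, a variant can accumulate the duration of each call instead. What follows is a minimal plain-Java sketch of that idea, not part of RxJava or Guava: TimedConsumer and its methods are hypothetical names, it uses java.util.function.Consumer and System.nanoTime() in place of Action1 and Stopwatch, and it assumes items are delivered on a single thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: adds up only the time spent inside the wrapped callback.
final class TimedConsumer<T> implements Consumer<T> {
    private final Consumer<? super T> delegate;
    private long totalNanos; // not thread-safe: assumes single-threaded delivery
    private int calls;

    TimedConsumer(Consumer<? super T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(T item) {
        long start = System.nanoTime();
        try {
            delegate.accept(item);
        } finally {
            // Count the call even if the delegate throws.
            totalNanos += System.nanoTime() - start;
            calls++;
        }
    }

    long totalNanos() {
        return totalNanos;
    }

    int calls() {
        return calls;
    }
}

class TimedConsumerDemo {
    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3);
        TimedConsumer<Integer> timed = new TimedConsumer<>(i -> { /* parsing goes here */ });
        items.forEach(timed);
        System.out.println(timed.calls() + " calls took " + timed.totalNanos() + " ns");
    }
}
```

With RxJava, a method reference such as timed::accept should be usable wherever an Action1 is expected, for example in the blocking forEach, since both take one argument and return nothing.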

Better still, do it without blocking by using a subscriber. Note that .subscribe offers more options than the .forEach alias (whether blocking or not):

    Observable
            .just(1, 2, 3)
            .flatMap(Observable::just)
            .subscribe(MeasuringSubscriber.measuringSubscriber(
                    System.out::println,
                    System.out::println,
                    System.out::println
            ));

And the subscriber:

private static class MeasuringSubscriber<T> extends Subscriber<T> {
    private Stopwatch measure = Stopwatch.createUnstarted();
    private Action1<? super T> onNext;
    private final Action1<Throwable> onError;
    private final Action0 onComplete;

    public MeasuringSubscriber(Action1<? super T> onNext, Action1<Throwable> onError, Action0 onComplete) {
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
    }

    @Override
    public void onCompleted() {
        try {
            onComplete.call();
        } finally {
            stopAndPrintMeasure();
        }
    }

    @Override
    public void onError(Throwable e) {
        try {
            onError.call(e);
        } finally {
            stopAndPrintMeasure();
        }
    }

    @Override
    public void onNext(T item) {
        onNext.call(item);
    }

    @Override
    public void onStart() {
        measure.start();
        super.onStart();
    }

    private void stopAndPrintMeasure() {
        measure.stop();
        System.out.println("took " + measure);
    }

    private static <T> MeasuringSubscriber<T> measuringSubscriber(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
        return new MeasuringSubscriber<>(onNext, onError, onComplete);
    }
}

Upvotes: 1

alexwen

Reputation: 1128

One option is to create an Observable which will output the timings. You can do this by wrapping your computation with Observable#using:

public class TimerExample {
    public static void main(String[] args) {
        final PublishSubject<Long> timings = PublishSubject.create();

        final Observable<List<Integer>> list = Observable
                .just(1, 2, 3)
                .flatMap(TimerExample::longRunningComputation)
                .toList();

        final Observable<List<Integer>> timed
                = Observable.using(() -> new Timer(timings), (t) -> list, Timer::time);

        timings.subscribe(time -> System.out.println("Time: " + time + "ms"));

        List<Integer> ints = timed.toBlocking().last();
        System.out.println("ints: " + Joiner.on(", ").join(ints));

        ints = timed.toBlocking().last();
        System.out.println("ints: " + Joiner.on(", ").join(ints));
    }

    private static Observable<Integer> longRunningComputation(Integer i) {
        return Observable.timer(1, TimeUnit.SECONDS).map(ignored -> i);
    }

    public static class Timer {
        private final long startTime;
        private final Observer<Long> timings;

        public Timer(Observer<Long> timings) {
            this.startTime = System.currentTimeMillis();
            this.timings = timings;
        }

        public void time() {
            timings.onNext(System.currentTimeMillis() - startTime);
        }
    }
}

In this case the timings are printed to the console, but you can do with them as you please:

Time: 1089ms
ints: 2, 1, 3
Time: 1003ms
ints: 1, 3, 2
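
The shape of using (create a resource, run the work, dispose of the resource) has a rough plain-Java analogue in try-with-resources. The sketch below is only an illustration of that shape and does not involve RxJava: ScopedTimer and timedWork are hypothetical names, and it uses System.nanoTime(), which is generally the safer choice for elapsed time because it is not affected by wall-clock adjustments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical helper: starts timing when created, reports elapsed millis on close().
final class ScopedTimer implements AutoCloseable {
    private final long startNanos = System.nanoTime();
    private final LongConsumer sink;

    ScopedTimer(LongConsumer sink) {
        this.sink = sink;
    }

    @Override
    public void close() {
        long elapsedNanos = System.nanoTime() - startNanos;
        sink.accept(TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
    }
}

class ScopedTimerDemo {
    // Runs some stand-in work inside a timer scope; the sink receives one timing per call.
    static List<Integer> timedWork(LongConsumer sink) {
        try (ScopedTimer ignored = new ScopedTimer(sink)) {
            List<Integer> out = new ArrayList<>();
            for (int i = 1; i <= 3; i++) {
                out.add(i * 2);
            }
            return out;
        }
    }

    public static void main(String[] args) {
        List<Long> timings = new ArrayList<>();
        List<Integer> result = timedWork(ms -> timings.add(ms));
        System.out.println("result: " + result + ", timings (ms): " + timings);
    }
}
```

As with the Timer in the answer, each call creates a fresh timer, so repeated runs produce one timing each.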

Upvotes: 5
