Oliver Hausler
Oliver Hausler

Reputation: 4977

How to prevent an Observable from queueing Timer iterations

I am using the following Observable to perform regular tasks. The Observable is launched when the class is first loaded into memory, and then executes my code in regular intervals. As it is a single Observable, it is guaranteed [an assumption, as it looks from my tests] that the code is never launched a second time and processed in parallel in case it runs longer than the interval.

private static Subscription subscription = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
        .flatMap(new Func1<Long, Observable<String>>() {
            @Override public Observable<String> call(Long aLong) {

                // some code

                return Observable.just(null);
            }
        }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread())
        .subscribe();

But this also has the disadvantage that rxjava accumulates delayed emissions and launches them in fast sequence once the delayed iteration is finished. Example: If the timer is programmed to iterate every 1000 msec and iteration n takes 5000 msec, iterations n+1, n+2, n+3, etc. are launched sequentially, but one after the other, and without obeying the Timer interval.

Not that bad, but what really is an issue is what happens when Android sleeps for a few hours. Because then rxjava launches all missed iterations in fast sequence once the device wakes up, which gives a quite heavy performance hit.

How can I tell rxjava to forget about missed iterations? In case an iteration takes longer, I'd either like the timer to start when that iteration is finished, or I want to drop missed iterations and start the next iteration when due. I tried to use sample() and other filters, but it somehow doesn't give me the desired effect or I don't know how to correctly apply them.

Note that I don't want to create a new Observable for each iteration (I could use zip for that) because I want to make sure the code is not executed from several threads.

Upvotes: 0

Views: 2790

Answers (1)

dwursteisen
dwursteisen

Reputation: 11515

When your Android device is in sleep mode, don't ask to Rx to skip some events. But stop Rx !

When you subscribe to a stream, you got an handler to unsubscribe to this stream.

Subscription subscription = Observable.timer(1, SECONDS).subscribe();

In the OnPause() method of your activity, you can stop the stream by calling the unsubscribe() method on your subscription.

@Override
public void onPause() {
    subscription.unsubscribe();
}

In the onResume() method of your activity, you can subscribe again on your stream

@Override
public void onResume() {
   subscription = Observable.timer(1, SECONDS).subscribe();
}

Upvotes: 2

Related Questions