Reputation: 303
I am new to RxJava. I want to run a polling task every 2 seconds, up to 50 times, and it should also be able to terminate early if some condition is met inside the task. I am trying to use Observable.interval,
but I found no way to terminate it other than throwing an exception. Is there another operator that meets my goal?
BTW, this functionality is exposed as an API that returns the Observable, so I do not control the subscriber and cannot terminate by unsubscribing.
Observable.interval(timeout, interval, TimeUnit.SECONDS)
    .flatMap(tick -> task)
Upvotes: 14
Views: 13009
Reputation: 737
The code below produces the following output (interval starts counting at 0):
Starting interval..
Next tick is: 0
Next tick is: 1
Next tick is: 2
Next tick is: 3
Next tick is: 4
Interval complete
If you uncomment the line "// if( t==3 ) dispose();", it prints only the following:
Starting interval..
Next tick is: 0
Next tick is: 1
Next tick is: 2
Next tick is: 3
The code:
Observable.interval(1, TimeUnit.SECONDS).take(5).subscribeWith(new DisposableObserver<Long>() {
    @Override public void onStart() {
        // do whatever might be needed here to run once before
        // the interval begins
        System.out.println("Starting interval..");
    }
    @Override public void onNext(Long t) {
        System.out.println("Next tick is: "+t);
        // if( t==3 ) dispose();
    }
    @Override public void onError(Throwable t) {
        t.printStackTrace();
    }
    @Override public void onComplete() {
        // will be called once when all 5 ticks are completed
        // will NOT be called if dispose() has been called
        System.out.println("Interval complete");
    }
});
Upvotes: 1
Reputation: 6132
You can stop an Observable.interval using takeUntil as follows:
Observable.interval(0, 1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .takeUntil(new Predicate<Long>() {
        @Override
        public boolean test(Long aLong) throws Exception {
            return aLong == 10;
        }
    })
    .subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.d(TAG, "Tick: " + aLong);
        }
    });
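In this example, the observable emits ticks 0 through 10 and then completes, about 10 seconds after subscription. Note that takeUntil emits the matching tick (10) before it stops.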
Upvotes: 5
Reputation: 1960
I guess Observable.takeUntil(stopPredicate) or Observable.takeWhile(predicate) can help you:
Observable.interval(timeout, interval, TimeUnit.SECONDS)
    .takeWhile(val -> val < 42)
Here the observable emits 42 values (0 through 41) and completes as soon as the tick with value 42 arrives, without emitting it.
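The two operators differ by one item, which is easy to get wrong when counting attempts. The sketch below is plain JDK code, not RxJava: the class TakeSemantics and its two helper methods are hypothetical stand-ins that mimic how takeWhile(predicate) and takeUntil(stopPredicate) treat the tick sequence 0, 1, 2, ..., with maxTicks playing the role of a take(50) cap. It is only meant to illustrate the ordering of "check" and "emit", under the assumption that ticks start at 0 as they do with interval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical plain-JDK model; these helpers are NOT RxJava APIs.
class TakeSemantics {

    // Models takeWhile: the predicate is checked BEFORE emitting,
    // so the first tick that fails the predicate is dropped.
    static List<Long> takeWhile(long maxTicks, LongPredicate keepGoing) {
        List<Long> emitted = new ArrayList<>();
        for (long tick = 0; tick < maxTicks; tick++) {
            if (!keepGoing.test(tick)) {
                break; // complete without emitting this tick
            }
            emitted.add(tick);
        }
        return emitted;
    }

    // Models takeUntil(stopPredicate): the tick is emitted first,
    // then the predicate is checked, so the matching tick is included.
    static List<Long> takeUntil(long maxTicks, LongPredicate stop) {
        List<Long> emitted = new ArrayList<>();
        for (long tick = 0; tick < maxTicks; tick++) {
            emitted.add(tick);
            if (stop.test(tick)) {
                break; // complete after emitting the matching tick
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Cap of 50 ticks, as in the question.
        System.out.println(takeWhile(50, v -> v < 3));  // [0, 1, 2]
        System.out.println(takeUntil(50, v -> v == 3)); // [0, 1, 2, 3]
        // If the condition never fires, the cap ends the sequence.
        System.out.println(takeUntil(50, v -> false).size()); // 50
    }
}
```

So if the last poll result (the one that satisfied the stop condition) should still reach the subscriber, takeUntil is likely the better fit; if it should be suppressed, takeWhile is. Either one can be combined with take(50) to bound the number of attempts.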
Upvotes: 28