Reputation: 1939
I have an Observable.interval
that emits an item every 10 seconds. Inside my subscriber I have a synchronous operation that some times last more than 10 seconds. The result is that my observable emits every 10 seconds so when the synchronous operation ends the subscriber gets called many times in a row.
Here is what I have:
Observable.interval(0, 10, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
aSynchronousOperation(); // Last more than 10 seconds
}
});
What I want to do is maybe "flush" the observable after completing the synchronous operation.
How do I do that?
Upvotes: 2
Views: 450
Reputation: 16142
The canonical operator for this case is onBackPressureDrop
:
Observable
.interval(0, 10, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.onBackPressureDrop()
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
aSynchronousOperation(); // Last more than 10 seconds
}
});
Upvotes: 1
Reputation: 10267
The problem is that observeOn
honors backpressure and will buffer emissions from interval, and will emit when subscriber finish its work.
Actually you can theoretically get MissingBackpressureException
as the default buffer can get overflowed (theoretically because your interval of 10 sec is quite long).
What you want here is to drop items whenever you're still processing, this can be achieved using simple flag, and filter
, the will prevent any emissions while your work is in progress.
Note that in the case your'e mention, when the work will take more than 10 sec, you'll not get the notification for the interval
after the work is done, but just after another 10 sec. you can easily change it by implementing different strategy that let the filter pass single value after work is in progress flag raised, and just then block additional emissions.
AtomicBoolean workInProgress = new AtomicBoolean(false);
Observable.interval(10, TimeUnit.SECONDS)
.filter(aLong -> !workInProgress.get())
.observeOn(Schedulers.io())
.subscribe(aLong -> {
workInProgress.set(true);
aSynchronousOperation();
workInProgress.set(false);
});
Upvotes: 1