TheCrafter
TheCrafter

Reputation: 1939

Stop java rx interval observable from queuing items

I have an Observable.interval that emits an item every 10 seconds. Inside my subscriber I have a synchronous operation that some times last more than 10 seconds. The result is that my observable emits every 10 seconds so when the synchronous operation ends the subscriber gets called many times in a row.

Here is what I have:

Observable.interval(0, 10, TimeUnit.SECONDS)
    .subscribeOn(Schedulers.io())
    .subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            aSynchronousOperation(); // Last more than 10 seconds
         }
     });

What I want to do is maybe "flush" the observable after completing the synchronous operation.

How do I do that?

Upvotes: 2

Views: 450

Answers (2)

Tassos Bassoukos
Tassos Bassoukos

Reputation: 16142

The canonical operator for this case is onBackPressureDrop:

Observable
.interval(0, 10, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.onBackPressureDrop()
.subscribe(new Action1<Long>() {
    @Override
    public void call(Long aLong) {
        aSynchronousOperation(); // Last more than 10 seconds
     }
 });

Upvotes: 1

yosriz
yosriz

Reputation: 10267

The problem is that observeOn honors backpressure and will buffer emissions from interval, and will emit when subscriber finish its work.
Actually you can theoretically get MissingBackpressureException as the default buffer can get overflowed (theoretically because your interval of 10 sec is quite long).
What you want here is to drop items whenever you're still processing, this can be achieved using simple flag, and filter, the will prevent any emissions while your work is in progress.

Note that in the case your'e mention, when the work will take more than 10 sec, you'll not get the notification for the interval after the work is done, but just after another 10 sec. you can easily change it by implementing different strategy that let the filter pass single value after work is in progress flag raised, and just then block additional emissions.

 AtomicBoolean workInProgress = new AtomicBoolean(false);
    Observable.interval(10, TimeUnit.SECONDS)
            .filter(aLong -> !workInProgress.get())
            .observeOn(Schedulers.io())
            .subscribe(aLong -> {
                workInProgress.set(true);
                aSynchronousOperation();
                workInProgress.set(false);
            });

Upvotes: 1

Related Questions