Ido Kahana
Ido Kahana

Reputation: 321

RxJava Asynchronous debouncing

Using RxJava operations, is it possible to "ignore" elements that have been processed in the downstream if the upstream emits new items?

for example

Observable.create(...)
  .flatMap(...) // 30 seconds to process (Asynchronous)
  .flatMap(...) // 30 seconds to process (Asynchronous)
  etc...

what I need to achieve is if the upstream notify about the new item it will cancel any operation on the stream below that is currently running even if the down stream doing something in other thread asynchronous.

Upvotes: 0

Views: 39

Answers (1)

Bob Dalgleish
Bob Dalgleish

Reputation: 8227

Instead of flatMap() operator, you can use switchMap(). When a new value is received in switchMap(), the old observable is unsubscribed and a replacement observable is subscribed.

Observable.create( ... )
  .switchMap( value -> getObservable1( value )
                         .switchMap( value2 -> getObservable2( value2 ) )
  ...

If you want observers further downstream to also be cancelled, you will have to propagate the switchMap(). In the code above, an emission in the first stage of the observer chain will unsubscribe from getObservable1( value ) and from getObservable2( value2 ).

Upvotes: 1

Related Questions