Reputation: 321
Using RxJava operators, is it possible to "ignore" elements that are still being processed downstream when the upstream emits a new item?
For example:
Observable.create(...)
.flatMap(...) // 30 seconds to process (Asynchronous)
.flatMap(...) // 30 seconds to process (Asynchronous)
etc...
What I need is this: when the upstream emits a new item, any operation currently running further down the stream should be cancelled, even if that downstream work is running asynchronously on another thread.
Upvotes: 0
Views: 39
Reputation: 8227
Instead of the flatMap() operator, you can use switchMap(). When a new value is received in switchMap(), the old observable is unsubscribed and a replacement observable is subscribed.
Observable.create( ... )
.switchMap( value -> getObservable1( value ) )
.switchMap( value2 -> getObservable2( value2 ) )
...
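If you want observers further downstream to also be cancelled, you will have to propagate the switchMap() to those stages as well. With the two switchMap() stages chained one after the other as above, a new emission in the first stage of the observer chain unsubscribes from getObservable1( value ) right away, and from getObservable2( value2 ) as soon as the replacement observable emits its first value. If both must be cancelled at the same moment, nest the second switchMap() inside the lambda of the first.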
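To make the cancel-on-new-item behaviour concrete, here is a rough plain-JDK sketch of the idea behind switchMap(). It is not RxJava's implementation: SwitchLatest, onNext() and demo() are made-up names for illustration, and a Future stands in for the inner subscription.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: only the task of the most recent item stays alive. */
final class SwitchLatest {
    private final ExecutorService pool = Executors.newCachedThreadPool();
    private Future<?> current; // task started for the latest item, if any

    /** Cancels the task of the previous item, then starts one for the new item. */
    synchronized void onNext(Runnable task) {
        if (current != null) {
            current.cancel(true); // interrupts the worker, comparable to an unsubscribe
        }
        current = pool.submit(task);
    }

    /** Stops accepting work and waits briefly for running tasks to finish. */
    void shutdown() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    /** Feeds two items back to back and returns the results that completed. */
    static List<String> demo() throws InterruptedException {
        SwitchLatest switcher = new SwitchLatest();
        List<String> results = new CopyOnWriteArrayList<>();
        CountDownLatch firstStarted = new CountDownLatch(1);

        // Item 1: slow work, standing in for the 30 second stage.
        switcher.onNext(() -> {
            firstStarted.countDown();
            try {
                Thread.sleep(30_000);
                results.add("result-1");
            } catch (InterruptedException e) {
                // Cancelled by a newer item: report nothing, just restore the flag.
                Thread.currentThread().interrupt();
            }
        });

        firstStarted.await(); // make sure item 1 is in flight before item 2 arrives

        // Item 2: arrives while item 1 is still running and replaces it.
        switcher.onNext(() -> results.add("result-2"));

        switcher.shutdown();
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints [result-2]
    }
}
```

The first task is interrupted mid-sleep and never reports a result, so only the second item gets through. In RxJava the counterpart of the interrupt is the disposal of the inner observable, so the work inside getObservable1() and getObservable2() still has to react to being unsubscribed for the cancellation to take effect.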
Upvotes: 1