Reputation: 1496
I have two infinite observables (getLastNDaysItemsInfinite and listenToServerUpdates) that are chained into one. The first one emits lists of items, and the second one should listen for server updates to those items.
repository
    .getLastNDaysItemsInfinite(4)
    .flatMap(items ->
        Observable
            .fromIterable(items)
            .map(Item::getId)
            .toList()
            .flatMapObservable(ids ->
                repository
                    .listenToServerUpdates(ids)
                    .onErrorResumeNext(throwable -> {
                        Log.w(TAG, "Error occurred: ", throwable);
                        return Observable.empty();
                    }))
    );
The listenToServerUpdates observable opens a socket connection and closes it when we unsubscribe from it. So I need the following behavior: when getLastNDaysItemsInfinite(4) emits a new list of items, listenToServerUpdates should close its socket connection and open a new one. Right now it just creates a new listenToServerUpdates observable, and as a consequence a new socket connection, for every new batch of items from getLastNDaysItemsInfinite, so the connections run in parallel.
How can I resubscribe to the running listenToServerUpdates observable, or cancel it and subscribe again, when getLastNDaysItemsInfinite emits new items?
Thanks in advance!
Upvotes: 1
Views: 1027
Reputation: 515
You could use switchMap instead of flatMap, like this:
repository
    .getLastNDaysItemsInfinite(4)
    .switchMap(items ->
        Observable
            .fromIterable(items)....
So each time a new list comes from getLastNDaysItemsInfinite, the previously created inner Observable gets unsubscribed, which is the point where listenToServerUpdates closes its socket. Here are the docs
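To make the difference from flatMap concrete, here is a minimal plain-Java model of the switching behavior. It does not use RxJava at all: FakeConnection, SwitchingListener and SwitchDemo are hypothetical names made up for this illustration, with FakeConnection standing in for the socket that listenToServerUpdates opens. It only sketches the idea that at most one inner subscription is alive at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the socket opened by listenToServerUpdates(ids). */
class FakeConnection implements AutoCloseable {
    final List<Long> ids;
    boolean open = true;

    FakeConnection(List<Long> ids) {
        this.ids = ids;
    }

    @Override
    public void close() {
        open = false;
    }
}

/** Models the switchMap idea: only the most recent inner "subscription" stays active. */
class SwitchingListener {
    private FakeConnection current;                          // the single active connection
    final List<FakeConnection> history = new ArrayList<>();  // every connection ever opened

    /** Called once per list emitted upstream (getLastNDaysItemsInfinite in the question). */
    void onNewIds(List<Long> ids) {
        if (current != null) {
            current.close();                                 // drop the previous connection first
        }
        current = new FakeConnection(ids);
        history.add(current);
    }

    /** Number of connections that have not been closed yet. */
    long openCount() {
        return history.stream().filter(c -> c.open).count();
    }
}

class SwitchDemo {
    public static void main(String[] args) {
        SwitchingListener listener = new SwitchingListener();
        listener.onNewIds(Arrays.asList(1L, 2L));
        listener.onNewIds(Arrays.asList(2L, 3L));
        listener.onNewIds(Arrays.asList(4L));
        System.out.println("connections opened: " + listener.history.size());  // 3
        System.out.println("connections still open: " + listener.openCount()); // 1
    }
}
```

A flatMap-style model would simply skip the close() call in onNewIds, leaving all three connections open in parallel, which is the behavior described in the question.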
Upvotes: 2