Olexii Muraviov

Reputation: 1496

RxJava 2: Resubscribe (or Cancel and subscribe again) to second infinite observable in chain

I have two infinite observables, getLastNDaysItemsInfinite and listenToServerUpdates, that are chained into one. The first emits lists of items, and the second should listen for server updates to those items.

            repository
            .getLastNDaysItemsInfinite(4)
            .flatMap(items ->
                    Observable
                            .fromIterable(items)
                            .map(Item::getId)
                            .toList()
                            .flatMapObservable(ids ->
                                    repository
                                            .listenToServerUpdates(ids)
                                            .onErrorResumeNext(throwable -> {
                                                Log.w(TAG, "Error occurred: ", throwable);
                                                return Observable.empty();
                                            }))
            );

The listenToServerUpdates observable opens a socket connection and closes it when we unsubscribe from it. I need the following behavior: when getLastNDaysItemsInfinite(4) emits a new list of items, listenToServerUpdates should close its socket connection and open a new one. Currently, every new batch of items from getLastNDaysItemsInfinite creates another listenToServerUpdates observable, and therefore another socket connection, running in parallel with the earlier ones.

How can I resubscribe to (or cancel and subscribe again to) the running listenToServerUpdates observable when getLastNDaysItemsInfinite emits new items?

Thanks in advance!

Upvotes: 1

Views: 1027

Answers (1)

masp

Reputation: 515

You could use switchMap instead of flatMap, like this:

            repository
            .getLastNDaysItemsInfinite(4)
            .switchMap(items ->
                    Observable
                            .fromIterable(items)....

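Each time a new list arrives from getLastNDaysItemsInfinite, the Observable created for the previous list is unsubscribed (disposed), which closes its socket connection. Only the Observable for the latest list stays active. See the switchMap docs.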
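To make the difference concrete, here is a minimal plain-Java sketch that models the two behaviors without using RxJava at all. Everything in it is hypothetical and only for illustration: `Connection` stands in for the socket opened by listenToServerUpdates, `mergeAll` imitates what flatMap does with each new list of ids, and `switchToLatest` imitates what switchMap does. It is a model of the semantics, not how RxJava implements them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SwitchSketch {

    /** Hypothetical stand-in for the socket opened by listenToServerUpdates(ids). */
    static final class Connection {
        final List<Integer> ids;
        boolean open = true;

        Connection(List<Integer> ids) {
            this.ids = ids;
        }

        void close() {
            open = false;
        }
    }

    /** flatMap-like: every batch of ids opens another connection, none is closed. */
    static List<Connection> mergeAll(List<List<Integer>> batches) {
        List<Connection> opened = new ArrayList<>();
        for (List<Integer> ids : batches) {
            opened.add(new Connection(ids));
        }
        return opened;
    }

    /** switchMap-like: the previous connection is closed before the next one opens. */
    static List<Connection> switchToLatest(List<List<Integer>> batches) {
        List<Connection> opened = new ArrayList<>();
        Connection current = null;
        for (List<Integer> ids : batches) {
            if (current != null) {
                current.close(); // corresponds to disposing the previous inner Observable
            }
            current = new Connection(ids);
            opened.add(current);
        }
        return opened;
    }

    /** Counts the connections that are still open. */
    static long countOpen(List<Connection> connections) {
        return connections.stream().filter(c -> c.open).count();
    }

    public static void main(String[] args) {
        // Three successive emissions from the outer source, as lists of item ids.
        List<List<Integer>> batches = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5));

        System.out.println("flatMap-like open connections: " + countOpen(mergeAll(batches)));
        System.out.println("switchMap-like open connections: " + countOpen(switchToLatest(batches)));
    }
}
```

With three batches, the flatMap-like version ends with three open connections, while the switchMap-like version ends with one, the connection for the latest batch.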

Upvotes: 2
