Olexii Muraviov

Reputation: 1496

RxJava 2 force complete chain with infinite observable

I have an observable chain with an infinite observable at the top and finite observables after it, like this:

repo.infinitGetItems()
      .switchMap(items -> Observable
                          .just(items)
                          // inner parameter renamed: a nested lambda cannot reuse the name "items"
                          .flatMap(list -> repo.nonInfinitObs(list)));

What I want is for the whole chain to complete when repo.nonInfinitObs sends its onComplete event. Right now it doesn't complete because repo.infinitGetItems() is still alive.

Is there a way to force completion of the whole chain in RxJava 2?

Upvotes: 0

Views: 1455

Answers (1)

akarnokd

Reputation: 69997

You can stop the main sequence with takeUntil, driven by a signal from outside the flow (here a PublishSubject that gets completed when the inner sequence completes):

PublishSubject<Integer> stop = PublishSubject.create();

repo.infinitGetItems()
  .takeUntil(stop)
  .switchMap(items -> repo.nonInfinitObs(items)
                      .doOnComplete(() -> stop.onComplete())
  );
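
To try this out in isolation, here is a minimal sketch, assuming RxJava 2.x is on the classpath. The class `ForceCompleteDemo`, the helper `nonInfinite`, and the `PublishSubject` named `source` are hypothetical stand-ins for `repo.nonInfinitObs` and `repo.infinitGetItems()`; they are not part of the original code.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

public class ForceCompleteDemo {

    // Hypothetical stand-in for repo.nonInfinitObs(items): a finite inner sequence.
    static Observable<Integer> nonInfinite(int item) {
        return Observable.just(item * 10, item * 10 + 1);
    }

    static List<Integer> run() {
        // Hypothetical stand-in for repo.infinitGetItems(): never completes on its own.
        PublishSubject<Integer> source = PublishSubject.create();
        // Flow-external stop signal, as in the snippet above.
        PublishSubject<Integer> stop = PublishSubject.create();

        TestObserver<Integer> observer = source
                .takeUntil(stop)
                .switchMap(item -> nonInfinite(item)
                        // Completing "stop" makes takeUntil end the outer sequence.
                        .doOnComplete(stop::onComplete))
                .test();

        source.onNext(1);

        // The inner values arrive, then the whole chain completes.
        observer.assertResult(10, 11);

        // takeUntil has disposed its subscription to the infinite source.
        if (source.hasObservers()) {
            throw new IllegalStateException("source should have been disposed");
        }
        return new ArrayList<>(observer.values());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the first inner completion ends everything, so only the first batch of items from the infinite source is ever processed. If you need something else (say, stopping after N batches), the stop signal would have to be fired under a different condition.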

Upvotes: 2
