user2849678
user2849678

Reputation: 633

RxJava - Continue infinite stream

I saw the other similar issue, but it did not solve my problem:

        Observable<Integer> resumeSequence = Observable.just(-1);       
        Observable.just(1,0,2,5)
          .flatMap(x -> 
                      Observable.just(10/x)
                      //.onErrorReturn(error -> -1)
                      .onErrorResumeNext(resumeSequence)
                      .onExceptionResumeNext(resumeSequence)
              )
//        .onErrorReturn(error -> -1)
          .onErrorResumeNext(resumeSequence)
          .onExceptionResumeNext(resumeSequence)
          .subscribe(System.out::println, 
                      throwable -> System.out.println("IN ERROR CALLBACK" + throwable));

Ideally the Resume should be inside the flatMap. The output should print 4 numbers & flow should not go to the Error Callback.

Upvotes: 1

Views: 649

Answers (1)

dwursteisen
dwursteisen

Reputation: 11515

Observable.just is useful for already computed values.

your code bellow

 Observable.just(1,0,2,5)
      .flatMap(x -> 
                  Observable.just(10/x)
                  .onErrorResumeNext(resumeSequence)
                  .onExceptionResumeNext(resumeSequence)
          )

can be rewrited like this :

 Observable.just(1,0,2,5)
      .flatMap(x -> {
                  int result = 10 / x;
                  return Observable.just(result)
                  .onErrorResumeNext(resumeSequence)
                  .onExceptionResumeNext(resumeSequence);
          })

So it explain that your "primary" Observable goes in an error state and complete thanks to onExceptionResumeNext operator, as your nested Observable won't never be used

What you wants to achieve is to not complete the observable then an error occur. To do this, your nested Observable can complete as soon as your primary Observable doesn't. To do this, you'll have to defer the computation of 10 / x using the fromCallable operator:

 Observable<Integer> resumeSequence = Observable.just(-1);       
 Observable.just(1,0,2,5)
      // 10 / x will be computed only when this nested observable will subscribe
      .flatMap(x -> Observable.fromCompletable(() -> 10/x)
                  .onErrorResumeNext(resumeSequence)
                  .onExceptionResumeNext(resumeSequence)
          )
      .onErrorResumeNext(resumeSequence)
      .onExceptionResumeNext(resumeSequence)
      .subscribe(System.out::println, 
                  throwable -> System.out.println("IN ERROR CALLBACK" + throwable));

Upvotes: 2

Related Questions