ericn
ericn

Reputation: 13103

RxJava 2 - how to cancel infinite stream upon error and handle it?

I've got the following infinite stream which does something every second.
What I want is to stop the stream upon error and handle it.
How can I achieve that?

void doSomething() {
        Disposable disposable = execute(doSomethingInner(), 0L, TimeUnit.SECONDS, schedulerProvider.io(), someClass -> 1L).doOnError
                (throwable -> {
            Timber.e(throwable, "error happened");// Never triggered
        })
                .doOnNext(someClass -> Timber.i("doing the infinite stuff"))
                .subscribe(Functions.emptyConsumer(), throwable -> {
                    Timber.e(throwable, "stop doing the infinite stuff");// Never triggered
                });
    }

    Observable<SomeClass> doSomethingInner() {
        return Observable.error(new Exception("something went wrong"));
    }

    Observable<SomeClass> execute(Observable<SomeClass> source,
                                  long delayInterval,
                                  TimeUnit timeUnit,
                                  Scheduler scheduler,
                                  Function<SomeClass, Long> interval) {
        return Observable.defer(new Callable<ObservableSource<SomeClass>>() {
            long currentInterval = delayInterval;

            @Override
            public ObservableSource<SomeClass> call() {
                return Single.timer(currentInterval, timeUnit, scheduler)
                        .flatMapObservable(o -> source)
                        .doOnNext(t -> currentInterval = interval.apply(t));
            }
        })
                .repeat()
                .retry();
    }

Upvotes: 0

Views: 199

Answers (1)

Tuby
Tuby

Reputation: 3253

I think retry() is consuming your error.
Try to either:

  • Remove this retry() completely
  • or change it to retry(Predicate<Throwable>) to decide whether to repeat.

It is default behavior of subscriber to cancel stream on error if you don't consume it earlier, and you should receive callback to onError() inside subscribe().

Upvotes: 2

Related Questions