Ofir Prizat

Reputation: 154

In RxJava, how to reflect/extract a failure outside of an Observable?

We have a StoreService that calls an update(key, content) method, which uses the Couchbase client to do get --> change_content --> replace.

As part of that process we use the Observable's retryWhen to handle exceptions. If the number of retries exceeds the maximum, it just passes the exception along, which then triggers the observer's onError method.

When the error cannot be handled, we would like update(key, content) to throw an exception to the StoreService that calls it, but we have failed to do so.

We have tried the following ways without success so far:

  1. Throwing an Exception from the onError method, but it doesn't get thrown out of the Observable.
  2. Throwing a RuntimeException instead, but that didn't work either.
  3. Using a DTO with a boolean isFailed member: we create the DTO outside of the Observable, and in case of an error the subscriber's onError sets the DTO's isFailed to true. After the Observable finishes, we check whether the DTO isFailed and, if so, throw an exception. That didn't work either: the change made in onError didn't propagate outside of the Observable (why?)

Here is the pseudocode:

 public void update(String key, ConversationDto updateConversationDto) {

    ObservableExecution observableExecution = new ObservableExecution();

    Observable
            .defer(... get the document from couchbase ...) 
            .map(... handle JSON conversion and update the document ...)
            .flatMap(documentUpdate -> {
                return couchbaseClient.getAsyncBucket().replace(documentUpdate);
            })
            .retryWhen(new RetryWithDelay(3, 200))
            .subscribe(
                    n -> logger.debug("on next update document -> " + n.content()),
                    e -> {
                        //logger.error("failed to insert a document",e);
                        observableExecution.setFailure(e);
                    },
                    () -> logger.debug("on complete update document")

            );
    // this is never true
    if (observableExecution.isFailed()) {
        final Throwable e = observableExecution.getFailure();
        throw new DalRuntimeException(e.getMessage(), e);
    }
}

This is the retryWhen code:

public Observable<?> call(Observable<? extends Throwable> attempts) {
    return attempts
            .flatMap(new Func1<Throwable, Observable<?>>() {
                @Override
                public Observable<?> call(Throwable errorNotification) {
                    if (++retryCount < maxRetries) {
                        // When this Observable calls onNext, the original
                        // Observable will be retried (i.e. re-subscribed).
                        logger.debug(errorNotification + " retry no. " + retryCount);
                        return Observable.timer(retryDelayMillis,
                                TimeUnit.MILLISECONDS);
                    }

                    // Max retries hit. Just pass the error along.
                    logger.debug(errorNotification + " exceeded max retries " + maxRetries);
                    return Observable.error(errorNotification);
                }
            });
}

Appreciate your help very much!

Upvotes: 0

Views: 928

Answers (2)

Yura Lazarev

Reputation: 76

I agree with @Ross: conceptually, update() should return the Observable. The only simplification I can propose is to use a local mutable variable instead of the ObservableExecution DTO:

public void update(String key, ConversationDto updateConversationDto) {
    final Throwable[] errorHolder = new Throwable[1];

    Observable
        .defer(... get the document from couchbase ...) 
        .map(... handle JSON conversion and update the document ...)
        .flatMap(documentUpdate -> {
            return couchbaseClient.getAsyncBucket().replace(documentUpdate);
        })
        .retryWhen(new RetryWithDelay(3, 200))
        .subscribe(
                n -> logger.debug("on next update document -> " + n.content()),
                e -> {
                    //logger.error("failed to insert a document",e);
                    errorHolder[0] = e;
                },
                () -> logger.debug("on complete update document")

        );

    if (errorHolder[0] != null) {
        final Throwable e = errorHolder[0];
        throw new DalRuntimeException(e.getMessage(), e);
    }
}

Upvotes: 0

Ross Hambrick

Reputation: 5940

The subscription runs asynchronously, so the isFailed() check always runs immediately after subscribe() returns, before the e -> setFailure(e) callback has run.
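To make the timing issue concrete, here is a minimal sketch that reproduces it with the JDK's CompletableFuture rather than RxJava (a deliberate substitution so it runs without any dependencies). The class name AsyncFailureDemo, the Observation holder, and the two latches are all hypothetical; the "gate" latch only exists to make the ordering deterministic for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncFailureDemo {

    /** What the calling thread saw before and after waiting for the callback. */
    public static final class Observation {
        public final Throwable seenImmediately;
        public final Throwable seenAfterWaiting;

        Observation(Throwable seenImmediately, Throwable seenAfterWaiting) {
            this.seenImmediately = seenImmediately;
            this.seenAfterWaiting = seenAfterWaiting;
        }
    }

    public static Observation run() {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        CountDownLatch gate = new CountDownLatch(1); // holds the async work back
        CountDownLatch done = new CountDownLatch(1); // signals that the error callback ran

        CompletableFuture
                .runAsync(() -> {
                    try {
                        gate.await(); // stand-in for network latency
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                    throw new IllegalStateException("replace failed");
                })
                .whenComplete((ignored, error) -> {
                    // Plays the role of onError: record the failure.
                    failure.set(error);
                    done.countDown();
                });

        // Equivalent of the isFailed() check right after subscribe():
        // the callback has not run yet, so nothing is recorded.
        Throwable seenImmediately = failure.get();

        gate.countDown(); // let the async work proceed and fail
        try {
            done.await();  // wait until the callback has actually run
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ie);
        }
        return new Observation(seenImmediately, failure.get());
    }

    public static void main(String[] args) {
        Observation o = run();
        System.out.println("immediately:   " + o.seenImmediately);
        System.out.println("after waiting: " + o.seenAfterWaiting);
    }
}
```

The flag is only visible to the caller once it has waited for the asynchronous callback; checking it right after kicking off the work sees nothing.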

The proper way to do this is to return the Observable from the update() method and subscribe to it in the StoreService. That way you are notified of successes and failures where you are interested in handling them.
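A rough sketch of what that could look like with RxJava 1.x, assuming the library is on the classpath. The class UpdateSketch and the failing timer pipeline are hypothetical stand-ins for the real defer/map/flatMap/retryWhen chain, and IllegalStateException stands in for DalRuntimeException. If update() really must throw synchronously, toBlocking() is one option: it waits for the terminal event and rethrows an onError on the calling thread, at the cost of blocking that thread.

```java
import java.util.concurrent.TimeUnit;
import rx.Observable;

public class UpdateSketch {

    // Hypothetical stand-in for the real pipeline: instead of talking to
    // Couchbase, it fails asynchronously after 50 ms.
    public static Observable<String> update(String key) {
        return Observable
                .timer(50, TimeUnit.MILLISECONDS)
                .flatMap(tick -> Observable.<String>error(
                        new IllegalStateException("replace failed for " + key)));
    }

    // Synchronous variant: blocks until the Observable terminates, so an
    // error reaches the caller as a thrown exception.
    public static void updateBlocking(String key) {
        try {
            update(key).toBlocking().lastOrDefault(null);
        } catch (RuntimeException e) {
            // Stand-in for wrapping in DalRuntimeException.
            throw new IllegalStateException("update failed: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        try {
            updateBlocking("some-key");
        } catch (IllegalStateException e) {
            System.out.println("thrown to caller: " + e.getMessage());
        }
    }
}
```

In the non-blocking version, the StoreService would call update(key).subscribe(onNext, onError) itself and handle the failure in its own onError callback.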

Upvotes: 1
