Reputation: 154
We have a StoreService that calls an update(key, content) method, which uses the Couchbase client to do get --> change_content --> replace.
As part of that process we use the Observable's retryWhen to handle exceptions. If the number of retries exceeds the maximum, it just passes the exception along, which then triggers the observer's onError method.
If the error cannot be handled, we would like to throw an exception from the update(key, content) method to the StoreService that calls it, but we have not managed to do so.
public void update(String key, ConversationDto updateConversationDto) {
    ObservableExecution observableExecution = new ObservableExecution();
    Observable
        .defer(... get the document from couchbase ...)
        .map(... handle JSON conversion and update the document ...)
        .flatMap(documentUpdate -> {
            return couchbaseClient.getAsyncBucket().replace(documentUpdate);
        })
        .retryWhen(new RetryWithDelay(3, 200))
        .subscribe(
            n -> logger.debug("on next update document -> " + n.content()),
            e -> {
                //logger.error("failed to insert a document",e);
                observableExecution.setFailure(e);
            },
            () -> logger.debug("on complete update document")
        );
    // this is never true
    if (observableExecution.isFailed()) {
        final Throwable e = observableExecution.getFailure();
        throw new DalRuntimeException(e.getMessage(), e);
    }
}
This is the retryWhen code:
public Observable<?> call(Observable<? extends Throwable> attempts) {
    return attempts
        .flatMap(new Func1<Throwable, Observable<?>>() {
            @Override
            public Observable<?> call(Throwable errorNotification) {
                if (++retryCount < maxRetries) {
                    // When this Observable calls onNext, the original
                    // Observable will be retried (i.e. re-subscribed).
                    logger.debug(errorNotification + " retry no. " + retryCount);
                    return Observable.timer(retryDelayMillis,
                            TimeUnit.MILLISECONDS);
                }
                // Max retries hit. Just pass the error along.
                logger.debug(errorNotification + " exceeded max retries " + maxRetries);
                return Observable.error(errorNotification);
            }
        });
}
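A side note on the counter in the retry code: assuming retryCount starts at 0 (the field declaration is not shown), the check `++retryCount < maxRetries` grants maxRetries - 1 retries, so `new RetryWithDelay(3, 200)` would give three attempts in total. A minimal plain-Java sketch of just that condition, with a hypothetical class name and no RxJava involved:

```java
final class RetryCountDemo {

    // Mirrors the `++retryCount < maxRetries` check from the retry code,
    // assuming retryCount starts at 0 and every attempt fails.
    static int retriesGranted(int maxRetries) {
        int retryCount = 0;
        int granted = 0;
        while (++retryCount < maxRetries) {
            granted++; // this failure would trigger a re-subscription
        }
        return granted; // the next failure is passed along as an error
    }

    public static void main(String[] args) {
        System.out.println("retries granted for maxRetries=3: " + retriesGranted(3));
    }
}
```

If three retries after the first failure are intended, the comparison would need to be `<=`, or the count would need to be incremented after the check.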
Appreciate your help very much!
Upvotes: 0
Views: 928
Reputation: 76
I agree with @Ross: conceptually, an Observable should be returned by update(). The only simplification I can propose is to use a local mutable variable instead of the ObservableExecution DTO:
public void update(String key, ConversationDto updateConversationDto) {
    final Throwable[] errorHolder = new Throwable[1];
    Observable
        .defer(... get the document from couchbase ...)
        .map(... handle JSON conversion and update the document ...)
        .flatMap(documentUpdate -> {
            return couchbaseClient.getAsyncBucket().replace(documentUpdate);
        })
        .retryWhen(new RetryWithDelay(3, 200))
        .subscribe(
            n -> logger.debug("on next update document -> " + n.content()),
            e -> {
                //logger.error("failed to insert a document",e);
                errorHolder[0] = e;
            },
            () -> logger.debug("on complete update document")
        );
    if (errorHolder[0] != null) {
        final Throwable e = errorHolder[0];
        throw new DalRuntimeException(e.getMessage(), e);
    }
}
Upvotes: 0
Reputation: 5940
The subscription runs asynchronously, so the isFailed() check will always run immediately, before the e -> setFailure(e) code runs.
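The ordering problem can be reproduced without RxJava or Couchbase. The sketch below uses a JDK CompletableFuture as a stand-in for the asynchronous replace() call (this is a substitution, not the Couchbase API), and all class and method names are hypothetical. It records a failure in a callback and reads the holder twice: once right after registering the callback, and once after the asynchronous work has finished.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncCheckDemo {

    // Stand-in for the async replace(): fails on another thread after a short delay.
    static CompletableFuture<String> failingReplace() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("replace failed");
        });
    }

    // Returns { failure seen right after registering the callback,
    //           failure seen after the async work has completed }.
    static Throwable[] observe() {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        CompletableFuture<String> withCallback =
                failingReplace().whenComplete((value, error) -> {
                    if (error != null) {
                        failure.set(error); // plays the role of setFailure(e)
                    }
                });

        // Equivalent of the isFailed() check: the callback has not run yet.
        Throwable seenImmediately = failure.get();

        try {
            withCallback.join(); // wait until the callback has run
        } catch (CompletionException expected) {
            // the failure is also rethrown here; ignored for the demo
        }
        Throwable seenAfterCompletion = failure.get();
        return new Throwable[] {seenImmediately, seenAfterCompletion};
    }

    public static void main(String[] args) {
        Throwable[] seen = observe();
        System.out.println("failure visible immediately: " + (seen[0] != null));
        System.out.println("failure visible after completion: " + (seen[1] != null));
    }
}
```

The first read sees nothing because the method continues past the callback registration while the work is still in flight, which is the same situation as the isFailed() check in update().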
The proper way to do this is to return the Observable from the update() method and subscribe to it in the StoreService. That way you are notified of successes and failures where you are interested in handling them.
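A minimal sketch of that shape, again using a JDK CompletableFuture in place of the Observable so that it runs without extra dependencies. The update() and store() methods and the DalRuntimeException class here are hypothetical stand-ins, not the code from the question. The data-access method only builds and returns the pending result; the caller decides how to consume it, in this case by blocking and translating the failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ReturnAsyncResultDemo {

    // Hypothetical stand-in for the DalRuntimeException in the question.
    static final class DalRuntimeException extends RuntimeException {
        DalRuntimeException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical DAO method: returns the pending result instead of
    // subscribing internally and swallowing the outcome.
    static CompletableFuture<String> update(String key, String content) {
        return CompletableFuture.supplyAsync(() -> {
            if (content.isEmpty()) {
                throw new IllegalArgumentException("empty content for " + key);
            }
            return key + "=" + content;
        });
    }

    // Hypothetical StoreService method: the caller chooses to wait for the
    // result and converts an async failure into its own exception type.
    static String store(String key, String content) {
        try {
            return update(key, content).join();
        } catch (CompletionException e) {
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new DalRuntimeException(cause.getMessage(), cause);
        }
    }

    public static void main(String[] args) {
        System.out.println(store("k1", "hello"));
        try {
            store("k2", "");
        } catch (DalRuntimeException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

With RxJava 1 the same idea would mean update() returning the Observable built by the defer/map/flatMap/retryWhen chain, and the StoreService either subscribing with its own onError handler or, if it really needs a synchronous exception, bridging with toBlocking().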
Upvotes: 1