jabley
jabley

Reputation: 2222

Error handing in RxJava at the boundary between reactive and imperative code

Given an interface:

public interface FastlyRxApi {

    @GET("/service/{service_id}/version/{version}/backend")
    Observable<List<Backend>> listBackends(@Path("service_id") String serviceId, @Path("version") String versionId);

    @PUT("/service/{service_id}/version/{version}/backend/{old_name}")
    Observable<Backend> updateBackend(@Path("service_id") String serviceId, @Path("version") String version, @Path("old_name") String oldName, @Body Backend updatedBacked);

}

and some client code:

Integer expectedFirstByteTimeout = 10000; 

// Use a final array to capture any problem found within our composed Observables
final FastlyEnvException[] t = new FastlyEnvException[1];

fastlyRxApi.listBackends(serviceId, newVersion)
    .flatMap(Observable::fromIterable)
    .filter(backend -> !expectedFirstByteTimeout.equals(backend.getFirstByteTimeout()))
    .flatMap(backend -> {
        backend.setFirstByteTimeout(expectedFirstByteTimeout);
        return fastlyRxApi.updateBackend(serviceId, newVersion, backend.getName(), backend);
    }).subscribe(ignore -> {
}, e -> {
    t[0] = new FastlyEnvException("failed to configure backends", e);
});

if (t[0] != null) {
    throw t[0];
}

Using a final array of FastlyEnvException to capture context for error handling feels like I'm doing something wrong, and missing some aspect.

Am I using a hammer rather than a screwdriver here; ie should I be using RxJava for this? It seems to give me a nice readable flow, apart from the error handling. What is the preferred idiom for doing this?

Upvotes: 0

Views: 284

Answers (2)

Yaroslav Stavnichiy
Yaroslav Stavnichiy

Reputation: 21446

Just omit error handler in your subscribe() and the error will be rethrown automatically.

Upvotes: 0

Tassos Bassoukos
Tassos Bassoukos

Reputation: 16142

Use onErrorResumeNext:

.onErrorResumeNext(err -> 
     Observable.error(new FastlyEnvException("failed to configure backends", e)))
.toBlocking();
.subscribe();

Of note here is the .toBlocking(), this will make the Observable chain wait until it's complete.

Given that the subscribe() doesn't have an error handler, it will re-throw the exception.

Upvotes: 1

Related Questions