woot

Reputation: 3731

RxJava subjects and error handling

I am trying to achieve a behavior similar to that of an event bus. For my requirements, a PublishSubject seems suitable.

The subject emits items representing the result of some global operation, which may either succeed or fail with an exception. I can't use onNext() for success events and onError() with the Throwable for failures, because once onError() is invoked the subject terminates, and any future subscribers will receive nothing but that onError() notification.

As I see it, I have to create a class that represents the event and optionally references a Throwable in case of an error. This seems unwise, however, as one would have to handle errors inside onNext().

How would you go about it?

Upvotes: 3

Views: 1669

Answers (1)

krp

Reputation: 2247

Creating a generic class that wraps events is one way to go. Say we call it ResponseOrError; it should basically contain two fields:

private T data;
private Throwable error;

and two simple factory methods:

public static <T> ResponseOrError<T> fromError(Throwable throwable) {
    return new ResponseOrError<>(throwable);
}

public static <T> ResponseOrError<T> fromData(T data) {
    return new ResponseOrError<>(data);
}
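
For reference, the fields and factory methods above could be assembled into a complete class roughly as follows. This is only a sketch: the two-argument private constructor and the accessors isError(), getData() and getError() are assumptions added for illustration and are not part of the snippets above.

```java
import java.util.Objects;

// Sketch of the full wrapper. The constructor shape and the accessor
// names (isError, getData, getError) are assumptions, not from the answer.
final class ResponseOrError<T> {
    private final T data;          // set for successful events, otherwise null
    private final Throwable error; // set for failed events, otherwise null

    private ResponseOrError(T data, Throwable error) {
        this.data = data;
        this.error = error;
    }

    static <T> ResponseOrError<T> fromData(T data) {
        return new ResponseOrError<>(data, null);
    }

    static <T> ResponseOrError<T> fromError(Throwable throwable) {
        // Reject null so that isError() is reliable for failed events.
        return new ResponseOrError<>(null, Objects.requireNonNull(throwable, "throwable"));
    }

    boolean isError() {
        return error != null;
    }

    T getData() {
        return data;
    }

    Throwable getError() {
        return error;
    }
}
```

Keeping the constructor private means every instance goes through one of the two factory methods, so an event is always either a success or a failure.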

To remove some boilerplate, you can provide a Transformer that turns an Observable<T> into an Observable<ResponseOrError<T>>:

public static <T> Observable.Transformer<T, ResponseOrError<T>> toResponseOrErrorObservable() {
    return new Observable.Transformer<T, ResponseOrError<T>>() {

        @Override
        public Observable<ResponseOrError<T>> call(final Observable<T> observable) {
            return observable
                    .map(new Func1<T, ResponseOrError<T>>() {
                        @Override
                        public ResponseOrError<T> call(final T t) {
                            return ResponseOrError.fromData(t);
                        }
                    })
                    .onErrorResumeNext(new Func1<Throwable, Observable<? extends ResponseOrError<T>>>() {
                        @Override
                        public Observable<? extends ResponseOrError<T>> call(final Throwable throwable) {
                            return Observable.just(ResponseOrError.<T>fromError(throwable));
                        }
                    });
        }
    };
}

Then you can use it like this:

final Observable<ResponseOrError<ImportantData>> compose = mNetworkService
               .getImportantData()
               .compose(ResponseOrError.<ImportantData>toResponseOrErrorObservable());

Now you can easily map the result depending on success or failure, or even provide another Transformer that returns a mapped Observable<T> instead of an Observable<ResponseOrError<T>>.
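
To illustrate what mapping on success or failure might look like on the consuming side, here is a plain-Java sketch without RxJava. It repeats a minimal copy of the wrapper so that it compiles on its own. The fold() helper and the FoldDemo class are hypothetical names introduced only for this example; the same branching could live inside a subscriber's onNext() or a map() call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Minimal copy of the wrapper so that this sketch is self-contained.
final class ResponseOrError<T> {
    private final T data;
    private final Throwable error;

    private ResponseOrError(T data, Throwable error) {
        this.data = data;
        this.error = error;
    }

    static <T> ResponseOrError<T> fromData(T data) {
        return new ResponseOrError<>(data, null);
    }

    static <T> ResponseOrError<T> fromError(Throwable throwable) {
        return new ResponseOrError<>(null, throwable);
    }

    // Hypothetical helper (not from the answer): collapses both cases
    // into a single value by applying exactly one of the two functions.
    <R> R fold(Function<? super T, ? extends R> onData,
               Function<? super Throwable, ? extends R> onError) {
        if (error != null) {
            return onError.apply(error);
        }
        return onData.apply(data);
    }
}

final class FoldDemo {
    // Turns each event into a display string, as onNext() might do.
    static List<String> describe(List<ResponseOrError<Integer>> events) {
        List<String> out = new ArrayList<>();
        for (ResponseOrError<Integer> event : events) {
            out.add(event.fold(
                    value -> "ok: " + value,
                    error -> "failed: " + error.getMessage()));
        }
        return out;
    }
}
```

Because failures travel as ordinary items, the stream itself never terminates with onError(), which is the property the event-bus use case needs.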

Upvotes: 4
