Mark Hetherington

Reputation: 1691

Replace listeners with RxJava

I am investigating a migration to RxJava and decided that one of my managers (accountManager) would be an interesting place to start. Currently the manager keeps a list of listeners and notifies them both when the account gets updated and when something goes wrong.

private List<WeakReference<ProfileChangeListener>> mListeners = new ArrayList<>();

public interface ProfileChangeListener {
    void onProfileUpdated(Account account);
    void onProfileFailed(Exception e);
}

My Rx solution involves a Subject:

private SerializedSubject<Account, Account> mManagerSubject = new SerializedSubject<>(BehaviorSubject.<Account>create());

public Observable<Account> observe() {
    return mManagerSubject;
}

and then when an update happens I call one of the following:

private void onProfileUpdated(Account account) {
    mManagerSubject.onNext(account);
}

private void onProfileFailed(final Exception e) {
    mManagerSubject.onError(e);
}

Issue

The issue is that once onError is called, anyone listening via observe() will never get another update from onNext.

I still want the subscribers to receive onError so they can handle the error state, but at a later time onNext could still be called with an updated account, and I still want the subscribers to handle that updated account.

I've tried solutions using onErrorResumeNext, onErrorReturn, and onExceptionResumeNext, but none of them propagate the onError.

TL;DR: How do I keep the subscribers subscribed after onError is called while still propagating onError?

Upvotes: 2

Views: 1279

Answers (1)

Bryan Herbst

Reputation: 67189

"Errors" in Rx can be a little difficult to grasp at first, because they have a slightly different meaning from what most people expect.

From the Error Handling documentation (emphasis mine):

An Observable typically does not throw exceptions. Instead it notifies any observers that an unrecoverable error has occurred by terminating the Observable sequence with an onError notification.

onError() is supposed to be used when an Observable encounters an unrecoverable error, that is, when your Observable cannot continue emitting items. When you are subscribing, you might use something like onErrorResumeNext to try some recovery action, but that should be the end of the source Observable.

Instead, you may want to adjust what your Observable emits to support emitting an error item, or include a flag indicating that an error was encountered.
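
As a rough sketch of the "error item" idea, you could wrap each emission in a small result type and push both outcomes through onNext. The ProfileResult class and its method names below are hypothetical (they are not part of RxJava or of the question's code), and the subject wiring in the trailing comment is only an assumption about how the question's manager might be adapted:

```java
// Hypothetical wrapper: carries either a value or an error, never both.
// It has no RxJava dependency; it is just the item type the stream would emit.
final class ProfileResult<T> {
    private final T value;
    private final Exception error;

    private ProfileResult(T value, Exception error) {
        this.value = value;
        this.error = error;
    }

    // Wraps a successfully updated value.
    static <T> ProfileResult<T> success(T value) {
        return new ProfileResult<>(value, null);
    }

    // Wraps a failure; a null error would be indistinguishable from success.
    static <T> ProfileResult<T> failure(Exception error) {
        if (error == null) {
            throw new IllegalArgumentException("error must not be null");
        }
        return new ProfileResult<>(null, error);
    }

    boolean isSuccess() { return error == null; }
    T getValue() { return value; }
    Exception getError() { return error; }
}

// Assumed adaptation of the question's manager (not compiled here):
//
//   SerializedSubject<ProfileResult<Account>, ProfileResult<Account>> mManagerSubject = ...;
//
//   private void onProfileUpdated(Account account) {
//       mManagerSubject.onNext(ProfileResult.success(account));
//   }
//
//   private void onProfileFailed(Exception e) {
//       // Emitted as a regular item, so the stream does not terminate.
//       mManagerSubject.onNext(ProfileResult.<Account>failure(e));
//   }
```

Subscribers would then branch on isSuccess() inside their onNext handler, leaving onError for failures that really do end the stream.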

If your error truly is unrecoverable, then you may want to revisit your recovery strategy and try a slightly different approach.

Upvotes: 4
