Dan Lee
Dan Lee

Reputation: 25

RxJava return error as onNext and continue stream

so i tried to use onErrorReturn to return the result that i wanted but it will complete the stream afterwards, how do i catch the error return as Next and still continue the stream?

with code below, it wont reach retryWhen when there is error and if i flip it around it wont re-subscribe with retryWhen if there is an error

fun process(): Observable<State> {
    return publishSubject
        .flatMap { intent ->
            actAsRepo(intent) // Might return error
                .map { State(data = it, error = null) }
        }
        .onErrorReturn { State(data = "", error = it) } // catch the error
        .retryWhen { errorObs -> 
            errorObs.flatMap {
                Observable.just(State.defaultState()) // continue subscribing
            }
        }
}

private fun actAsRepo(string: String): Observable<String> {
    if (string.contains('A')) {
        throw IllegalArgumentException("Contains A")
    } else {
        return Observable.just("Wrapped from repo: $string")
    }
}

subscriber will be

viewModel.process().subscribe(this::render)

Upvotes: 1

Views: 1546

Answers (1)

Sergej Isbrecht
Sergej Isbrecht

Reputation: 4002

onError is a terminal operator. If an onError happens, it will be passed along from operator to operator. You could use an onError-operator which catches the onError and provides a fallback.

In your example the onError happens in the inner-stream of the flatMap. The onError will be propagated downstream to the onErrorReturn opreator. If you look at the implementation, you will see that the onErrorReturn lambda will be invoked, the result will be pushed downstream with onNext following a onComplete

    @Override
    public void onError(Throwable t) {
        T v;
        try {
            v = valueSupplier.apply(t);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            downstream.onError(new CompositeException(t, e));
            return;
        }

        if (v == null) {
            NullPointerException e = new NullPointerException("The supplied value is null");
            e.initCause(t);
            downstream.onError(e);
            return;
        }

        downstream.onNext(v); // <--------
        downstream.onComplete(); // <--------
    }

What is the result of your solution?

Your stream completes because of: #retryWhen JavaDoc

If the upstream to the operator is asynchronous, signalling onNext followed by onComplete immediately may result in the sequence to be completed immediately. Similarly, if this inner {@code ObservableSource} signals {@code onError} or {@code onComplete} while the upstream is active, the sequence is terminated with the same signal immediately.

What you ought to do:

Place the onErrorReturn behind the map opreator in the flatMap. With this ordering your stream will not complete, when the inner-flatMap stream onErrors.

Why is this?

The flatMap operator completes, when the outer (source: publishSubject) and the inner stream (subscription) both complete. In this case the outer stream (publishSubject) emits onNext and the inner-stream will complete after sending { State(data = "", error = it) } via onNext. Therefore the stream will remain open.

interface ApiCall {
    fun call(s: String): Observable<String>
}

class ApiCallImpl : ApiCall {
    override fun call(s: String): Observable<String> {
        // important: warp call into observable, that the exception is caught and emitted as onError downstream
        return Observable.fromCallable {
            if (s.contains('A')) {
                throw IllegalArgumentException("Contains A")
            } else {
                s
            }
        }
    }
}

data class State(val data: String, val err: Throwable? = null)

apiCallImpl.call will return an lazy observable, which will throw an error on subscription, not at observable assembly time.

// no need for retryWhen here, except you want to catch onComplete from the publishSubject, but once the publishSubject completes no re-subscription will help you, because the publish-subject is terminated and onNext invocations will not be accepted anymore (see implementation). 
fun process(): Observable<State> {
    return publishSubject
        .flatMap { intent ->
            apiCallImpl.call(intent) // Might return error
                .map { State(data = it, err = null) }
                .onErrorReturn { State("", err = it) }
        }
}

Test

lateinit var publishSubject: PublishSubject<String>
lateinit var apiCallImpl: ApiCallImpl

@Before
fun init() {
    publishSubject = PublishSubject.create()
    apiCallImpl = ApiCallImpl()
}

@Test
fun myTest() {
    val test = process().test()

    publishSubject.onNext("test")
    publishSubject.onNext("A")
    publishSubject.onNext("test2")

    test.assertNotComplete()
        .assertNoErrors()
        .assertValueCount(3)
        .assertValueAt(0) {
            assertThat(it).isEqualTo(State("test", null))
            true
        }
        .assertValueAt(1) {
            assertThat(it.data).isEmpty()
            assertThat(it.err).isExactlyInstanceOf(IllegalArgumentException::class.java)
            true
        }
        .assertValueAt(2) {
            assertThat(it).isEqualTo(State("test2", null))
            true
        }
}

Alternative

This alternative behaves a little bit different, than the first solution. The flatMap-Operator takes a boolean (delayError), which will result in swallowing onError messages, until the sources completes. When the source completes, the errors will be emitted.

You may use delayError true, when the exception is of no use and must not be logged at the time of appearance

process

fun process(): Observable<State> {
    return publishSubject
        .flatMap({ intent ->
            apiCallImpl.call(intent)
                .map { State(data = it, err = null) }
        }, true)
}

Test

Only two values are emitted. The error will not be transformed to a fallback value.

@Test
fun myTest() {
    val test = process().test()

    publishSubject.onNext("test")
    publishSubject.onNext("A")
    publishSubject.onNext("test2")

    test.assertNotComplete()
        .assertNoErrors()
        .assertValueAt(0) {
            assertThat(it).isEqualTo(State("test", null))
            true
        }
        .assertValueAt(1) {
            assertThat(it).isEqualTo(State("test2", null))
            true
        }
        .assertValueCount(2)
}

NOTE: I think you want to use switchMap in this case, instead of flatMap.

Upvotes: 4

Related Questions