android_user

Reputation: 91

RxJava: Skip all the errors in fromIterable() and notify subscriber when all the items are emitted - Flowable.parallel execution

I have an API call that verifies a status against an ID. The API returns a Single that either succeeds or signals an error. I have a list of such IDs, and at most one of them is valid: either exactly one ID returns success, or none do (every ID returns an error). I need to iterate through the IDs and skip the errors from the API call until either one succeeds or the end of the list is reached. I can achieve this sequentially. However, I am trying to do the same with ParallelFlowable. It works fine when an ID returns success, but when no ID succeeds (all IDs fail), it skips all the errors from the API and never notifies the subscriber after all the IDs have been checked. I am not sure how to handle this.

// API call
fun getStatus(Id: String): Single<String> {
  //... returns Single<String> or error
}

//Sequential flow, Working
fun getStatus(ids: List<String>): Single<String> {
    return Observable.fromIterable(ids)
        .flatMapSingle { id ->
            getStatus(id)
                .onErrorResumeWith { singleSource ->
                    if (ids.last() == id) { // If this is the last item in the list, return an error
                        singleSource.onError(NoStatusFoundException())
                    } else {
                        // Skip errors until a valid id is found or the end of the list is reached.
                        Flowable.empty<String>()
                    }
                }
        }.firstOrError()
}

// Parallel flow: how can I detect that the list is exhausted and return NoStatusFoundException when all ids fail?
fun getStatus(ids: List<String>): Single<String> {
    return Flowable.fromIterable(ids)
        .parallel()
        .runOn(io())
        .flatMap { id ->
            getStatus(id).toFlowable()
                .onErrorResumeWith { Flowable.empty<String>() }
        }
        .sequentialDelayError()
        .firstOrError()
        .onErrorResumeNext { Single.error(it) }
}

// Subscription
getStatus(listOf("1", "2", "3", "4", "5"))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ id ->
        // success
        // This is notified when an id succeeds.
    },
    {
        // error handler - need help here
        // Never notified when all the ids fail?
    })

Upvotes: 1

Views: 566

Answers (3)

android_user

Reputation: 91

I was able to resolve this by removing onErrorResumeWith { Flowable.empty<String>() } from the flatMap and installing a global handler with RxJavaPlugins.setErrorHandler{...}. sequentialDelayError() delays all errors until all the rails have finished their tasks.

fun getStatus(ids: List<String>): Single<String> {
    return Flowable.fromIterable(ids)
        .parallel()
        .runOn(io())
        .flatMap { id -> getStatus(id).toFlowable() }
        .sequentialDelayError()
        .firstOrError()
        .onErrorResumeNext { Single.error(it) }
}

// Global handler for errors that can no longer be delivered downstream
RxJavaPlugins.setErrorHandler { e: Throwable ->
    when (e) {
        is UndeliverableException -> {...}
        .....
    }
}

Upvotes: 1

Nitrodon

Reputation: 3435

The problem is in this line:

.onErrorResumeWith { Flowable.empty<String>() }

The parameter of onErrorResumeWith is a Publisher<out T>, not a () -> Publisher<out T>. The Publisher interface has a single method, void subscribe(Subscriber<? super T> s), so it is eligible for SAM conversion.

The lambda { Flowable.empty<String>() } is a perfectly valid (Subscriber<String>) -> Unit that ignores its single parameter, calls a method, and ignores the result. This compiles, but the result is for all practical purposes the same as Flowable.never().
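The same effect can be reproduced without RxJava. The sketch below is only an illustration: Publisher and Subscriber here are hypothetical stand-ins written for this example (not the Reactive Streams types), and emptyPublisher and completesWith are invented helper names. It shows that a trailing lambda becomes a new publisher that never signals, while passing the publisher as a value completes normally.

```kotlin
// Hypothetical stand-ins for the Reactive Streams Subscriber / Publisher interfaces.
interface Subscriber<T> {
    fun onNext(item: T)
    fun onComplete()
}

// Single abstract method, so a lambda can be SAM-converted into a Publisher.
fun interface Publisher<T> {
    fun subscribe(subscriber: Subscriber<T>)
}

// Completes immediately without emitting; plays the role of Flowable.empty().
fun <T> emptyPublisher(): Publisher<T> =
    Publisher<T> { subscriber -> subscriber.onComplete() }

// Subscribes to the fallback and reports whether it signalled completion.
fun completesWith(fallback: Publisher<String>): Boolean {
    var completed = false
    fallback.subscribe(object : Subscriber<String> {
        override fun onNext(item: String) {}
        override fun onComplete() { completed = true }
    })
    return completed
}

fun main() {
    // Trailing lambda: converted into a NEW Publisher whose subscribe() merely
    // creates an empty publisher and discards it. The subscriber hears nothing.
    val viaLambda = completesWith { emptyPublisher<String>() }

    // Publisher passed as a value: the subscriber receives onComplete.
    val viaArgument = completesWith(emptyPublisher())

    println("lambda completes: $viaLambda")     // prints "lambda completes: false"
    println("argument completes: $viaArgument") // prints "argument completes: true"
}
```

In the question's code the silent publisher sits inside flatMap, so a rail that hits an error never finishes and firstOrError() never sees the end of the sequence.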

Instead of a lambda, you need to pass Flowable.empty() directly into onErrorResumeWith():

            .flatMap{ id -> getStatus(id).toFlowable()
                      .onErrorResumeWith(Flowable.empty<String>())
                    }
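For completeness, here is a minimal sketch of the whole parallel flow with the fallback passed as a value. It assumes RxJava 3 is on the classpath. The single-id getStatus is a hypothetical stand-in for the real API call (only "3" succeeds), and translating the NoSuchElementException raised by firstOrError() into NoStatusFoundException is one possible way to report the all-failed case, not something the question prescribes. Plain sequential() is used on the assumption that no inner error should reach the join once each one is replaced by an empty sequence.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers

class NoStatusFoundException : RuntimeException("no id returned a status")

// Hypothetical stand-in for the real API call: only id "3" is valid.
fun getStatus(id: String): Single<String> =
    if (id == "3") Single.just("status-$id")
    else Single.error(IllegalStateException("invalid id $id"))

fun getStatus(ids: List<String>): Single<String> =
    Flowable.fromIterable(ids)
        .parallel()
        .runOn(Schedulers.io())
        .flatMap { id ->
            getStatus(id)
                .toFlowable()
                // Fallback passed as a value, not as a lambda: a failed id
                // becomes an empty sequence that completes normally.
                .onErrorResumeWith(Flowable.empty<String>())
        }
        .sequential()
        // Emits the first success, or NoSuchElementException if every id failed.
        .firstOrError()
        .onErrorResumeNext { e ->
            if (e is NoSuchElementException) Single.error<String>(NoStatusFoundException())
            else Single.error<String>(e)
        }

fun main() {
    // One valid id in the list: the status of that id is printed.
    println(getStatus(listOf("1", "2", "3", "4", "5")).blockingGet())

    // No valid id: the error reaches the caller instead of hanging.
    val failure = runCatching { getStatus(listOf("1", "2")).blockingGet() }.exceptionOrNull()
    println(failure is NoStatusFoundException)
}
```

With this shape the error callback in the question's subscription would be invoked once all ids have been tried.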

Upvotes: 0

JakeB

Reputation: 2103

You are returning Flowable.empty(), which completes the subscription immediately without emitting anything. From the docs:

Returns a Flowable that emits no items to the Subscriber and immediately invokes its onComplete method.

Maybe you can return Flowable.just("") or some other expected value in case of an error.

.onErrorResumeWith(Flowable.just(""))

Upvotes: 0
