Reputation: 91
I have an API call which verifies some status against an "Id". The API returns Single or error. I have a list of such Id's, Only one Id is valid to return success or none (all id's return error). What I need is, Iterate through each Id and skip the errors from API call, until either a success or end of the list. I am able to achieve this sequentially. However, I am trying to do the same, using ParallelFlowable. It works fine when an Id returns success, But when there is no id which returns success (all ids fail), then it just skip all the errors from API, but does not notify the subscriber after all the ids are validated. I am not sure how to handle this.
// API call
fun getStatus(Id: String): Single<String> {
//... returns Single<String> or error
}
//Sequential flow, Working
fun getStatus(ids: List<String>): Single<String> {
Observable.fromIterable(ids)
.flatMapSingle { id ->
getStatus(id)
.onErrorResumeWith { singleSource ->
if (ids.last() == id)) { //If this is last item in list, return error
singleSource.onError(NoStatusFoundException())
} else {
// Skip errors until valid id is found or till the list reached end.
Flowable.empty<String>()
}
}
}.firstOrError()
}
// Parallel Flow, How to identify the list is completed and return NoStatusFoundException in case of all id's fail?
fun getStatus(ids: List<String>): Single<String> {
Flowable.fromIterable(ids)
.parallel()
.runOn(io())
.flatMap{ id -> getStatus(id).toFlowable()
.onErrorResumeWith { Flowable.empty<String>() }
}
.sequentialDelayError()
.firstOrError()
.onErrorResumeNext { Single.error(it) }
}
// Subscription
getStatus(listOf("1","2","3","4","5",))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscriber({ id->
// success
this is notified when an id is success
},
{ // error handler - Need help here
Never notified when all the id's fail?
})
Upvotes: 1
Views: 566
Reputation: 91
I am able to resolve this issue by removing onErrorResumeWith { Flowable.empty<String>() }
within flatMap and implementing RxJavaPlugins.setErrorHandler{...}
.
sequentialDelayError()
delays all the errors until all the rails have finished their task.
fun getStatus(ids: List<String>): Single<String> {
Flowable.fromIterable(ids)
.parallel()
.runOn(io())
.flatMap{ id -> getStatus(id).toFlowable()
}
.sequentialDelayError()
.firstOrError()
.onErrorResumeNext { Single.error(it) }
}
///
RxJavaPlugins.setErrorHandler{ it: Throwable ->
is UndeliverableException -> {...}
.....
}
Upvotes: 1
Reputation: 3435
The problem is in this line:
.onErrorResumeWith { Flowable.empty<String>() }
The parameter of onErrorResumeWith
is a Publisher<out T>
, not () -> Publisher<out T>
. The Publisher
interface happens to have a single method, void subscribe(Subscriber<? super T> s)
. As such, it is eligible for SAM conversion.
The lambda { Flowable.empty<String>() }
is a perfectly valid (Subscriber<String>) -> Unit
that ignores its single parameter, calls a method, and ignores the result. This compiles, but the result is for all practical purposes the same as Flowable.never()
.
Instead of a lambda, you need to pass Flowable.empty()
directly into onErrorResumeNext()
:
.flatMap{ id -> getStatus(id).toFlowable()
.onErrorResumeWith(Flowable.empty<String>())
}
Upvotes: 0
Reputation: 2103
You are returning Flowable.empty() that immediately completes the subscription. Taken from the docs:
Returns a Flowable that emits no items to the {@link Subscriber} and immediately invokes its {@link Subscriber#onComplete onComplete} method.
Maybe you can return Flowable.just("")
or provide some expected argument incase of an error.
.onErrorResumeWith { Flowable.just("") }
Upvotes: 0