Reputation: 2053
I am using Flowable.combineLatest(query, params) to listen for changes to the query and to some other params.
Now I want to introduce pagination and also listen for offset changes. The problem is that whenever the query changes, I need to reset the offset.
How can I achieve this with RxAndroid?
Basically, we observe three or more sources (query, offset, connectionChange).
What I want is to react to a change in any of these observables and, whenever the query changes, reset the value of offset.
Upvotes: 3
Views: 868
Reputation: 70017
To see where the value came from, you have to tag the values in some fashion, which requires per-source transformation. For example:
data class Tuple<T>(val value: T, val index: Long) { ... }
Flowable.defer {
    // Per-subscription state: a running counter for each source,
    // plus the last index seen from each source.
    val indices = LongArray(4)
    // Start at -1 so the first item from every source counts as a change.
    val latests = LongArray(4) { -1L }
    Flowable.combineLatest(
        source1.map { Tuple(it, indices[0]++) },
        source2.map { Tuple(it, indices[1]++) },
        source3.map { Tuple(it, indices[2]++) },
        source4.map { Tuple(it, indices[3]++) },
        { tuple1, tuple2, tuple3, tuple4 ->
            if (tuple1.index != latests[0]) {
                // first source changed
            }
            if (tuple2.index != latests[1]) {
                // second source changed
            }
            if (tuple3.index != latests[2]) {
                // third source changed
            }
            if (tuple4.index != latests[3]) {
                // fourth source changed
            }
            // Remember what we have seen so the next call can compare against it.
            latests[0] = tuple1.index
            latests[1] = tuple2.index
            latests[2] = tuple3.index
            latests[3] = tuple4.index
            // build and return the combined value here
        }
    )
}
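Applied to the question, the "query changed" branch is where the offset would be reset. Below is a minimal sketch of just that comparison step in plain Kotlin, without RxJava, so the logic can be checked on its own. The names Tagged, PageRequest and OffsetResetter are made up for illustration and are not part of any library. It also assumes combine() is called serially, one combination at a time.

```kotlin
// Hypothetical helper types for illustration; nothing here comes from RxJava.

// A value plus a counter that increases each time its source emits.
data class Tagged<T>(val value: T, val index: Long)

// What the paging code would finally act on.
data class PageRequest(val query: String, val offset: Int)

class OffsetResetter {
    // Index of the last query we saw; -1 means "none yet".
    private var lastQueryIndex = -1L

    // Offsets tagged with this index or lower were emitted before the
    // current query arrived, so they are treated as stale.
    private var staleOffsetIndex = -1L

    // Intended to be called from the combiner with the latest tagged values.
    fun combine(query: Tagged<String>, offset: Tagged<Int>): PageRequest {
        if (query.index != lastQueryIndex) {
            // The query source changed: mark the current offset as stale.
            lastQueryIndex = query.index
            staleOffsetIndex = offset.index
        }
        // A stale offset is replaced by 0; a newer one is passed through.
        val effectiveOffset = if (offset.index <= staleOffsetIndex) 0 else offset.value
        return PageRequest(query.value, effectiveOffset)
    }
}
```

With this sketch, a new query keeps yielding offset 0 even when an unrelated source (such as the connection state) triggers another combination, and the offset only moves again once the offset source itself emits a newer value. Whether the offset source should additionally be reset at its origin depends on how the offsets are produced, which the question does not show.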
Upvotes: 1
Reputation: 50578
You can chain a side effect that runs right after the query Flowable has emitted an item. Registering doAfterNext on the query before passing it to Flowable.combineLatest is probably the easiest approach.
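fun observePublishers(query: Flowable<List<String>>, connectionState: Flowable<Boolean>, offset: Flowable<Int>) {
    // doAfterNext runs its callback after each query item has been delivered downstream.
    val newQuery = query.doAfterNext {
        // `index` is not declared in this snippet; presumably it is the mutable state behind `offset`.
        index = 0
    }
    Flowable.combineLatest(newQuery, connectionState, offset) { queryResult, hasConnection, offsetValue ->
        // combine the three latest values here
    }.subscribe {
        // consume the combined value here
    }
}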
Upvotes: 0
Reputation: 219
The pagination example here may help:
https://github.com/kaushikgopal/RxJava-Android-Samples#14-pagination-with-rx-using-subjects
Upvotes: 0