I.S

Reputation: 2053

How can I know which observable has changed in combineLatest in RxAndroid?

I am using Flowable.combineLatest(query, params) to listen for query changes and some parameter changes. Now I want to introduce pagination and also listen for offset changes, but the problem is that when the query changes I need to reset the offset. How can I achieve this with RxAndroid? Basically, we observe three or more sources (query, offset, connectionChange). I want to react to a change in any of them and, whenever the query changes, also reset the offset.

Upvotes: 3

Views: 868

Answers (3)

akarnokd

Reputation: 70017

To see where the value came from, you have to tag the values in some fashion, which requires per-source transformation. For example:

data class Tuple<T>(val value: T, val index: Long) { ... }

Flowable.defer {
    // Per-subscriber state: one emission counter per source,
    // plus the counter values seen by the previous combiner call.
    val indices = LongArray(4)
    // Start at -1 so the first value from each source counts as a change.
    val latests = LongArray(4) { -1L }
    Flowable.combineLatest(
        source1.map { Tuple(it, indices[0]++) },
        source2.map { Tuple(it, indices[1]++) },
        source3.map { Tuple(it, indices[2]++) },
        source4.map { Tuple(it, indices[3]++) },
        { tuple1, tuple2, tuple3, tuple4 ->

             if (tuple1.index != latests[0]) {
                 // first source changed
             }
             if (tuple2.index != latests[1]) {
                 // second source changed
             }
             if (tuple3.index != latests[2]) {
                 // third source changed
             }
             if (tuple4.index != latests[3]) {
                 // fourth source changed
             }

             // Remember what was seen so the next call can compare against it.
             latests[0] = tuple1.index
             latests[1] = tuple2.index
             latests[2] = tuple3.index
             latests[3] = tuple4.index

             // Return the combined result here; as written the lambda yields Unit.
        }
    )
}
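As a smaller, runnable illustration of the same tagging idea, here is a minimal sketch with only two sources. It assumes RxJava 3 (the io.reactivex.rxjava3 packages) is on the classpath; Tagged, changedSources and the "query"/"offset" labels are hypothetical names chosen for this example, not part of any library.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.processors.PublishProcessor

// Hypothetical wrapper: a value plus the running emission count of its source.
data class Tagged<T>(val value: T, val index: Long)

// For every combination, emits the names of the sources whose value is new
// compared with the previous combination.
fun changedSources(query: Flowable<String>, offset: Flowable<Int>): Flowable<List<String>> =
    Flowable.defer {
        val counters = LongArray(2)       // emissions counted per source
        val seen = LongArray(2) { -1L }   // indices seen by the previous combination
        Flowable.combineLatest(
            query.map { Tagged(it, counters[0]++) },
            offset.map { Tagged(it, counters[1]++) }
        ) { q, o ->
            val changed = listOfNotNull(
                "query".takeIf { q.index != seen[0] },
                "offset".takeIf { o.index != seen[1] }
            )
            seen[0] = q.index
            seen[1] = o.index
            changed
        }
    }

fun main() {
    val query = PublishProcessor.create<String>()
    val offset = PublishProcessor.create<Int>()
    changedSources(query, offset).subscribe { println(it) }

    query.onNext("a")   // nothing printed yet: offset has not emitted
    offset.onNext(0)    // first combination, both values are new
    query.onNext("b")   // only the query changed
    offset.onNext(20)   // only the offset changed
}
```

Wrapping everything in Flowable.defer keeps the counters private to each subscriber, so two subscriptions do not interfere with each other.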

Upvotes: 1

Nikola Despotoski

Reputation: 50578

You can chain a side effect that runs right after the query Flowable emits an item. Applying doAfterNext to the query before passing it to combineLatest is probably the easiest approach.

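fun observePublishers(query: Flowable<List<String>>, connectionState: Flowable<Boolean>, offset: Flowable<Int>) {
    val newQuery = query.doAfterNext {
        // index is assumed to be the offset state kept outside this function;
        // it is reset every time the query emits.
        index = 0
    }
    Flowable.combineLatest(newQuery, connectionState, offset) { queryResult, hasConnection, offsetValue ->
        // combine the three latest values here
    }.subscribe {
        // consume the combined result here
    }
}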
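One way the reset could be wired up is sketched below, assuming RxJava 3 and assuming the offset is backed by a BehaviorProcessor so that the reset actually flows back into combineLatest. observeWithReset and the sample values are hypothetical names for this example only.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.processors.BehaviorProcessor
import io.reactivex.rxjava3.processors.PublishProcessor

// Combines query and offset; every query emission pushes 0 into the offset
// processor right after the query value has been delivered downstream.
fun observeWithReset(
    query: Flowable<String>,
    offset: BehaviorProcessor<Int>
): Flowable<Pair<String, Int>> {
    val resettingQuery = query.doAfterNext { offset.onNext(0) }
    return Flowable.combineLatest(resettingQuery, offset) { q, o -> q to o }
}

fun main() {
    val query = PublishProcessor.create<String>()
    val offset = BehaviorProcessor.createDefault(0)
    observeWithReset(query, offset).subscribe { println(it) }

    query.onNext("cats")
    offset.onNext(20)     // user scrolls to the next page
    query.onNext("dogs")  // new query: the offset goes back to 0 afterwards
}
```

Because doAfterNext runs after the value has been passed on, a synchronous setup like this one still emits the new query paired with the old offset once before the reset arrives. If that intermediate pair matters, it has to be filtered out or the reset has to happen before the query value is delivered.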

Upvotes: 0
