KairisCharm

Reputation: 1373

RxJava2 combine Observables of different types

I'm a bit of an RxJava newbie. I'm getting the hang of it, but there's one scenario that I'm not quite satisfied with. Say I have a pair of Observables of different types that are designed to work together. For example:

val list: Observable<List<MyClass>>
val indexIntoList: Observable<Int>

I need to observe these as a pair, so that the observer is notified whenever either list or indexIntoList emits an update. Right now my solution is something like this:

var lastListUsed: List<MyClass> = ArrayList()
var lastIndexUsed = -1
val syncObject = Object()

init {
    list.subscribe(object: Observer<List<MyClass>>{
        .
        .
        .
        override fun onNext(t: List<MyClass>)
        {
             FunctionOnMyClass(t, lastIndexUsed)
        }
    })

    indexIntoList.subscribe(object: Observer<Int>{
        .
        .
        .
        override fun onNext(t: Int)
        {
            FunctionOnMyClass(lastListUsed, t)
        }
    })
}

fun FunctionOnMyClass(list: List<MyClass>, index: Int)
{
    synchronized(syncObject)
    {
        lastListUsed = list
        lastIndexUsed = index
        .
        .
        .
    }
}

I had the thought to do something like this:

var lastMyClass = DefaultMyClass()

list.doOnNext{listVersion ->
    indexIntoList.map{ index ->
          if(index in listVersion.indices)
              listVersion[index]
          else
              lastMyClass
     }.subscribe(object: Observer<MyClass>{
        .
        .
        .
        override fun onNext(t: MyClass)
        {
            lastMyClass = t
            .
            .
            .
        }
     })
}

But if I'm understanding correctly, to use this method, I would have to dispose the Disposable on the inner Observable every time the outer Observable updates. I'd rather not do that. Is there a preferred way to handle this scenario? Should I be open to disposing the Disposable on every update of the outer Observable?

Upvotes: 1

Views: 3740

Answers (2)

crgarridos

Reputation: 9263

I think you are looking for the combineLatest operator:

when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function


Here is an example implementation:

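val myListObservable = ...
val myIndexObservable = ...

// combineLatest itself returns an Observable; the Disposable comes from subscribe
disposable = Observable.combineLatest(myListObservable, myIndexObservable, BiFunction { list: List<YourType>, index: Int ->
    list to index
}).subscribe { (list, index) ->
    // use the latest list and index here
}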

You can then work directly with the last list and the last index emitted by each observable, as a Pair<List<YourType>, Int>.
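
Applied to the question's two sources, this could look roughly like the sketch below. It assumes RxJava 2 (the io.reactivex package); MyClass is a stand-in data class and selectedItems is a hypothetical helper name, neither of which comes from the question. Out-of-range indices are simply filtered out here, which is one possible choice rather than the only one.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.subjects.BehaviorSubject

// Hypothetical stand-in for the question's MyClass.
data class MyClass(val name: String)

// Hypothetical helper: pairs the latest list with the latest index
// and emits the selected element whenever either source changes.
fun selectedItems(
    list: Observable<List<MyClass>>,
    indexIntoList: Observable<Int>
): Observable<MyClass> =
    Observable.combineLatest(
        list,
        indexIntoList,
        BiFunction<List<MyClass>, Int, Pair<List<MyClass>, Int>> { items, index -> items to index }
    )
        // Assumption: an out-of-range index is skipped rather than mapped to a default.
        .filter { (items, index) -> index in items.indices }
        .map { (items, index) -> items[index] }

fun main() {
    val list = BehaviorSubject.createDefault(listOf(MyClass("a"), MyClass("b")))
    val index = BehaviorSubject.createDefault(0)

    val disposable = selectedItems(list, index).subscribe { println(it) }

    index.onNext(1)                     // same list, new index
    list.onNext(listOf(MyClass("c")))   // index 1 is now out of range, so nothing is emitted
    index.onNext(0)                     // emits the only element of the new list

    disposable.dispose()
}
```

With this shape there is a single subscription and no shared mutable state, so neither the synchronized block nor repeated disposal of an inner subscription is needed.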


If you instead need to wait for each observable to emit an element so that the elements are paired one-to-one, you should use zip:

Combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function


In the marble diagram for zip, the second observable emits C and D early, while the first observable takes some time to emit 3 and then 4. Each pair is emitted only once the other observable has produced an element to pair with.

The implementation is the same as above, replacing combineLatest with zip.
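
As a rough illustration of that pairing behaviour, here is a minimal sketch assuming RxJava 2. The zipped helper and the subject names are made up for the example.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: pairs the n-th number with the n-th letter,
// regardless of which source emits first.
fun zipped(numbers: Observable<Int>, letters: Observable<String>): Observable<String> =
    Observable.zip(
        numbers,
        letters,
        BiFunction<Int, String, String> { n, s -> "$n$s" }
    )

fun main() {
    val numbers = PublishSubject.create<Int>()
    val letters = PublishSubject.create<String>()

    val disposable = zipped(numbers, letters).subscribe { println(it) }

    letters.onNext("A")   // buffered: no number to pair with yet
    letters.onNext("B")   // buffered as well
    numbers.onNext(1)     // prints 1A
    numbers.onNext(2)     // prints 2B

    disposable.dispose()
}
```

Unlike combineLatest, each element is used in exactly one pair, so zip fits sources that advance in lockstep rather than a list and an index that change independently.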

Upvotes: 4

Andre

Reputation: 364

If you want to listen to the most recent events from both observables, take a look at the combineLatest operator: http://rxmarbles.com/#combineLatest

It emits every time one of the observables emits a new value, but only after each of the sources has emitted at least one event.

val result = Observable.combineLatest<String, Int, String>(sourceObservable1, sourceObservable2) { s, integer -> s + ": " + integer }
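
To make the timing concrete, here is a small sketch, again assuming RxJava 2. The labelled helper and the subject names are illustrative only, and an explicit BiFunction is used in place of the trailing lambda.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.subjects.PublishSubject

// Hypothetical helper mirroring the one-liner above.
fun labelled(names: Observable<String>, counts: Observable<Int>): Observable<String> =
    Observable.combineLatest(
        names,
        counts,
        BiFunction<String, Int, String> { s, integer -> s + ": " + integer }
    )

fun main() {
    val names = PublishSubject.create<String>()
    val counts = PublishSubject.create<Int>()

    val disposable = labelled(names, counts).subscribe { println(it) }

    names.onNext("a")   // nothing yet: counts has not emitted
    names.onNext("b")   // still nothing; "a" is replaced as the latest name
    counts.onNext(1)    // prints b: 1
    names.onNext("c")   // prints c: 1
    counts.onNext(2)    // prints c: 2

    disposable.dispose()
}
```

Values emitted before the other source has produced anything are not replayed; only the most recent one is kept.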

Upvotes: 1
