Reputation: 1373
I'm a bit of an RxJava newbie. I'm getting the hang of it, but there's one scenario that I'm not quite satisfied with. Say I have a pair of Observables of different types that are designed to work together. For example:
val list: Observable<List<MyClass>>
val indexIntoList: Observable<Int>
So, I need to observe these as a pair, so that the observer gets emitted to when either list or indexIntoList gets an update. Right now my solution is something like this:
var lastListUsed: List<MyClass> = ArrayList()
var lastIndexUsed = -1
val syncObject = Object()
init {
list.subscribe(object: Observer<List<MyClass>>{
.
.
.
override fun onNext(t: List<MyClass>)
{
FunctionOnMyClass(t, lastIndexUsed)
}
})
indexIntoList.subscribe(object: Observer<Int>{
.
.
.
override fun onNext(t: Int)
{
FunctionOnMyClass(lastListUsed, t)
}
})
}
fun FunctionOnMyClass(list: List<MyClass>, index: Int)
{
synchronized(syncObject)
{
lastListUsed = list
lastIndexUsed = index
.
.
.
}
}
I had the thought to do something like this:
var lastMyClass = DefaultMyClass()
list.doOnNext{listVersion ->
indexIntoMap.map{ index ->
if(index in listVersion.indices)
listVersion[index]
else
lastMyClass
}.subscribe(object: Observer<MyClass>{
.
.
.
override fun onNext(t: MyClass)
{
lastMyClass = t
.
.
.
}
})
}
But if I'm understanding correctly, to use this method, I would have to dispose the Disposable on the inner Observable every time the outer Observable updates. I'd rather not do that. Is there a preferred way to handle this scenario? Should I be open to disposing the Disposable on every update of the outer Observable?
Upvotes: 1
Views: 3740
Reputation: 9263
I think you are looking for combineLastest
operator:
when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
Here an example of implementation:
val myListObservable = ...
val myIndexObservable = ...
disposable = Observable.combineLatest(myListObservable, myIndexObservable, BiFunction { list: List<YourType>, index: Int ->
list to index
})
Then you can work directly with the last list and the las index emitted by each observable as a Pair<List<YourType>,Int>
In the case you need to wait to each observable to emit an element to pair with each other, you should use zip
Combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function
As you can observe in the diagram, the second observable emits C
and D
at last. But the first observable take a time to emit 3
and then 4
. The resulting pair will wait until the other observable emits an element to pair with.
The implementation is the same as above, replacing combineLastest
with zip
.
Upvotes: 4
Reputation: 364
if you want to listen to the most recent events from both observables then take a look at combineLatest
operator: http://rxmarbles.com/#combineLatest
It emits every time one of the observables emit a new value BUT it will start as soon as each of the sources has emitted at least one event.
val result = Observable.combineLatest<String, Int, String>(sourceObservable1, sourceObservable2) { s, integer -> s + ": " + integer }
Upvotes: 1