Reputation: 1276
I have an 'outer' Observable and a cache of 'inner' Observables. Which 'inner' Observable I select depends on the value emitted by the 'outer' one. Whenever either the 'outer' or the selected 'inner' Observable emits, I want to combine their latest values. I am trying to do this with flatMap, but I am getting too many events (i.e. duplicates).
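// Imports assume RxJava 2; for RxJava 3 use io.reactivex.rxjava3.core.Observable.
import io.reactivex.Observable
import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.TimeUnit

fun test() {
    // Emits 0, 1, 2, ... every 3 seconds.
    val outerObs = Observable.interval(3, TimeUnit.SECONDS)
    // Shared inner streams, keyed by outerVal % 2.
    val innerObsCache = mapOf(
        0L to Observable.interval(0, 1, TimeUnit.SECONDS).map { "zero:$it" }.share(),
        1L to Observable.interval(0, 1, TimeUnit.SECONDS).map { "one:$it" }.share()
    )
    outerObs
        .flatMap { outerVal ->
            innerObsCache[outerVal % 2]!!.map { "Outer($outerVal):Inner($it)" }
        }.subscribe({ combinedResult -> println("$now $combinedResult") })
}

private val now get() = SimpleDateFormat("HH:mm:ss").format(Date())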
This outputs something like:
00:17:36 Outer(0):Inner(zero:0)
00:17:37 Outer(0):Inner(zero:1)
00:17:38 Outer(0):Inner(zero:2)
00:17:39 Outer(1):Inner(one:0)
00:17:39 Outer(0):Inner(zero:3)
You will notice that I am getting two outputs at 00:17:39. What I really want is output like this:
00:17:36 Outer(0):Inner(zero:0)
00:17:37 Outer(0):Inner(zero:1)
00:17:38 Outer(0):Inner(zero:2)
00:17:39 Outer(1):Inner(one:0)
It seems my problem is that the inner observable at key 0 of my cache map is still emitting, which causes the additional value to be sent. I don't see how to prevent that, given that no observable actually completes when I want to switch to the other one. As time goes on the problem gets worse, because each outerObs emission adds another inner subscription and therefore more undesired duplicates.
I am sure there is some other Rx technique I should be using here to get what I expect, but I could use some guidance.
Upvotes: 0
Views: 364
Reputation: 1276
Figured it out. I was using the wrong operator for my use case. switchMap does what I want: it unsubscribes from the previous inner observable whenever the outer observable emits a new value.
outerObs
    .switchMap { outerVal ->
        innerObsCache[outerVal % 2]!!.map { "Outer($outerVal):Inner($it)" }
    }.subscribe({ combinedResult -> println("$now $combinedResult") })
This produces the desired output:
00:19:53 Outer(0):Inner(zero:0)
00:19:54 Outer(0):Inner(zero:1)
00:19:55 Outer(0):Inner(zero:2)
00:19:56 Outer(1):Inner(one:0)
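To compare the two operators without waiting on a real clock, here is a minimal sketch that runs the same pipeline on a TestScheduler. It assumes RxJava 2 package names (for RxJava 3, the classes live under io.reactivex.rxjava3). The helper name collect and its useSwitchMap parameter are made up for this illustration and are not part of the original code.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: runs the question's pipeline on virtual time and
// returns everything emitted during the first 6 virtual seconds.
fun collect(useSwitchMap: Boolean): List<String> {
    val scheduler = TestScheduler()
    val outerObs = Observable.interval(3, TimeUnit.SECONDS, scheduler)
    val innerObsCache = mapOf(
        0L to Observable.interval(0, 1, TimeUnit.SECONDS, scheduler).map { "zero:$it" }.share(),
        1L to Observable.interval(0, 1, TimeUnit.SECONDS, scheduler).map { "one:$it" }.share()
    )

    val combined = if (useSwitchMap) {
        // Disposes the previous inner subscription on each outer emission.
        outerObs.switchMap { outerVal ->
            innerObsCache.getValue(outerVal % 2).map { "Outer($outerVal):Inner($it)" }
        }
    } else {
        // Keeps every inner subscription alive, so old ones keep emitting.
        outerObs.flatMap { outerVal ->
            innerObsCache.getValue(outerVal % 2).map { "Outer($outerVal):Inner($it)" }
        }
    }

    val results = mutableListOf<String>()
    val disposable = combined.subscribe { results.add(it) }
    // Advance virtual time; all scheduled work runs synchronously here.
    scheduler.advanceTimeBy(6, TimeUnit.SECONDS)
    disposable.dispose()
    return results
}

fun main() {
    println("flatMap:")
    collect(useSwitchMap = false).forEach(::println)
    println("switchMap:")
    collect(useSwitchMap = true).forEach(::println)
}
```

With flatMap, the list should still contain an Outer(0) entry produced after Outer(1) has started, which is the duplicate from the question. With switchMap, the Outer(0) entries should stop once Outer(1) arrives. The order of same-instant emissions on a TestScheduler may differ from what a real clock prints.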
What is the difference between flatmap and switchmap in RxJava?
Upvotes: 1