Lance Johnson
Lance Johnson

Reputation: 1276

Nested observables via flatMap outputs duplicates

I have an 'outer' Observable and a cache of 'inner' observables. Which 'inner' observable I select depends on the output of the 'outer' observable. When any of them change I want to combine their results. If 'outer' or 'inner' changes I want to combine the new values. I am trying to do this with flatMap but running into a problem of too many events being triggered (i.e. duplicates).

fun test() {
    val outerObs = Observable.interval(3, TimeUnit.SECONDS)
    val innerObsCache = mapOf(
            0L to Observable.interval(0, 1, TimeUnit.SECONDS).map { "zero:$it" }.share(),
            1L to Observable.interval(0, 1, TimeUnit.SECONDS).map { "one:$it" }.share()
    )

    outerObs
            .flatMap { outerVal ->
                innerObsCache[outerVal % 2]!!.map { "Outer($outerVal):Inner($it)" }
            }.subscribe({ combinedResult -> println("$now $combinedResult") })
}

private val now get() = SimpleDateFormat("HH:mm:ss").format(Date())

This outputs something like

00:17:36 Outer(0):Inner(zero:0)
00:17:37 Outer(0):Inner(zero:1)
00:17:38 Outer(0):Inner(zero:2)
00:17:39 Outer(1):Inner(one:0)
00:17:39 Outer(0):Inner(zero:3)

You will notice that I am getting two outputs at 00:17:39. What I really want to happen is output like this

00:17:36 Outer(0):Inner(zero:0)
00:17:37 Outer(0):Inner(zero:1)
00:17:38 Outer(0):Inner(zero:2)
00:17:39 Outer(1):Inner(one:0)

It seems like my problem is the inner observable at index 0 of my cache Observable map is still vending and thus causing the additional value to be sent. I don't see how to not have that happen given no observables are actually completing when I want to switch to the other one. As time goes on the problem gets worse because each outerObs vend causes even more undesired duplicates.

I am sure there is some other RX technique I should be using here to get what I expect but could use some guidance.

Upvotes: 0

Views: 364

Answers (1)

Lance Johnson
Lance Johnson

Reputation: 1276

Figure it out. I am using the wrong operator given my use case.

switchMap does what I want. It will unsubscribe from the inner observable when the outer observable omits a new value.

outerObs
  .switchMap { outerVal ->
    innerObsCache[outerVal % 2]!!.map { "Outer($outerVal):Inner($it)" }
  }.subscribe({ combinedResult -> println("$now $combinedResult") })

Produces the desired output

00:19:53 Outer(0):Inner(zero:0)
00:19:54 Outer(0):Inner(zero:1)
00:19:55 Outer(0):Inner(zero:2)
00:19:56 Outer(1):Inner(one:0)

What is the difference between flatmap and switchmap in RxJava?

Upvotes: 1

Related Questions