ant2009

Reputation: 22666

Subscribing at the end of the chain vs. subscribing an observer to the Observable

Kotlin 1.2.60
RxJava 2

I have the code snippet below. What is the difference between chaining subscribe to the end of flatMap and printing the results there, and creating an observer and subscribing it to the Observable?

I was expecting to get the same results from both. But the observer's onNext prints each emitted item whole, without splitting it.

The chained subscribe, however, prints the split items, which is what I would expect.

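import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

fun main(args: Array<String>) {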
    val source2 = Observable.just("521934/2342/FOXTROT", "21962/12112/78886/TANGO", "283242/4542/WHISKEY/2348562")

    source2.flatMap {
        Observable.fromArray(*it.split("/").toTypedArray())
    }
    .subscribe { println(it) }

    val observer = object : Observer<String> {
        override fun onComplete() {
        }

        override fun onSubscribe(d: Disposable) {
        }

        override fun onNext(t: String) {
            println(t)
        }

        override fun onError(e: Throwable) {
        }
    }

    source2.subscribe(observer)
}

The output is below. From the chained subscribe:

521934
2342
FOXTROT
21962
12112
78886
TANGO
283242
4542
WHISKEY
2348562

From onNext:

521934/2342/FOXTROT
21962/12112/78886/TANGO
283242/4542/WHISKEY/2348562

Upvotes: 1

Views: 125

Answers (1)

CPerson

Reputation: 1222

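source2 is immutable: flatMap does not modify it, it returns a new Observable. You are subscribing twice, once to the result of flatMap and once to the unmodified source2, so your observer receives the original, unsplit items. You should capture the result of the flatMap in a new variable and then subscribe to that.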

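import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

fun main(args: Array<String>) {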
    val source2 = Observable.just("521934/2342/FOXTROT", "21962/12112/78886/TANGO", "283242/4542/WHISKEY/2348562")

    val source3 = source2.flatMap {
        Observable.fromArray(*it.split("/").toTypedArray())
    }
    source3.subscribe { println(it) }

    val observer = object : Observer<String> {
        override fun onComplete() {
        }

        override fun onSubscribe(d: Disposable) {
        }

        override fun onNext(t: String) {
            println(t)
        }

        override fun onError(e: Throwable) {
        }
    }

    source3.subscribe(observer)
}
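
To make the point about flatMap returning a new Observable easier to check, here is a minimal sketch, assuming RxJava 2 is on the classpath. The helper name splitItems and the variable names are hypothetical, chosen only for illustration. It collects both streams into lists and compares how many items each one emits.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: returns a NEW Observable; `source` itself is not changed.
fun splitItems(source: Observable<String>): Observable<String> =
    source.flatMap { Observable.fromArray(*it.split("/").toTypedArray()) }

fun main(args: Array<String>) {
    val source = Observable.just("521934/2342/FOXTROT", "21962/12112/78886/TANGO")
    val split = splitItems(source)

    // Collect each stream into a list so the two can be compared side by side.
    val fromSource = source.toList().blockingGet()
    val fromSplit = split.toList().blockingGet()

    println(fromSource.size) // 2: the original, unsplit strings
    println(fromSplit.size)  // 7: one item per "/"-separated token
}
```

Passing the observer straight to the end of the chain, as in source2.flatMap { ... }.subscribe(observer), should behave the same way as storing the flatMap result in a variable first; the variable only makes it harder to subscribe to the wrong Observable by accident.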

Upvotes: 3
