ant2009
ant2009

Reputation: 22486

How to use concat with lambda and ObservableSource

RxJava2
kotlin 

This works ok and I can concat 2 observables

   Observable.concat(countries(), animals())
                    .subscribeBy {
                        println(it)
                    }

This the sample I can't understand as it use a lambda that seems to take a ObservableSource and I want to concat the 2 observables, but it results in a null exception. Just wondering what I am doing wrong with this. And what is the purpose to use a lambda with the concat?

        Observable.concat<String> {
                    it.onNext(countries())
                    it.onNext(animals())
                }.subscribeBy {
                    println(it)
                }


    private fun animals(): Observable<String> =
            Observable.just("fox", "cat", "dog", "bear", "bat", "hare", "lion", "tiger")

    private fun countries(): Observable<String> =
            Observable.just("England", "France", "Thailand", "America", "Scotland", "Ice Land")

This is the crash I am getting:

Exception in thread "main" java.lang.NullPointerException
    at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.onNext(ObservableConcatMap.java:129)

This is the interface for the ObservableSource I think I am referring to.

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}

Many thanks for any suggestions

Upvotes: 4

Views: 647

Answers (1)

Ryan M
Ryan M

Reputation: 20119

Passing a lambda to concat resolves to concat(ObservableSource<? extends ObservableSource<? extends T>> sources). Because ObservableSource is an interface with a single non-default method, this triggers Kotlin's SAM conversion. This is why it picks that overload - it's the only one with an interface that can be fulfilled with SAM conversion.

Thus, the lambda is an implementation of the ObservableSource.subscribe(Observer<? super T> observer) method. That method is documented as:

Subscribes the given Observer to this ObservableSource instance.

Thus, the lambda needs to subscribe the parameter (it) to the source of Observables. You're getting a NullPointerException because rather than subscribing it, you begin calling onNext on an Observer that's not yet subscribed, and thus has the incorrect internal state (in this case, there's a queue that's not set up yet, but that's not especially important).

To fulfill the method's contract, you would simply create an Observable that emits Observables and subscribe it (the Observer) to that Observable in the lambda, like so:

Observable.concat<String> {
    Observable.just(countries(), animals()).subscribe(it)
}.subscribeBy {
    println(it)
}

I've tested this locally and it produces the expected result.

Upvotes: 5

Related Questions