Reputation: 22486
RxJava2
kotlin
This works ok and I can concat 2 observables
Observable.concat(countries(), animals())
.subscribeBy {
println(it)
}
This the sample I can't understand as it use a lambda that seems to take a ObservableSource and I want to concat the 2 observables, but it results in a null exception. Just wondering what I am doing wrong with this. And what is the purpose to use a lambda with the concat?
Observable.concat<String> {
it.onNext(countries())
it.onNext(animals())
}.subscribeBy {
println(it)
}
private fun animals(): Observable<String> =
Observable.just("fox", "cat", "dog", "bear", "bat", "hare", "lion", "tiger")
private fun countries(): Observable<String> =
Observable.just("England", "France", "Thailand", "America", "Scotland", "Ice Land")
This is the crash I am getting:
Exception in thread "main" java.lang.NullPointerException
at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.onNext(ObservableConcatMap.java:129)
This is the interface for the ObservableSource
I think I am referring to.
public interface ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}
Many thanks for any suggestions
Upvotes: 4
Views: 647
Reputation: 20119
Passing a lambda to concat resolves to concat(ObservableSource<? extends ObservableSource<? extends T>> sources)
. Because ObservableSource
is an interface with a single non-default method, this triggers Kotlin's SAM conversion. This is why it picks that overload - it's the only one with an interface that can be fulfilled with SAM conversion.
Thus, the lambda is an implementation of the ObservableSource.subscribe(Observer<? super T> observer)
method. That method is documented as:
Subscribes the given Observer to this ObservableSource instance.
Thus, the lambda needs to subscribe the parameter (it
) to the source of Observables
. You're getting a NullPointerException
because rather than subscribing it, you begin calling onNext
on an Observer
that's not yet subscribed, and thus has the incorrect internal state (in this case, there's a queue that's not set up yet, but that's not especially important).
To fulfill the method's contract, you would simply create an Observable
that emits Observable
s and subscribe it
(the Observer
) to that Observable
in the lambda, like so:
Observable.concat<String> {
Observable.just(countries(), animals()).subscribe(it)
}.subscribeBy {
println(it)
}
I've tested this locally and it produces the expected result.
Upvotes: 5