Reputation: 371
How do I concatenate two Flux with a nested one? Why does this test never finish?
@Test
fun `concatenating two flux`() {
    val names = listOf("israel", "israel")
    val a = Flux.just("a", "v")
        .flatMap { it.toUpperCase().toMono() }
        .concatWith { names.joinToString(" ").toMono() }
    StepVerifier.create(a).expectNext("A", "V", "israel israel").verifyComplete()
}
When I put the second publisher in a separate variable, the test runs as expected:
@Test
fun `concatenating two flux`() {
    val names = listOf("israel", "israel")
    val b = names.joinToString(" ").toMono()
    val a = Flux.just("a", "v")
        .flatMap { it.toUpperCase().toMono() }
    val c = a.concatWith(b)
    StepVerifier.create(c.log()).expectNext("A", "V", "israel israel").verifyComplete()
}
Upvotes: 2
Views: 583
Reputation: 628
You need to use () instead of {} in concatWith():
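// RIGHT! The Mono itself is passed as the Publisher argument.
Flux.just("a", "v")
    .flatMap { it.toUpperCase().toMono() }
    .concatWith(names.joinToString(" ").toMono())
// WRONG! The lambda is treated as the Publisher, so the Mono inside is never subscribed to.
Flux.just("a", "v")
    .flatMap { it.toUpperCase().toMono() }
    .concatWith { names.joinToString(" ").toMono() }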
Most Rx2 methods do accept actual lambdas: some take Callable<ObservableSource<T>> instead of ObservableSource, and others take Function<T, ObservableSource<R>>.
Observable.defer { Observable.just(1) }
works fine, as does
observable.flatMap { Observable.just(1) }
(if you are deliberately ignoring the incoming parameter).
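The third case is the one we got used to in Rx1, whose andThen() method always took an Observable. An Observable cannot be represented as a lambda, which is why we need () instead of {}.
The same applies here. Flux.concatWith() takes a Publisher, not a function. With {}, Kotlin SAM-converts the lambda into a Publisher whose subscribe() body builds a Mono and discards it, so the downstream subscriber never receives onSubscribe or onComplete and the test never finishes.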
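To see why the {} version hangs, here is a minimal sketch that needs no Reactor dependency. MiniSubscriber, MiniPublisher, Recorder, just and subscribeTo are hypothetical stand-ins, not Reactor API. The sketch assumes only that concatWith() eventually calls subscribe() on whatever Publisher it receives, and that Kotlin turns a trailing lambda into that single-method interface.

```kotlin
// Hypothetical stand-ins for the Reactive Streams Subscriber and Publisher interfaces.
interface MiniSubscriber<T> {
    fun onNext(value: T)
    fun onComplete()
}

fun interface MiniPublisher<T> {
    fun subscribe(subscriber: MiniSubscriber<T>)
}

// A well-behaved publisher: emits one value, then completes.
fun <T> just(value: T): MiniPublisher<T> = MiniPublisher { s ->
    s.onNext(value)
    s.onComplete()
}

// Records every signal so the outcome can be inspected afterwards.
class Recorder<T> : MiniSubscriber<T> {
    val values = mutableListOf<T>()
    var completed = false
    override fun onNext(value: T) { values += value }
    override fun onComplete() { completed = true }
}

// Plays the role of concatWith(Publisher): it subscribes to what it is given.
fun <T> subscribeTo(other: MiniPublisher<T>): Recorder<T> {
    val recorder = Recorder<T>()
    other.subscribe(recorder)
    return recorder
}

fun main() {
    // Parentheses: the publisher itself is the argument.
    val right = subscribeTo(just("israel israel"))
    println("${right.values} completed=${right.completed}")
    // prints: [israel israel] completed=true

    // Braces: the lambda becomes the subscribe() body. It builds a
    // publisher, drops it, and never signals the subscriber.
    val wrong = subscribeTo<String> { just("israel israel") }
    println("${wrong.values} completed=${wrong.completed}")
    // prints: [] completed=false
}
```

In the real test, StepVerifier plays the part of the Recorder: it keeps waiting for a completion signal that the lambda-based Publisher never sends.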
Upvotes: 3