Reputation: 199
I am migrating my project from Spring to Ktor and decided to replace the reactive streams implementation, which was originally Reactor, with RxJava 2. However, I ran into a problem when trying to combine multiple streams into a single one at the end of the reactive pipeline. Here is how it looks:
internal interface Aggregator {
    fun acquireSomethingFromSomewhere(keyword: String): Flowable<Some>
}

fun acquireSomething(keyword: String) = Flowable
    .fromIterable(aggregators)
    .map { it.acquireSomethingFromSomewhere(keyword) }
    .flatMap { ??? }
The thing is, each call to acquireSomethingFromSomewhere returns a Flowable<Some>. Is there an operator that could help me combine them into one stream at the end? In Reactor I just used:
fun acquireSomething(keyword: String) = Flux
    .fromIterable(aggregators)
    .map { it.acquireSomethingFromSomewhere(keyword) }
    .flatMap { Flux.concat(it) }
But in RxJava I can't find any operator that solves my problem, as each of them takes a Publisher as an argument, and Flowable does not implement it.
Upvotes: 2
Views: 1502
Reputation: 4184
First of all, if the function you pass to map returns a Flowable, you will end up with nested Flowables (i.e. Flowable<Flowable<T>>), which is probably not what you want. This is because map only transforms the element inside the container, (T) -> R (in this case the container is Flowable). In your case, you want to transform each element inside the first container into a new container, (T) -> Flowable<R>, and flatten the result into a single stream. That operation is called flatMap. Rx offers several such operators that differ in behavior, such as concatMap and switchMap, but their signatures are the same.
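The same map versus flatMap distinction can be seen with plain Kotlin lists, which may make the nesting easier to picture. This is only a sketch: lookup is a hypothetical stand-in for an aggregator call, and List plays the role of the container instead of Flowable.

```kotlin
// Hypothetical stand-in for an aggregator call: one keyword yields several results.
fun lookup(source: String, keyword: String): List<String> =
    listOf("$source:$keyword:1", "$source:$keyword:2")

// map applies (T) -> R; here R is itself a List, so the result is nested.
fun nested(sources: List<String>, keyword: String): List<List<String>> =
    sources.map { lookup(it, keyword) }

// flatMap applies (T) -> List<R> and flattens the results into one List.
fun flat(sources: List<String>, keyword: String): List<String> =
    sources.flatMap { lookup(it, keyword) }

fun main() {
    val sources = listOf("a", "b")
    println(nested(sources, "x")) // [[a:x:1, a:x:2], [b:x:1, b:x:2]]
    println(flat(sources, "x"))   // [a:x:1, a:x:2, b:x:1, b:x:2]
}
```

With Flowable the idea is the same, except that the inner containers are streams that get subscribed to and merged.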
Example
fun acquireSomething(keyword: String) = Flowable
    .fromIterable(aggregators)
    .flatMap { it.acquireSomethingFromSomewhere(keyword) }
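If the order of results matters, as it did with Flux.concat in the Reactor version, concatMap is probably the closer match, because flatMap may interleave items when the inner Flowables emit asynchronously. Below is a minimal runnable sketch assuming RxJava 2 is on the classpath. The Some data class and FakeAggregator are hypothetical placeholders that are not part of the question, and the aggregators are passed in as a parameter only to keep the example self-contained.

```kotlin
import io.reactivex.Flowable

// Hypothetical payload type standing in for the question's Some.
data class Some(val value: String)

interface Aggregator {
    fun acquireSomethingFromSomewhere(keyword: String): Flowable<Some>
}

// Hypothetical in-memory aggregator, used only to make the sketch runnable.
class FakeAggregator(private val name: String) : Aggregator {
    override fun acquireSomethingFromSomewhere(keyword: String): Flowable<Some> =
        Flowable.just(Some("$name:$keyword:1"), Some("$name:$keyword:2"))
}

fun acquireSomething(aggregators: List<Aggregator>, keyword: String): Flowable<Some> =
    Flowable
        .fromIterable(aggregators)
        // concatMap subscribes to one inner Flowable at a time, preserving source order.
        .concatMap { it.acquireSomethingFromSomewhere(keyword) }

fun main() {
    val aggregators = listOf(FakeAggregator("a"), FakeAggregator("b"))
    // Blocking here is for demonstration only.
    val result = acquireSomething(aggregators, "x").toList().blockingGet()
    println(result.map { it.value }) // [a:x:1, a:x:2, b:x:1, b:x:2]
}
```

Swapping concatMap for flatMap gives the version from the example above, which is fine when ordering across aggregators does not matter.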
PS
If you want to know more about the theory behind this, see the Arrow-kt documentation on Functor and Monad.
Upvotes: 1