Clyde Barrow

Reputation: 2102

How to flatten Flux<List<T>> to Flux<T>?

My code looks like this:

fun mapBatch(batch: List<String>): Mono<List<MyClass>> ...

fun myFun(stream: Flux<String>): Flux<MyClass> {
    return stream
            .bufferTimeout(50, Duration.ofSeconds(60L))
            .flatMap{ batch -> mapBatch(batch) }
            // Here I would like a Flux<MyClass>, but I have a Flux<List<MyClass>>.
}

How can I get a Flux<T> from a Flux<List<T>>?

Upvotes: 7

Views: 10296

Answers (2)

Daniel Sobrado

Reputation: 747

You can flatten it with flatMap(Flux::fromIterable):

fun mapBatch(batch: List<String>): Mono<List<MyClass>> ...

fun myFun(stream: Flux<String>): Flux<MyClass> {
    return stream
            .bufferTimeout(50, Duration.ofSeconds(60L))
            .flatMap{ batch -> mapBatch(batch) }
            .flatMap(Flux::fromIterable) 
}

Upvotes: 1

Gabriel Suaki

Reputation: 351

Use .concatMapIterable or .flatMapIterable.

Flux#flatMapIterable is an operator dedicated to this case: it maps each item to an Iterable and emits that Iterable's elements one by one, so a Flux<List<T>> becomes a Flux<T>.
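
Applied to the code from the question, this could look roughly like the sketch below. The MyClass definition and the body of mapBatch are hypothetical stand-ins (the question elides both), so treat this as an illustration rather than the asker's actual code:

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

// Hypothetical stand-in for the MyClass type from the question.
data class MyClass(val value: String)

// Hypothetical stand-in for mapBatch; the real body is not shown in the question.
fun mapBatch(batch: List<String>): Mono<List<MyClass>> =
    Mono.just(batch.map { MyClass(it) })

fun myFun(stream: Flux<String>): Flux<MyClass> =
    stream
        .bufferTimeout(50, Duration.ofSeconds(60L))
        .flatMap { batch -> mapBatch(batch) } // Flux<List<MyClass>>
        .flatMapIterable { it }               // each list is unpacked: Flux<MyClass>
```

The identity lambda { it } is enough because each item is already a List, which is an Iterable. Swapping in .concatMapIterable { it } should work the same way here.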

Upvotes: 11
