Nicolas Barbé

Reputation: 624

How to subscribe to inner Flux/Mono automatically?

I have a Flux of (bounded) Flux that I want to transform into a Flux of Long, where the Long is the size of the inner Flux:

   Flux.just( Flux.just(1, 2, 3),  Flux.just(1, 2)  )
       .map(Flux::count)
       .log()
       .subscribe();

The execution log is the following:

onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
request(unbounded)
onNext({ "operator" : "Count" })
onNext({ "operator" : "Count" })
onComplete()

Flux::count returns a Mono<Long>, not a Long, so the result is a Flux of Mono rather than a Flux of Long. Is there an operator that unwraps the inner Mono automatically when subscribing to the outer Flux?

Upvotes: 1

Views: 4740

Answers (1)

Artem Bilan

Reputation: 121202

Use flatMap() instead of map(), i.e. flatMap(Flux::count). Its Javadoc says:

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allows them to interleave.

https://projectreactor.io/docs/core/snapshot/api/reactor/core/publisher/Flux.html#flatMap
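
As a minimal sketch of that suggestion applied to the snippet from the question: the class name InnerCountExample and the helper innerSizes() are made up for illustration, and collectList().block() is used only to get the values out for printing. It assumes reactor-core is on the classpath.

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class InnerCountExample {

    // Hypothetical helper: emits the size of each inner Flux as a Long.
    static List<Long> innerSizes() {
        return Flux.just(Flux.just(1, 2, 3), Flux.just(1, 2))
                // flatMap subscribes to each Mono<Long> returned by count()
                // and merges the results into a single Flux<Long>.
                .flatMap(Flux::count)
                .log()
                // Blocking here is for demonstration only.
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(innerSizes());
    }
}
```

With these synchronous inner sources the log should now show onNext(3) and onNext(2) instead of the Count operators. Because flatMap merges, it does not guarantee source order in general; if the counts must line up with the order of the inner Flux instances, concatMap(Flux::count) or flatMapSequential(Flux::count) would be the safer choice.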

Upvotes: 1
