Jason Carreira

Reputation: 489

How to emit from Flux onComplete

I'm trying to implement something similar to Akka Streams' statefulMapConcat. Basically, I have a Flux of scores, something like this:

Score(LocalDate date, Integer score)

I want to take these in and emit one aggregate per day:

ScoreAggregate(LocalDate date, Integer scoreCount, Integer totalScore)

So I've got an aggregator that keeps some internal state, which I set up before processing, and I want to flatMap over that aggregator, which returns a Mono. The aggregator only emits a Mono with a value when the date changes, so you get only one per day.

ScoreAggregator aggregator = ...

Flux<Score> scoreFlux = ...

scoreFlux.flatMap(aggregator::addScore);

So my question is... how do I emit a final element when the scoreFlux completes? The aggregator will have some data for the final day that hasn't been emitted yet and I need to get that sent.

Upvotes: 3

Views: 2437

Answers (1)

Michael Berry

Reputation: 72379

Echoing the comment as an answer just so this doesn't show as unanswered:

So my question is... how do I emit a final element when the scoreFlux completes?

You can use concatWith() to append a publisher that is subscribed to only after your original flux completes. If the value it emits should be computed only at that point, wrap it in Mono.defer(); otherwise the argument is evaluated eagerly, when the pipeline is assembled, before the aggregator has seen any scores.
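As a rough sketch of how that could look for the scores case: the question doesn't show the aggregator, so the ScoreAggregator below is a hypothetical stand-in, and its flush() method, the aggregate() helper, and the record definitions are assumptions made purely for illustration. It also assumes the scores arrive ordered by date and that the resulting Flux is subscribed to once, since the aggregator holds mutable state.

```java
import java.time.LocalDate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ScoreAggregation {

    // Shapes taken from the question; modelling them as records is an assumption.
    record Score(LocalDate date, Integer score) {}
    record ScoreAggregate(LocalDate date, Integer scoreCount, Integer totalScore) {}

    // Hypothetical stand-in for the question's aggregator (not thread-safe).
    static class ScoreAggregator {
        private LocalDate currentDate;
        private int count;
        private int total;

        // Emits the previous day's aggregate when the date changes, else an empty Mono.
        Mono<ScoreAggregate> addScore(Score s) {
            Mono<ScoreAggregate> out = Mono.empty();
            if (currentDate != null && !currentDate.equals(s.date())) {
                out = Mono.just(new ScoreAggregate(currentDate, count, total));
                count = 0;
                total = 0;
            }
            currentDate = s.date();
            count++;
            total += s.score();
            return out;
        }

        // Hypothetical method: emits whatever is still buffered for the last day.
        Mono<ScoreAggregate> flush() {
            return currentDate == null
                    ? Mono.empty()
                    : Mono.just(new ScoreAggregate(currentDate, count, total));
        }
    }

    static Flux<ScoreAggregate> aggregate(Flux<Score> scoreFlux) {
        ScoreAggregator aggregator = new ScoreAggregator();
        return scoreFlux
                .flatMap(aggregator::addScore)
                // Mono.defer delays the flush() call until concatWith subscribes,
                // which happens only after scoreFlux has completed.
                .concatWith(Mono.defer(() -> aggregator.flush()));
    }
}
```

Without the Mono.defer() wrapper, aggregator.flush() would be called while the chain is being built, when the aggregator is still empty, so the final day would never be emitted.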

Upvotes: 3
