Reputation: 123
I want to chain a Mono after each Flux event. The Mono publisher needs information from each event published by the Flux. The result should be a Flux that combines the data from each Flux event with the corresponding Mono response.
After some digging, I ended up with a map inside a flatMap. The code looks like this:
override fun searchPets(petSearch: PetSearch): Flux<Pet> {
    return petRepository
        .searchPets(petSearch) // returns Flux<Pet>
        .flatMap { pet ->
            petService
                .getCollarForMyPet() // returns Mono<collar>
                .map { collar -> PetConverter.addCollarToPet(pet, collar) } // returns pet (now with collar)
        }
}
My main concerns are:
Upvotes: 0
Views: 972
Reputation: 28301
This approach is perfectly fine.
The Reactive Streams specification mandates that onNext events don't overlap, so there won't be an issue with race conditions. flatMap introduces concurrency though, so multiple calls to the PetService will run in parallel. This shouldn't be an issue, unless searchPets emits the same Pet instance twice.
Note that due to that concurrency, flatMap can reorder pets in this scenario. Imagine the search returns petA then petB, but the petService call for petA takes longer. In the output of the flatMap, petB would be emitted first (with its collar set), then petA.
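To make the reordering concrete, here is a minimal sketch that simulates the petA/petB scenario. The Pet data class, getCollarFor, withCollar and addCollars are hypothetical stand-ins invented for this illustration, not the asker's actual types, and the delays are arbitrary. If source order matters, flatMapSequential is one option: it still subscribes to the inner Monos eagerly but emits their results in source order.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

// Hypothetical stand-in for the question's Pet type (collar modelled as a String).
data class Pet(val name: String, val collar: String? = null)

// Hypothetical collar lookup: the call for petA is deliberately slower than for petB.
fun getCollarFor(pet: Pet): Mono<String> {
    val delayMs = if (pet.name == "petA") 300L else 50L
    return Mono.just("collar-for-${pet.name}").delayElement(Duration.ofMillis(delayMs))
}

// Same shape as the question's code: a map nested inside the per-pet Mono.
fun withCollar(pet: Pet): Mono<Pet> =
    getCollarFor(pet).map { collar -> pet.copy(collar = collar) }

fun addCollars(pets: Flux<Pet>, preserveOrder: Boolean): Flux<Pet> =
    if (preserveOrder) {
        // Inner Monos run concurrently, results are emitted in source order.
        pets.flatMapSequential { pet -> withCollar(pet) }
    } else {
        // Inner Monos run concurrently, results are emitted as they complete.
        pets.flatMap { pet -> withCollar(pet) }
    }

fun main() {
    val pets = Flux.just(Pet("petA"), Pet("petB"))
    // block() is used only to keep this demo short; avoid it in reactive production code.
    println(addCollars(pets, preserveOrder = false).map { it.name }.collectList().block())
    println(addCollars(pets, preserveOrder = true).map { it.name }.collectList().block())
}
```

With these delays, the first line should list petB before petA, while the second keeps the source order. concatMap would also preserve order, but it subscribes to one inner Mono at a time, so the collar lookups would no longer run in parallel.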
Upvotes: 1