Reputation: 413
I am new to reactive programming. I would like to make two API calls in parallel, process the results, and return a simple array or list of items.
I have two functions: one returns a Flux and the other returns a Mono. I apply a very simple filter to the items emitted by the Flux, depending on the result of the Mono.
I tried zipWith, but only one item made it to the end no matter what the filtering logic was. I also tried block, but that is not allowed inside the controller :/
@GetMapping("/{id}/offers")
fun viewTaskOffers(
    @PathVariable("id") id: String,
    @AuthenticationPrincipal user: UserPrincipal
): Flux<ViewOfferDTO> {
    data class TaskOfferPair(
        val task: TaskDTO,
        val offer: ViewOfferDTO
    )
    return client.getTaskOffers(id).map {
        it.toViewOfferDTO()
    }.zipWith(client.getTask(id), BiFunction {
        offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
    }).filter {
        it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
    }.map {
        it.offer
    }
}
getTaskOffers returns a Flux of OfferDTO.
getTask returns a Mono of TaskDTO.
If you cannot answer my question, please at least tell me how to make multiple API calls in parallel and wait for the results with WebClient.
Upvotes: 3
Views: 8278
Reputation: 1391
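Convert your Mono to a Flux using repeat(). Calling cache() first means the task is fetched once and the cached value is replayed on every repeat, rather than the request being sent again:
client.getTask(id).cache().repeat()
So your code would become: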
return client.getTaskOffers(id).map {
    it.toViewOfferDTO()
}.zipWith(client.getTask(id).cache().repeat(), BiFunction {
    offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
}).filter {
    it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
}.map {
    it.offer
}
Upvotes: 1
Reputation: 13090
Here is a use case for a parallel call.
public Mono<UserInfo> fetchCarrierUserInfo(User user) {
    Mono<UserInfo> userInfoMono = fetchUserInfo(user.getGuid());
    Mono<CarrierInfo> carrierInfoMono = fetchCarrierInfo(user.getCarrierGuid());
    return Mono.zip(userInfoMono, carrierInfoMono).map(tuple -> {
        UserInfo userInfo = tuple.getT1();
        userInfo.setCarrier(tuple.getT2());
        return userInfo;
    });
}
Here:
fetchUserInfo makes an HTTP call to get the user info from another service and returns a Mono.
fetchCarrierInfo makes an HTTP call to get the carrier info from another service and returns a Mono.
Mono.zip() merges the given Monos into a new Mono that is fulfilled when all of them have produced an item, aggregating their values into a Tuple2.
Then call fetchCarrierUserInfo(user).block() to get the final result (or, in a WebFlux controller, return the Mono itself instead of blocking).
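For anyone who wants to try the "start both calls, combine when both are done" pattern without a Reactor dependency, here is a rough JDK-only analogy using CompletableFuture.thenCombine. It is not Reactor code and is only meant to mirror the shape of the Mono.zip example above; the two fetch methods are hypothetical stand-ins that return strings instead of making real HTTP calls.

```java
import java.util.concurrent.CompletableFuture;

public class CombineSketch {
    // Hypothetical stand-in for the remote user-info call.
    static CompletableFuture<String> fetchUserInfo(String guid) {
        return CompletableFuture.supplyAsync(() -> "user:" + guid);
    }

    // Hypothetical stand-in for the remote carrier-info call.
    static CompletableFuture<String> fetchCarrierInfo(String guid) {
        return CompletableFuture.supplyAsync(() -> "carrier:" + guid);
    }

    // Both futures are started before they are combined, so neither waits
    // for the other; the combiner runs once both have completed.
    static CompletableFuture<String> fetchCarrierUserInfo(String userGuid, String carrierGuid) {
        CompletableFuture<String> user = fetchUserInfo(userGuid);
        CompletableFuture<String> carrier = fetchCarrierInfo(carrierGuid);
        return user.thenCombine(carrier, (u, c) -> u + " + " + c);
    }

    public static void main(String[] args) {
        // join() blocks the calling thread, much like block() on a Mono.
        System.out.println(fetchCarrierUserInfo("u1", "c1").join());
    }
}
```

As with block(), join() is only appropriate where blocking the current thread is acceptable.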
Upvotes: 4
Reputation: 17721
As you already figured out, zipWith won't help you here: it produces min(a.size, b.size) pairs, which is always at most 1 when one of the sources is a Mono.
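To make the min(a.size, b.size) rule concrete, here is a small plain-Java model of positional zipping over lists. It is only an illustration of the pairing rule, not how Reactor implements zipWith; the class name, method name, and sample values are made up for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class ZipModel {
    // Pairs elements by position and stops as soon as the shorter input
    // runs out, so the result has min(a.size, b.size) entries.
    static List<String> zip(List<String> a, List<String> b) {
        int n = Math.min(a.size(), b.size());
        List<String> pairs = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            pairs.add(a.get(i) + "/" + b.get(i));
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> offers = List.of("offer1", "offer2", "offer3");
        // A Mono behaves like a source with at most one element.
        List<String> task = List.of("task");
        System.out.println(zip(offers, task).size()); // prints 1
    }
}
```

With three offers and a single task, only the first offer is ever paired, which matches the "only one item made it to the end" symptom in the question.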
But since those two are independent, you can simply split them:
val task: Mono<TaskDTO> = client.getTask(id)
val result: Flux<ViewOfferDTO> =
    task.flatMapMany { t ->
        client.getTaskOffers(id).map { offer ->
            t to offer
        }
    }.filter {
        it.second.workerUser.id == user.id || it.first.creatorUser == user.id
    }.map {
        it.second
    }
Note that if you want a pair of elements, you can use the built-in Pair.
Also, checking it.first.creatorUser for each offer doesn't make much sense: there is only one task (it comes from a Mono), so that part of the condition gives the same result for every offer.
Upvotes: 1