Reputation: 283
I have a LegacyAccountDto that I need to build a list of from two separate sources: a local JPA repository and a web service call. The web service version has the accountStatus available, whereas the JPA data source does not. I need to execute the two calls in parallel as Fluxes and, once both complete, match each JPA entry to its web service counterpart by legacy ID and populate its accountStatus from the web service entry. The whole idea is to return a list of completed DTOs. I don't need to save anything back to the web service or the JPA repo.
The DTO:
@Data
@JsonInclude(Include.NON_NULL)
public class LegacyAccountDto {
private UUID id;
private UUID organizationId;
private String name;
private String website;
private Long legacyAccountId;
private LocalDateTime legacyCreated;
private String accountType;
private String accountState;
}
Each call inside the merge returns a Flux of LegacyAccountDto:
Flux<LegacyAccountDto> completed = Flux.merge(
getLegacyAccountsFromSvc(accountNames),
Flux.fromIterable(accountMapper.accountListToLegacyAccountDtoList(accountRepository.getAccountsByNames(accountNames).get()))
)
.parallel()
.runOn(Schedulers.elastic())
.???????((list1, list2) -> {
list2.map(l2 -> {
//find list1 by legacyId
//set l2.accountStatus = l1.accountstatus
})
//return the completed list as a flux
})
I'm not sure which operator to call next so that I have access to both lists, can take the accountStatus from the web service result and merge it into the other list, and end up with a plain Flux of LegacyAccountDto rather than a ParallelFlux.
Upvotes: 2
Views: 5037
Reputation: 4956
You could do it this way:
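// Collect each source into a single list so both are available together.
Mono<List<LegacyAccountDto>> firstMonoList = getLegacyAccountsFromSvc(accountNames).collectList();
// The repository call blocks, so defer it and run it on a scheduler meant for blocking work.
Mono<List<LegacyAccountDto>> secondMonoList = Mono.fromCallable(() -> accountMapper.accountListToLegacyAccountDtoList(accountRepository.getAccountsByNames(accountNames).get()))
.subscribeOn(Schedulers.boundedElastic());
Flux<LegacyAccountDto> completed = Mono.zip(firstMonoList, secondMonoList)
.map(listTuple -> {
// listTuple.getT1() is the web service list, listTuple.getT2() is the JPA list
// do the searching and map to list of dtos
return resultingList;
})
.flatMapMany(Flux::fromIterable);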
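The "do the searching" step is left as a comment above. One way it might look is sketched below in plain Java, without Reactor. The `Dto` class is a hypothetical, trimmed-down stand-in for LegacyAccountDto, and `mergeState` is a name made up for illustration. It assumes the field to copy is `accountState` (the DTO's field name; the question calls it accountStatus) and that `legacyAccountId` is the matching key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AccountMerge {

    // Hypothetical stand-in for LegacyAccountDto, reduced to the fields the merge needs.
    static class Dto {
        final Long legacyAccountId;
        final String name;
        final String accountState;

        Dto(Long legacyAccountId, String name, String accountState) {
            this.legacyAccountId = legacyAccountId;
            this.name = name;
            this.accountState = accountState;
        }
    }

    // Returns a copy of the JPA entries with accountState filled in from the
    // web service entry that has the same legacyAccountId.
    static List<Dto> mergeState(List<Dto> fromService, List<Dto> fromJpa) {
        // Index the web service states by legacy ID so each lookup is O(1).
        Map<Long, String> stateByLegacyId = new HashMap<>();
        for (Dto svc : fromService) {
            if (svc.legacyAccountId != null && svc.accountState != null) {
                stateByLegacyId.put(svc.legacyAccountId, svc.accountState);
            }
        }
        List<Dto> result = new ArrayList<>();
        for (Dto jpa : fromJpa) {
            // Entries with no match keep whatever state they already had (possibly null).
            String state = stateByLegacyId.getOrDefault(jpa.legacyAccountId, jpa.accountState);
            result.add(new Dto(jpa.legacyAccountId, jpa.name, state));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Dto> fromService = Arrays.asList(
                new Dto(1L, "Acme", "ACTIVE"),
                new Dto(2L, "Globex", "SUSPENDED"));
        List<Dto> fromJpa = Arrays.asList(
                new Dto(2L, "Globex", null),
                new Dto(3L, "Initech", null));
        for (Dto dto : mergeState(fromService, fromJpa)) {
            System.out.println(dto.legacyAccountId + " " + dto.name + " " + dto.accountState);
        }
    }
}
```

Inside the zipped `map`, a call such as `mergeState(listTuple.getT1(), listTuple.getT2())` would then produce the list to return. Building the map first avoids rescanning the web service list for every JPA entry.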
Blocking database calls are not advised in a reactive flow. If you have the option, you could add the R2DBC driver and make the whole thing reactive.
Upvotes: 2