shredmill
shredmill

Reputation: 283

How to merge two execute two reactor fluxes in parallel that return lists and merge the results

I have a LegacyAccountDto that I need to build a list of from two separate sources. One is a local JPA repository and the other is a web service call. The web service version has the accountStatus available where the JPA data source does not. I need to execute two calls in parallel as Fluxes, and then when they both complete, I need to find the legacyId of the web service list and populate the list with the accountStatus pulled from the web service. The whole idea is to return a list with the completed DTO. I don't need to save it back to the web service or the JPA repo

The DTO:

@Data
@JsonInclude(Include.NON_NULL)
public class LegacyAccountDto {
  private UUID id;
  private UUID organizationId;
  private String name;
  private String website;
  private Long legacyAccountId;
  private LocalDateTime legacyCreated;
  private String accountType;
  private String accountState;
}

Each function in the merge statement returns a Flux of LegacyDTO

Flux<LegacyAccountDto> completed = Flux.merge(
      getLegacyAccountsFromSvc(accountNames),
      Flux.fromIterable(accountMapper.accountListToLegacyAccountDtoList(accountRepository.getAccountsByNames(accountNames).get()))
    )
    .parallel()
    .runOn(Schedulers.elastic())
    .???????((list1, list2) -> {
      list2.map(l2 -> {
        //find list1 by legacyId
        //set l2.accountStatus = l1.accountstatus
      })
      //return the completed list as a flux
    })

I'm not sure what function to call next to be able to have access to both lists and grab the accountStatus out of the second call and be able to merge it and have it not be returning a parallel flux type rather than just the Flux of LegacyDto

Upvotes: 2

Views: 5037

Answers (1)

NikolaB
NikolaB

Reputation: 4956

You could do it this way:

Mono<List<LegacyAccountDto>> firstMonoList = getLegacyAccountsFromSvc(accountNames).collectList();
Mono<List<EntityX> secondMonoList = accountMapper.accountListToLegacyAccountDtoList(accountRepository.getAccountsByNames(accountNames).get()).collectList();

Mono.zip(firstMonoList, secondMonoList)
.map(listTuple -> {
  // do the searching and map to list of dtos
  return resultingList;
})
.flatMapMany(Flux::fromIterable);

It's not advised to use blocking database calls in reactive flow. If you have the option you could add the R2DBC driver and make the whole thing reactive.

Upvotes: 2

Related Questions