Nick

Reputation: 147

How to wait until List<Mono<List<Object>>> finishes?

This is my first time working with WebClient, and I am wondering how to wait until every Mono in a List<Mono<...>> has completed. I have the following code:

List<Address> addresses = collectAllAddresses(someObject);
List<Mono<List<AnotherAddress>>> monoResponses =
        addresses.stream()
                .map(address -> webClientGateway.findAddresses(userData, address.getFullAddress()))
                .collect(Collectors.toList());

Mono.when(monoResponses).block();

log.info("mono responses");
monoResponses.stream()
        .flatMap(it -> Objects.requireNonNull(it.block()).stream()).forEach(it -> log.info("mono responses + {}", it));

and the following findAddresses method:

public Mono<List<AnotherAddress>> findAddresses(UserData userData, String fullAddress) {

    if (StringUtils.isEmpty(fullAddress)) {
        log.info("Address is empty that why we return Mono.just(Collections.emptyList()");
        return Mono.just(Collections.emptyList());
    }

    return webClient.get()
            .uri(path, uri -> uri.queryParam("query", fullAddress).queryParam("count", 1).build())
            .header("someHeader", someHeader)
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<List<AnotherAddress>>() {
            })
            .doOnError(e -> log.error("Error occurred!", e));
}

But every time I execute it, I get a list of empty objects: the list itself is returned, but every field of every AnotherAddress in it is null. What could be wrong?

UPD: more explanation. I have two microservices. The first one has a RestController that returns AnotherAddress objects. In the second one, I want to use WebClient (instead of a thread pool with many threads) to call that RestController. With my previous implementation of webClientGateway.findAddresses(userData, address.getFullAddress()), which returned Mono<List>, I called block() on the result immediately after the call, and it worked. Now I have many addresses (maybe 5 or 10), and I want to send an asynchronous request for each address, wait until the last one finishes, and then perform another operation. But instead of fully populated AnotherAddress instances, I get 5 empty AnotherAddress instances (every field is null).

Upvotes: 2

Views: 856

Answers (1)

Puce

Reputation: 38132

Use a Flux instead of a Mono, e.g. something like (untested):

public Flux<AnotherAddress> findAddresses(UserData userData, String fullAddress) {

    if (StringUtils.isEmpty(fullAddress)) {
        log.info("Address is empty, so we return Flux.empty()");
        return Flux.empty();
    }

    return webClient.get()
            .uri(path, uri -> uri.queryParam("query", fullAddress).queryParam("count", 1).build())
            .header("someHeader", someHeader)
            .retrieve()
            .bodyToFlux(AnotherAddress.class)
            .doOnError(e -> log.error("Error occurred!", e));
}

If you don't need the AnotherAddress list grouped by address, then you could use something like (untested):

Flux<AnotherAddress> anotherAddressFlux = Flux.fromIterable(addresses)
        .flatMap(address -> webClientGateway.findAddresses(userData, address.getFullAddress()));

If you want to block you can use:

List<AnotherAddress> anotherAddressList = anotherAddressFlux.collectList().block();
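
If you do need the results grouped by address, one possible approach is to collect each inner Flux into its own list and use flatMapSequential, which subscribes to the inner publishers eagerly but emits their results in source order. The following is only a minimal sketch: the class GroupedLookup, the helper lookupGrouped and the fakeLookup function are hypothetical stand-ins for webClientGateway.findAddresses, and it assumes reactor-core is on the classpath.

```java
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;

public class GroupedLookup {

    // Hypothetical helper: runs one lookup per address and returns one inner
    // list per input address, in the same order as the input list.
    static <A, R> List<List<R>> lookupGrouped(List<A> addresses, Function<A, Flux<R>> lookup) {
        return Flux.fromIterable(addresses)
                // Inner publishers are subscribed eagerly; results are emitted in source order.
                .flatMapSequential(address -> lookup.apply(address).collectList())
                .collectList()
                // Blocks the calling thread until every lookup has completed.
                .block();
    }

    public static void main(String[] args) {
        // Stand-in for webClientGateway.findAddresses(userData, fullAddress) (assumption).
        Function<String, Flux<String>> fakeLookup =
                address -> address.isEmpty() ? Flux.empty() : Flux.just(address + "#1", address + "#2");

        List<List<String>> grouped = lookupGrouped(List.of("a", "", "b"), fakeLookup);
        System.out.println(grouped); // prints [[a#1, a#2], [], [b#1, b#2]]
    }
}
```

An empty Flux still yields an empty inner list, so the outer list keeps one entry per input address and can be matched back to the addresses by index.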

Upvotes: 1
