gstackoverflow
gstackoverflow

Reputation: 36984

How to await when all http requests are finished using spring webClient?

I want to execute http request for each queue element. These requests shoule be called in parallel.
Also I need to await the termination of all requests.

I developed the following code:

 List<Mono<MyResponseDTO>> monoList = queue.stream()
                .map(jobStatusBunch -> webClient
                        .post()
                        .uri("localhost:8080/api/some/url")
                        .bodyValue(convertToRequestDto(someBean))
                        .retrieve()
                        .toEntity(String.class)
                        .filter(HttpEntity::hasBody)
                        .map(stringResponseEntity -> {
                            try {
                                return objectMapper.readValue(stringResponseEntity.getBody(), MyResponseDTO.class);
                            } catch (JsonProcessingException e) {
                                log.error("Can't parse", e);
                                return null;
                            }
                        })
                        .doOnNext(myResponseDTO -> {
                            log.info("doOnNext is invoked");
                        })
                ).collect(Collectors.toList());
          //await when all MONOs are completed

log.info("Start waiting for {}", monoList);
Mono<Void> mono = Flux.fromIterable(monoList)
        .flatMap(Function.identity())
        .then();
log.info("Finished waiting for {}", monoList);

and I see following log when queue has single element:

2019-11-19 19:17:17.733  INFO 5896 --- [   scheduling-1] c.b.m.service.MyService     : Start waiting for [MonoPeek]
2019-11-19 19:17:25.988  INFO 5896 --- [   scheduling-1] c.b.m.service.MyService     : Finished waiting for [MonoPeek]
2019-11-19 19:17:26.015 TRACE 5896 --- [   scheduling-1] o.s.w.r.f.client.ExchangeFunctions       : [c42c1c2] HTTP POST localhost:8080/api/some/url, headers={}
2019-11-19 19:17:48.230  INFO 5896 --- [tor-http-nio-11] c.b.m.service.MyService     : doOnNext is invoked

So this code doesn't allow to await request termination.

How could I achieve it ?

P.S.

looks like Flux.merge(monoList).blockLast() is smth I need. Will it work correct ?

Upvotes: 2

Views: 2468

Answers (2)

Milanka
Milanka

Reputation: 1842

Simple case

Use this to execute requests in parallel and wait until they complete:

List<Mono<MyResponseDTO>> monoList = queue
        .stream()
        .map(requestDTO ->
                webClient
                    .post()
                    .uri("localhost:8080/api/some/url")
                    .bodyValue(requestDTO)
                    .retrieve()
                    .bodyToMono(MyResponseDTO.class))
        .collect(Collectors.toList());

// This will execute all requests in parallel and wait until they complete,
// or throw an exception if any request fails.
List<MyResponseDTO> responses = Flux.merge(monoList).collectList().block();

Verification

You might want to set logging of reactor.netty.http.client to DEBUG to see no extra requests are made. That can happen for example if you accidentally use both mono#subscribe and mono#block.

More complex case with CompletableFuture

If you want to separate the handling of responses and the waiting for requests to complete, then CompletableFutures can be used:

List<Mono<MyResponseDTO>> webClientMonos = getMonos();

// Start executing requests in parallel.
List<CompletableFuture<MyResponseDTO>> futures = webClientMonos.stream()
        .map(mono -> mono.toFuture())
        .collect(toList());
for (CompletableFuture<MyResponseDTO> future : futures) {
    future.thenAccept(responseDTO -> {
        // Do something with a response when it arrives at some point.
        // ...
    });
}

// ...

// Block until all requests have completed.
for (CompletableFuture<MyResponseDTO> future : futures) {
    try {
        // Maybe WebClient has been configured with timeouts,
        // but it doesn't hurt to have a timeout here, too.
        future.get(60, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(ex);
    } catch (ExecutionException | TimeoutException ex) {
        // ExecutionException is thrown if HTTP request fails.
        throw new RuntimeException(ex);
    }
}

Upvotes: 1

Brian Clozel
Brian Clozel

Reputation: 59116

You could try the following:

Flux<MyResponseDTO> responses = queue.stream()
     .flatMap(jobStatusBunch -> webClient
                    .post()
                    .uri("localhost:8080/api/some/url")
                    .bodyValue(convertToRequestDto(someBean))
                    .retrieve()
                    .toEntity(MyResponseDTO.class));

 Mono<Void> workDone = response.then();

This is simple and should do the job. By default (if I'm not mistaken), the subscriber will request for 256 elements which means that you'll get maximum 256 HTTP requests processed in parallel. This can depend on the connection pool configured on the HTTP client; by default, on Reactor Netty, the maximum number of TCP channels is higher than that.

Various Reactor operators, including flatMap, offer variants with a concurrency method parameter to control the maximum concurrency there.

Your solution with Flux.merge with a list of Mono would be equivalent. On the other hand, using Flux.concat would not be what you're looking for since it would be subscribing to Mono as elements are requested, so you might not the get maximum concurrency you want.

Upvotes: 0

Related Questions