Reputation: 36984
I want to execute an HTTP request for each queue element. These requests should be executed in parallel.
I also need to wait until all of the requests have finished.
I developed the following code:
List<Mono<MyResponseDTO>> monoList = queue.stream()
        .map(jobStatusBunch -> webClient
                .post()
                .uri("localhost:8080/api/some/url")
                .bodyValue(convertToRequestDto(someBean))
                .retrieve()
                .toEntity(String.class)
                .filter(HttpEntity::hasBody)
                .map(stringResponseEntity -> {
                    try {
                        return objectMapper.readValue(stringResponseEntity.getBody(), MyResponseDTO.class);
                    } catch (JsonProcessingException e) {
                        log.error("Can't parse", e);
                        return null;
                    }
                })
                .doOnNext(myResponseDTO -> {
                    log.info("doOnNext is invoked");
                })
        ).collect(Collectors.toList());
// wait until all Monos have completed
log.info("Start waiting for {}", monoList);
Mono<Void> mono = Flux.fromIterable(monoList)
        .flatMap(Function.identity())
        .then();
log.info("Finished waiting for {}", monoList);
I see the following log when the queue has a single element:
2019-11-19 19:17:17.733 INFO 5896 --- [ scheduling-1] c.b.m.service.MyService : Start waiting for [MonoPeek]
2019-11-19 19:17:25.988 INFO 5896 --- [ scheduling-1] c.b.m.service.MyService : Finished waiting for [MonoPeek]
2019-11-19 19:17:26.015 TRACE 5896 --- [ scheduling-1] o.s.w.r.f.client.ExchangeFunctions : [c42c1c2] HTTP POST localhost:8080/api/some/url, headers={}
2019-11-19 19:17:48.230 INFO 5896 --- [tor-http-nio-11] c.b.m.service.MyService : doOnNext is invoked
So this code does not wait for the requests to finish: "Finished waiting" is logged before the HTTP POST is even sent.
How can I achieve that?
It looks like Flux.merge(monoList).blockLast() is what I need. Will it work correctly?
Upvotes: 2
Views: 2468
Reputation: 1842
Use this to execute requests in parallel and wait until they complete:
List<Mono<MyResponseDTO>> monoList = queue
        .stream()
        .map(requestDTO ->
                webClient
                        .post()
                        .uri("localhost:8080/api/some/url")
                        .bodyValue(requestDTO)
                        .retrieve()
                        .bodyToMono(MyResponseDTO.class))
        .collect(Collectors.toList());
// This will execute all requests in parallel and wait until they complete,
// or throw an exception if any request fails.
List<MyResponseDTO> responses = Flux.merge(monoList).collectList().block();
You might want to set the log level of reactor.netty.http.client to DEBUG to check that no extra requests are made. That can happen, for example, if you accidentally call both mono#subscribe and mono#block on the same Mono.
If you want to separate the handling of responses and the waiting for requests to complete, then CompletableFutures can be used:
List<Mono<MyResponseDTO>> webClientMonos = getMonos();
// Start executing requests in parallel.
List<CompletableFuture<MyResponseDTO>> futures = webClientMonos.stream()
        .map(mono -> mono.toFuture())
        .collect(toList());
for (CompletableFuture<MyResponseDTO> future : futures) {
    future.thenAccept(responseDTO -> {
        // Do something with a response when it arrives at some point.
        // ...
    });
}
// ...
// Block until all requests have completed.
for (CompletableFuture<MyResponseDTO> future : futures) {
    try {
        // Maybe WebClient has been configured with timeouts,
        // but it doesn't hurt to have a timeout here, too.
        future.get(60, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(ex);
    } catch (ExecutionException | TimeoutException ex) {
        // ExecutionException is thrown if HTTP request fails.
        throw new RuntimeException(ex);
    }
}
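As a variation on the blocking loop above, CompletableFuture.allOf can wait for all futures under one shared deadline instead of up to 60 seconds per future. This is only a sketch: the class name AwaitAllSketch and the helper awaitAll are made up for illustration, and the supplyAsync calls in main stand in for mono.toFuture().

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

final class AwaitAllSketch {

    // Hypothetical helper: waits for every future under one shared deadline,
    // then returns the results in the same order as the input list.
    static <T> List<T> awaitAll(List<CompletableFuture<T>> futures, long timeout, TimeUnit unit) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeout, unit);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        } catch (ExecutionException | TimeoutException ex) {
            // ExecutionException: at least one future failed.
            // TimeoutException: the shared deadline passed first.
            throw new RuntimeException(ex);
        }
        return futures.stream()
                .map(CompletableFuture::join) // every future is complete here, so join() does not block
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stand-ins for mono.toFuture(); each one completes on a pool thread.
        List<CompletableFuture<String>> futures = List.of(
                CompletableFuture.supplyAsync(() -> "first"),
                CompletableFuture.supplyAsync(() -> "second"));
        System.out.println(awaitAll(futures, 60, TimeUnit.SECONDS));
    }
}
```

Callbacks registered earlier with thenAccept still run as each response arrives; allOf only changes how the final wait is expressed.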
Upvotes: 1
Reputation: 59116
You could try the following:
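// Flux.fromIterable (not queue.stream()) is needed to get a Flux out of flatMap.
Flux<MyResponseDTO> responses = Flux.fromIterable(queue)
        .flatMap(jobStatusBunch -> webClient
                .post()
                .uri("localhost:8080/api/some/url")
                .bodyValue(convertToRequestDto(someBean))
                .retrieve()
                .bodyToMono(MyResponseDTO.class));
// Nothing is sent until workDone is subscribed to (or blocked on).
Mono<Void> workDone = responses.then();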
This is simple and should do the job. By default (if I'm not mistaken), flatMap will request 256 elements from upstream, which means that at most 256 HTTP requests are processed in parallel. The effective limit can also depend on the connection pool configured on the HTTP client; by default, on Reactor Netty, the maximum number of TCP channels is higher than that.
Various Reactor operators, including flatMap, offer variants with a concurrency method parameter to control the maximum concurrency there.
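To illustrate the concurrency parameter, here is a minimal sketch that assumes reactor-core is on the classpath. The class BoundedFlatMapSketch and the fakeRequest helper are hypothetical: fakeRequest stands in for the WebClient call and simply emits a value after 50 ms, while two counters record how many simulated requests were pending at the same time.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class BoundedFlatMapSketch {

    // Hypothetical stand-in for a WebClient call: emits a value after 50 ms.
    // The counters track how many simulated requests are still awaiting a response.
    static Mono<String> fakeRequest(int id, AtomicInteger pending, AtomicInteger peak) {
        return Mono.defer(() -> {
            peak.accumulateAndGet(pending.incrementAndGet(), Math::max);
            return Mono.delay(Duration.ofMillis(50))
                    .doOnNext(tick -> pending.decrementAndGet())
                    .map(tick -> "response-" + id);
        });
    }

    // Runs `requests` simulated calls with at most `concurrency` inner Monos
    // subscribed at once and returns the highest pending count observed.
    static int peakPending(int requests, int concurrency) {
        AtomicInteger pending = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<String> responses = Flux.range(0, requests)
                .flatMap(id -> fakeRequest(id, pending, peak), concurrency)
                .collectList()
                .block(); // subscribes and waits for every inner Mono to complete
        System.out.println("responses received: " + responses.size());
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak pending requests: " + peakPending(20, 4));
    }
}
```

With a real WebClient, the same second argument to flatMap would cap how many HTTP requests are in flight, independently of the connection pool size.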
Your solution with Flux.merge and a list of Mono would be equivalent. On the other hand, Flux.concat would not be what you're looking for, since it subscribes to the Monos one at a time, each only after the previous one has completed, so you would not get the concurrency you want.
Upvotes: 0