Reputation: 105
I am trying to call an external service from a microservice application, get all the responses in parallel, and combine them before starting the next computation. I know I can call block() on each Mono object, but that would defeat the purpose of using the reactive API. Is it possible to fire up all requests in parallel and combine them at one point?
Sample code is below. In this case "Done" prints before the actual responses arrive. I also know that the subscribe call is non-blocking.
I want "Done" to be printed after all responses have been collected, so I need some kind of blocking. However, I do not want to block on each and every request.
    final List<Mono<String>> responseOne = new ArrayList<>();
    IntStream.range(0, 10).forEach(i -> {
        Mono<String> responseMono =
                WebClient.create("https://jsonplaceholder.typicode.com/posts")
                        .post()
                        .retrieve()
                        .bodyToMono(String.class);
        System.out.println("create mono response lazy initialization");
        responseOne.add(responseMono);
    });
    Flux.merge(responseOne).collectList().subscribe(res -> {
        System.out.println(res);
    });
    System.out.println("Done");
Based on the suggestion, I came up with this, which seems to work for me.
    StopWatch watch = new StopWatch();
    watch.start();
    final List<Mono<String>> responseOne = new ArrayList<>();
    IntStream.range(0, 10).forEach(i -> {
        Mono<String> responseMono =
                WebClient.create("https://jsonplaceholder.typicode.com/posts")
                        .post()
                        .retrieve()
                        .bodyToMono(String.class);
        System.out.println("create mono response lazy initialization");
        responseOne.add(responseMono);
    });
    CompletableFuture<List<String>> futureCount = new CompletableFuture<>();
    List<String> res = new ArrayList<>();
    Mono.zip(responseOne, Arrays::asList)
            .flatMapIterable(objects -> objects) // make flux of objects
            .doOnComplete(() -> {
                futureCount.complete(res);
            }) // completes the future once the flux created above has finished
            .subscribe(responseString -> {
                res.add((String) responseString);
            });
    List<String> response = futureCount.get(); // blocks once, until all responses are collected
    watch.stop(); // stop the timer only after the responses have arrived
    System.out.println(response);
    // do rest of the computation
    System.out.println(watch.getLastTaskTimeMillis());
Upvotes: 5
Views: 12227
Reputation: 72399
> Is it possible to fire up all requests in parallel and combine them at one point?
That's exactly what your code is doing already. If you don't believe me, stick .delayElement(Duration.ofSeconds(2)) after your bodyToMono() call. You'll see that your list prints out after just over 2 seconds, rather than 20 (which is what it would take if the 10 calls executed sequentially).
The combining part is happening in your Flux.merge().collectList() call.
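The timing claim can also be checked without any network access. The following is a minimal JDK-only sketch, not Reactor code: it swaps Mono for CompletableFuture and simulates each call with a fixed delay, and the names ParallelTimingSketch and fetchAll are hypothetical. All simulated calls are started before anything waits, so the total time should stay close to a single delay rather than growing with the number of calls.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// JDK-only analogue of the timing experiment (assumption: Java 9+ for delayedExecutor).
final class ParallelTimingSketch {

    // Starts `count` simulated calls that each complete after `delayMillis`,
    // then waits a single time for the combined list.
    static List<String> fetchAll(int count, long delayMillis) {
        List<CompletableFuture<String>> calls = IntStream.range(0, count)
                .mapToObj(i -> CompletableFuture.supplyAsync(
                        () -> "response " + i,
                        CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS)))
                .collect(Collectors.toList());

        // Every delay is already running here; block once on the combined result.
        CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();
        return calls.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> responses = fetchAll(10, 200);
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(responses.size() + " responses in " + elapsedMillis + " ms");
    }
}
```

Raising the count from 10 to 20 should leave the elapsed time roughly unchanged, which is the same effect the delayElement() experiment shows for the merged Monos.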
> In this case "Done" prints before actual response comes up.
That's to be expected, as your last System.out.println() call is executing outside of the reactive callback chain. If you want "Done" to print after your list is printed (the list is the value you've named res in the consumer passed to your subscribe() call), then you'll need to put it inside that consumer, not outside it.
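The ordering effect is not specific to Reactor; any callback-based API behaves this way. As a rough illustration, here is a JDK-only sketch using CompletableFuture in place of Mono and subscribe(). The class name CallbackOrderSketch, the method run, and the simulated 100 ms response are all hypothetical. It records the order in which statements execute so the difference between "inside the callback" and "outside the callback" is visible.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// JDK-only analogue: registering a callback returns immediately, like subscribe().
final class CallbackOrderSketch {

    // Returns the events in the order they were recorded.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();

        // Simulated response that becomes available after 100 ms.
        CompletableFuture<String> response = CompletableFuture.supplyAsync(
                () -> "payload",
                CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS));

        // Statements inside the callback run only once the response exists.
        CompletableFuture<Void> chain = response.thenAccept(body -> {
            events.add("received " + body);
            events.add("done (inside callback)");
        });

        // This statement runs right away, because registering the callback does not wait.
        events.add("done (outside callback)");

        chain.join(); // keep the demo alive until the callback has run
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The statement outside the callback is recorded first, which mirrors "Done" appearing before the response list in the question.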
If you're interfacing with an imperative API, and you therefore need to block on the list, you can just do:
List<String> list = Flux.merge(responseOne).collectList().block();
...which will still execute the calls in parallel (so you still gain some advantage), but then block until all of them are complete and combined into a list. (If you're only using Reactor for this type of usage, however, it's debatable whether it's worthwhile.)
Upvotes: 3
Reputation: 1675
You can use Mono.zip, since you want "Done" to be printed after the collection of all the responses. So, you can modify your code as below:
    final List<Mono<String>> responseMonos = IntStream.range(0, 10)
            .mapToObj(index -> WebClient.create("https://jsonplaceholder.typicode.com/posts")
                    .post()
                    .retrieve()
                    .bodyToMono(String.class))
            .collect(Collectors.toList()); // create iterable of mono of network calls
    Mono.zip(responseMonos, Arrays::asList) // make parallel network calls and collect it to a list
            .flatMapIterable(objects -> objects) // make flux of objects
            .doOnComplete(() -> System.out.println("Done")) // will be printed on completion of the flux created above
            .subscribe(responseString -> System.out.println("responseString = " + responseString)); // subscribe and start emitting values from flux
It's also not a good idea to call subscribe or block explicitly in your reactive code.
Upvotes: 4