Aymen Kanzari
Aymen Kanzari

Reputation: 2013

FlatMap a Flux not executed

I have a list which contains 240 items, this list takes over 1 hour to be completely sent using for.

List<Map<String, Object>> conventions = mapConventions(objects, referentialService);

for(Map<String, Object> item : conventions) {
        webClient.post()
        .uri(configProperties.getUrl().getConvention() + CONVENTION)
        .bodyValue(objectMapper.convertValue(item, JsonNode.class))
        .retrieve()
        .bodyToMono(String.class);
}

So I followed this article to send it simultaneously to minimize the response time, but the code inside the flatmap is never executed:

Flux.fromIterable(conventions).flatMap(item ->  {
    System.out.print(item);
    return webClient.post()
            .uri(configProperties.getUrl().getConvention() + CONVENTION)
            .bodyValue(objectMapper.convertValue(item, JsonNode.class))
            .retrieve()
            .bodyToMono(String.class);
});

Upvotes: 3

Views: 6803

Answers (1)

jason814
jason814

Reputation: 61

In Reactive programming, there are Producers and Subscribers. While a Producer can output results, it doesn't do any good if nothing is listening to those results -- that is where the Subscriber comes into play. A Subscriber handles the output of the Producer and does something meaningful with the result. The is so fundamental to Reactive Programming that a Producer will not execute any code if a Subscriber isn't "listening" for the results.

So in this case the flatmap() is the Producer. And it will not execute any code unless there is a Subscriber to handle the output.

The short answer is to add a subscribe() call at the end of the flatmap. To Look something like this.

Flux.fromIterable(conventions).flatMap(item ->  {
    System.out.print(item);
    return webClient.post()
            .uri(configProperties.getUrl().getConvention() + CONVENTION)
            .bodyValue(objectMapper.convertValue(item, JsonNode.class))
            .retrieve()
            .bodyToMono(String.class);
}).subscribe();

Many tutorials have been written about this. for Example: https://spring.io/blog/2016/06/13/notes-on-reactive-programming-part-ii-writing-some-code https://medium.com/@olehdokuka/mastering-own-reactive-streams-implementation-part-1-publisher-e8eaf928a78c https://projectreactor.io/docs/core/release/reference/

Upvotes: 2

Related Questions