Suren Konathala
Suren Konathala

Reputation: 3597

How can we iterate and print the values from Reactor Flux or Mono FlatMap or FlatMapMany?

Learning Reactor with Spring Boot.

Using a sample API:

https://jsonplaceholder.typicode.com/todos/1
{
  "userId": 1,
  "id": 1,
  "title": "delectus aut autem",
  "completed": false
}

Wanted to map the above to an object (defined a pojo SingleUser) and print the output.

private WebClient webClient = WebClient.create("https://jsonplaceholder.typicode.com");

private Mono<ClientResponse> responseMono = webClient.get()
          .uri("/todos/1")
          .accept(MediaType.APPLICATION_JSON)
          .exchange();

public String getResult() {
   return ">> result = " + responseMono.flatMap(res -> res.bodyToMono(String.class)).block();
}

When using the above.. the result is:

>> result = {
  "userId": 1,
  "id": 1,
  "title": "delectus aut autem",
  "completed": false
}

How can i iterate and print all values when using Flux as below?

public Flux<SingleUser> listUsers1() {
    return webClient.get()
             .uri("/todos/1")
             .retrieve()
             .bodyToFlux(SingleUser.class);
}

public String getUsers1() {
   return ">> Get Users 1= " + listUsers1();
}

public Flux<SingleUser> listUsers2() {
   return webClient.get()
             .uri("/todos/1")
             .exchange()
             .flatMapMany(clientResponse -> clientResponse.bodyToFlux(SingleUser.class));
}

public String getUsers2() {
   return ">> Get Users 2= " + listUsers2();
}

Result when using both exchange() and retrieve():

>> Get Users 1= MonoFlatMapMany
>> Get Users 2= MonoFlatMapMany

How can we iterate through the object and print the values?

Upvotes: 2

Views: 16415

Answers (1)

Simon Basl&#233;
Simon Basl&#233;

Reputation: 28301

First please note that you are dealing with an asynchronous non-blocking model, but you are basically resorting to blocking for demonstration purposes.

The limit of that switch from async to sync shows when dealing with multiple values, because while each of these Flux can be printed out, you won't be in control of the "iteration" (which can happen in an interleaved manner between the 2 requests):

There is a family of Reactor operators called "side effects" for when you want to peek at the values in a sequence to eg. log/print them: doOn*. You can use doOnNext to print each value emitted by a Flux:

listUsers1().doOnNext(u -> System.out.println("listUsers1 received " + u);
listUsers2().doOnNext(u -> System.out.println("listUsers2 received " + u);

Only trouble is that this doesn't subscribe to the respective printing Flux, so nothing will happen. Even if you just .subscribe() in your test, the app could exit immediately without waiting for the end of these two async sequences.

So like in your first example, you need to block. With Flux, you can use .blockLast() to block until the flux completes.

Second trouble: your getUsersX() methods expect to be able to retrieve a String and return it, synchronously. This is not going to be practical with a Flux even with blockLast from above:

public String getUsers1() {
       return ">> Get Users 1= " + listUsers1().doOnNext(System.out::println).blockLast();
    }

public String getUsers2() {
   return ">> Get Users 2= " + listUsers2().doOnNext(System.out::println).blockLast();
}

System.out.println(getUsers1());
System.out.println(getUsers2());

Would log something like:

user1A
user1B
user1C
user1D
>> Get Users 1= user1D
user2A
user2B
user2C
>> Get Users 2= user2C

Notice how each request prints all values THEN the Get Users message with the last value repeated. Also, due to the blockLast() the first request must run its course before the second request is triggered.

The most reactive way of printing the request, with support for parallel requests, is to asynchronously collect the users and print the list once it becomes available:

listUsers1().collectList().doOnNext(l1 -> System.out.println(">> Get Users 1=" + l1).subscribe();
listUsers2().collectList().doOnNext(l2 -> System.out.println(">> Get Users 2=" + l2).subscribe();

But that suffers from the same caveat of not blocking in a main/test: the app could terminate before the lists are emitted so nothing gets printed. We can tie the two collectList into a Mono and wait for both completions using Mono.when:

Mono.when(
    listUsers1().collectList().doOnNext(l1 -> System.out.println(">> Get Users 1=" + l1),
    listUsers2().collectList().doOnNext(l2 -> System.out.println(">> Get Users 2=" + l2)
)
.block();

With that code, both requests are triggered at the same time and the fastest to complete is printed first, whichever that is.

Upvotes: 9

Related Questions