hmble
hmble

Reputation: 140

WebFlux chaining to call multiple services and response aggregation

I have recently started using WebFlux and need suggestion on how to chain multiple services and aggregate responses on the way. The 4 services and their Response POJO are similar to following example:

class Response1{
   String a1;
   String a2;
}

class Response2{
   String b1;
}

class Response3{
   String c1;
}

class Response4{
   String d1;
}

and the signature of 4 services:

Flux<Response1> service1(); 
Flux<Response2> service2(String a1); //output field of Response1(service 1)
Flux<Response3> service3(String b1); //output field of Response2(service 2)
Mono<Response4> service4(String a2); //output field of Response1(service 1)

So service2 needs to invoked for each Response1 in Flux, and service3 for each Response2. Relationship between models is:

Response1 <1-----*>Response2 (1 to many), 
Response2 <1-----*>Response3 (1 to many),
Response1 <1-----1>Response4 (1 to 1)

Aggregated final response should look like (JSON):

[
  {
    "a1": "",
    "a2": "",
    "d1": "",
    "response2s": [
      {
        "b1": "",
        "response3s": [
          {
            "c1": ""
          }
        ]
      }
    ]
  }
]

So first I need to call Service1 and then call service2 for each Response1, then call service3 for each Response2(returned by service2). Also, call service4 for each response1 returned by service1 (could be called in parallel to service2 and service3 calls). In order to update Aggregated final Response, I have added two additional POJOs to allow storing child responses, e.g. (relevant bits):

public class AggResponse extends Response1{
    List<AggResponse2> response2s;// populated from service2 response
    String d1; // populated from service4 response

    public void add(AggResponse2 res2){
        if(response2s == null)
            response2s = new ArrayList<>();
        response2s.add(res2);
    }
}

and

public class AggResponse2 extends Response2{
    List<Response3> response3s;// populated from service3 response

    public void add(Response3 res3) {
        if (response3s == null)
            response3s = new ArrayList<>();
        response3s.add(res3);
    }
}

How best to do chaining so that I retain previous response data and while combining operators retain all data in the AggResponse object? I tried following:

public Flux<AggResponse> aggregate() {
    return services.service1()
            .map(res1 -> new AggResponse(res1.getA1(), res1.getA2()))
            .flatMap(aggRes -> services.service2(aggRes.getA1())
                    .map(res2 -> {
                        AggResponse2 aggRes2 = new AggResponse2(res2.getB1());
                        aggRes.add(aggRes2);
                        return aggRes2;
                    })
                    .flatMap(aggRes2 -> services.service3(aggRes2.getB1())
                            .map(res3 -> {
                                aggRes2.add(res3);
                                return res3;
                            })
                            .reduce(aggRes2, (a, aggRes3) -> aggRes2)
                    )
                    .reduce(aggRes, (a, aggRes2) -> aggRes)
            )
            .flatMap(aggRes -> services.service4(aggRes.getA1())
                    .map(res4 -> {
                        aggRes.setD1(res4.getD1());
                        return aggRes;
                    })
            );
}

however, I get following incomplete response:

[ {
  "a1" : "a1v1",
  "a2" : "a2v1"
} ]

I see all services being called out as I see logs. Two questions: 1. why don't see aggregated response, could reduce be loosing it? 2. is there a better approach of achieving this?

Upvotes: 4

Views: 4716

Answers (2)

AlbertOu
AlbertOu

Reputation: 69

Using Flux.combineLatest() looks a little simpler.

 service1().flatMap(response1 -> Flux.combineLatest(
      service2Service3(response1.a1).collectList(), // call service2 which call service3
      service4(response1.a2),                       // call service4
      (aggResponse2, response4)->{
                    //log.info(agg);
                    //log.info(response4);
                    FinalResponse agg = FinalResponse.builder()
                            .a1(response1.a1)
                            .a2(response1.a2)
                            .d1(response4.d1)
                            .response2s(aggResponse2)
                            .build();
                    return agg;
                })
            )

here is the full source code.

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.*;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;

@Log4j2
public class CombiningTests {
    @Data
    @AllArgsConstructor
    @ToString
    static class Response1{
        String a1;
        String a2;
    }

    @Data
    @AllArgsConstructor
    @ToString
    static class Response2{
        String b1;
        public Response2(){
        }
    }
    @Data
    @AllArgsConstructor
    @ToString
    static class Response3{
        String c1;
    }
    @Data
    @AllArgsConstructor
    @ToString
    static class Response4{
        String d1;
    }

    @Data
    @ToString(callSuper=true)
    static class AggResponse2 extends Response2{
        List<Response3> response3s;
        public AggResponse2(String b1, List<Response3> response3s) {
            super(b1);
            this.response3s = response3s;
        }
    }

    @Data
    @ToString
    @Builder
    static class FinalResponse {
        final String a1;
        final String a2;
        final String d1;
        final List<AggResponse2> response2s;
    }

    static Flux<Response1> service1(){
        //
        return Flux
                .just(new Response1("a1", "a2"))
                .delayElements(Duration.ofMillis(4));
    }
    static Flux<Response2> service2(String a1){
        //
        return Flux
                .just(new Response2("b1-" + a1), new Response2("b2-" + a1))
                .delayElements(Duration.ofMillis(3));
    }
    static Flux<Response3> service3(String b1){
        //
        return Flux
                .just(new Response3("c1-" + b1), new Response3("c2-" + b1))
                .delayElements(Duration.ofMillis(5));
    }
    static Mono<Response4> service4(String a2){
        //
        return Mono
                .just(new Response4("d1-" + a2))
                .delayElement(Duration.ofMillis(8));
    }

    static Flux<AggResponse2> service2Service3(String a1){
        return service2(a1)
                .flatMap(
                        x2->service3(x2.b1)
                                .collectList()
                                .map(x3->new AggResponse2(x2.b1, x3))
                );
    }
    /**
     * service1() 1 -----> * service2() 1 --> * service3()
     *                |--> 1 service4()
     */
    @Test
    void testComplexCombineLatest(){
        ObjectMapper objectMapper = new ObjectMapper();
        service1().flatMap(response1 -> Flux.combineLatest(
                service2Service3(response1.a1).collectList(), // call service2 which call service3
                service4(response1.a2),                       // call service4
                (aggResponse2, response4)->{
                    //log.info(agg);
                    //log.info(response4);
                    FinalResponse agg = FinalResponse.builder()
                            .a1(response1.a1)
                            .a2(response1.a2)
                            .d1(response4.d1)
                            .response2s(aggResponse2)
                            .build();
                    return agg;
                })
            ).doOnNext(e->{
                    try {
                        String json = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(e);
                        System.out.println("JSON = " + json);
                    } catch (Exception ex) {
                        //e.printStackTrace();
                    }
            }).blockLast();
    }
}

and here is the execution result in json format:

JSON = {
  "a1" : "a1",
  "a2" : "a2",
  "d1" : "d1-a2",
  "response2s" : [ {
    "b1" : "b1-a1",
    "response3s" : [ {
      "c1" : "c1-b1-a1"
    }, {
      "c1" : "c2-b1-a1"
    } ]
  }, {
    "b1" : "b2-a1",
    "response3s" : [ {
      "c1" : "c1-b2-a1"
    }, {
      "c1" : "c2-b2-a1"
    } ]
  } ]
}

Upvotes: 2

Alexander Pankin
Alexander Pankin

Reputation: 3955

You could use merge method if you dont't want to wait service2's next signal for your service4. Something like this:

return service1().flatMap(response1 ->
        Flux.merge(service23Agg(response1.a1), service4Agg(response1.a2))
                .reduce((aggResponse, aggResponse2) -> new AggResponse(
                        response1.a1,
                        response1.a2,
                        Optional.ofNullable(aggResponse.d1)
                                .orElse(aggResponse2.d1),
                        Optional.ofNullable(aggResponse.response2s)
                                .orElse(aggResponse2.response2s))));

Utility classes and methods:

class AggContainer {
    final String b1;
    final List<Response3> response3s;

    AggContainer(String b1, List<Response3> response3s) {
        this.b1 = b1;
        this.response3s = response3s;
    }
}

class AggResponse {
    final String a1;
    final String a2;
    final String d1;
    final List<AggContainer> response2s;

    AggResponse(String a1, String a2, String d1, List<AggContainer> response2s) {
        this.a1 = a1;
        this.a2 = a2;
        this.d1 = d1;
        this.response2s = response2s;
    }

    AggResponse(String d1) {
        this.a1 = null;
        this.a2 = null;
        this.d1 = d1;
        this.response2s = null;
    }

    AggResponse(List<AggContainer> response2s) {
        this.a1 = null;
        this.a2 = null;
        this.d1 = null;
        this.response2s = response2s;
    }
}

private Mono<AggResponse> service23Agg(String a1) {
    return service2(a1).flatMap(response2 -> service3(response2.b1).collectList()
            .map(response3s -> new AggContainer(response2.b1, response3s)))
            .collectList()
            .map(AggResponse::new);
}

private Mono<AggResponse> service4Agg(String a2) {
    return service4(a2).map(response4 -> new AggResponse(response4.d1));
}

And you should be very careful with mutable collections in the asynchronous environment. Avoid to change it inside reactive pipeline.

Upvotes: 3

Related Questions