Reputation: 140
I recently started using WebFlux and need a suggestion on how to chain multiple services and aggregate their responses along the way. The four services and their response POJOs are similar to the following example:
class Response1{
String a1;
String a2;
}
class Response2{
String b1;
}
class Response3{
String c1;
}
class Response4{
String d1;
}
and the signatures of the four services:
Flux<Response1> service1();
Flux<Response2> service2(String a1); //output field of Response1(service 1)
Flux<Response3> service3(String b1); //output field of Response2(service 2)
Mono<Response4> service4(String a2); //output field of Response1(service 1)
So service2 needs to be invoked for each Response1 in the Flux, and service3 for each Response2. The relationships between the models are:
Response1 <1-----*>Response2 (1 to many),
Response2 <1-----*>Response3 (1 to many),
Response1 <1-----1>Response4 (1 to 1)
The aggregated final response should look like this (JSON):
[
{
"a1": "",
"a2": "",
"d1": "",
"response2s": [
{
"b1": "",
"response3s": [
{
"c1": ""
}
]
}
]
}
]
So I first need to call service1, then call service2 for each Response1, and then call service3 for each Response2 returned by service2. I also need to call service4 for each Response1 returned by service1 (this could run in parallel with the service2 and service3 calls). To build the aggregated final response, I added two additional POJOs that store the child responses (relevant bits):
public class AggResponse extends Response1{
List<AggResponse2> response2s;// populated from service2 response
String d1; // populated from service4 response
public void add(AggResponse2 res2){
if(response2s == null)
response2s = new ArrayList<>();
response2s.add(res2);
}
}
and
public class AggResponse2 extends Response2{
List<Response3> response3s;// populated from service3 response
public void add(Response3 res3) {
if (response3s == null)
response3s = new ArrayList<>();
response3s.add(res3);
}
}
What is the best way to chain these calls so that I retain the data from earlier responses and end up with everything in the AggResponse object? I tried the following:
public Flux<AggResponse> aggregate() {
return services.service1()
.map(res1 -> new AggResponse(res1.getA1(), res1.getA2()))
.flatMap(aggRes -> services.service2(aggRes.getA1())
.map(res2 -> {
AggResponse2 aggRes2 = new AggResponse2(res2.getB1());
aggRes.add(aggRes2);
return aggRes2;
})
.flatMap(aggRes2 -> services.service3(aggRes2.getB1())
.map(res3 -> {
aggRes2.add(res3);
return res3;
})
.reduce(aggRes2, (a, aggRes3) -> aggRes2)
)
.reduce(aggRes, (a, aggRes2) -> aggRes)
)
.flatMap(aggRes -> services.service4(aggRes.getA1())
.map(res4 -> {
aggRes.setD1(res4.getD1());
return aggRes;
})
);
}
However, I get the following incomplete response:
[ {
"a1" : "a1v1",
"a2" : "a2v1"
} ]
I can see from the logs that all services are being called. Two questions: 1. Why don't I see the aggregated response? Could reduce be losing it? 2. Is there a better approach to achieve this?
Upvotes: 4
Views: 4716
Reputation: 69
Using Flux.combineLatest() looks a little simpler.
service1().flatMap(response1 -> Flux.combineLatest(
service2Service3(response1.a1).collectList(), // call service2 which call service3
service4(response1.a2), // call service4
(aggResponse2, response4)->{
//log.info(agg);
//log.info(response4);
FinalResponse agg = FinalResponse.builder()
.a1(response1.a1)
.a2(response1.a2)
.d1(response4.d1)
.response2s(aggResponse2)
.build();
return agg;
})
)
Here is the full source code:
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.*;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;
@Log4j2
public class CombiningTests {
@Data
@AllArgsConstructor
@ToString
static class Response1{
String a1;
String a2;
}
@Data
@AllArgsConstructor
@ToString
static class Response2{
String b1;
public Response2(){
}
}
@Data
@AllArgsConstructor
@ToString
static class Response3{
String c1;
}
@Data
@AllArgsConstructor
@ToString
static class Response4{
String d1;
}
@Data
@ToString(callSuper=true)
static class AggResponse2 extends Response2{
List<Response3> response3s;
public AggResponse2(String b1, List<Response3> response3s) {
super(b1);
this.response3s = response3s;
}
}
@Data
@ToString
@Builder
static class FinalResponse {
final String a1;
final String a2;
final String d1;
final List<AggResponse2> response2s;
}
static Flux<Response1> service1(){
// Stub for service1: emits a single Response1 after a 4 ms delay.
return Flux
.just(new Response1("a1", "a2"))
.delayElements(Duration.ofMillis(4));
}
static Flux<Response2> service2(String a1){
// Stub for service2: emits two Response2 items, each delayed by 3 ms.
return Flux
.just(new Response2("b1-" + a1), new Response2("b2-" + a1))
.delayElements(Duration.ofMillis(3));
}
static Flux<Response3> service3(String b1){
// Stub for service3: emits two Response3 items, each delayed by 5 ms.
return Flux
.just(new Response3("c1-" + b1), new Response3("c2-" + b1))
.delayElements(Duration.ofMillis(5));
}
static Mono<Response4> service4(String a2){
// Stub for service4: emits a single Response4 after an 8 ms delay.
return Mono
.just(new Response4("d1-" + a2))
.delayElement(Duration.ofMillis(8));
}
static Flux<AggResponse2> service2Service3(String a1){
return service2(a1)
.flatMap(
x2->service3(x2.b1)
.collectList()
.map(x3->new AggResponse2(x2.b1, x3))
);
}
/**
* service1() 1 -----> * service2() 1 --> * service3()
* |--> 1 service4()
*/
@Test
void testComplexCombineLatest(){
ObjectMapper objectMapper = new ObjectMapper();
service1().flatMap(response1 -> Flux.combineLatest(
service2Service3(response1.a1).collectList(), // call service2 which call service3
service4(response1.a2), // call service4
(aggResponse2, response4)->{
//log.info(agg);
//log.info(response4);
FinalResponse agg = FinalResponse.builder()
.a1(response1.a1)
.a2(response1.a2)
.d1(response4.d1)
.response2s(aggResponse2)
.build();
return agg;
})
).doOnNext(e->{
try {
String json = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(e);
System.out.println("JSON = " + json);
} catch (Exception ex) {
// Log serialization failures instead of silently swallowing them.
log.error("Failed to serialize FinalResponse", ex);
}
}).blockLast();
}
}
And here is the execution result in JSON format:
JSON = {
"a1" : "a1",
"a2" : "a2",
"d1" : "d1-a2",
"response2s" : [ {
"b1" : "b1-a1",
"response3s" : [ {
"c1" : "c1-b1-a1"
}, {
"c1" : "c2-b1-a1"
} ]
}, {
"b1" : "b2-a1",
"response3s" : [ {
"c1" : "c1-b2-a1"
}, {
"c1" : "c2-b2-a1"
} ]
} ]
}
Upvotes: 2
Reputation: 3955
You could use the merge method if you don't want service4 to wait for service2's next signal. Something like this:
return service1().flatMap(response1 ->
Flux.merge(service23Agg(response1.a1), service4Agg(response1.a2))
.reduce((aggResponse, aggResponse2) -> new AggResponse(
response1.a1,
response1.a2,
Optional.ofNullable(aggResponse.d1)
.orElse(aggResponse2.d1),
Optional.ofNullable(aggResponse.response2s)
.orElse(aggResponse2.response2s))));
Utility classes and methods:
class AggContainer {
final String b1;
final List<Response3> response3s;
AggContainer(String b1, List<Response3> response3s) {
this.b1 = b1;
this.response3s = response3s;
}
}
class AggResponse {
final String a1;
final String a2;
final String d1;
final List<AggContainer> response2s;
AggResponse(String a1, String a2, String d1, List<AggContainer> response2s) {
this.a1 = a1;
this.a2 = a2;
this.d1 = d1;
this.response2s = response2s;
}
AggResponse(String d1) {
this.a1 = null;
this.a2 = null;
this.d1 = d1;
this.response2s = null;
}
AggResponse(List<AggContainer> response2s) {
this.a1 = null;
this.a2 = null;
this.d1 = null;
this.response2s = response2s;
}
}
private Mono<AggResponse> service23Agg(String a1) {
return service2(a1).flatMap(response2 -> service3(response2.b1).collectList()
.map(response3s -> new AggContainer(response2.b1, response3s)))
.collectList()
.map(AggResponse::new);
}
private Mono<AggResponse> service4Agg(String a2) {
return service4(a2).map(response4 -> new AggResponse(response4.d1));
}
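The reduce step in the merge snippet above has to work whichever partial result arrives first, because merge emits items in arrival order. The following is a minimal, JDK-only sketch of that merging rule. `Partial` is a hypothetical, simplified stand-in for the answer's AggResponse (strings instead of AggContainer), and no Reactor types are involved:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class PartialMerge {

    // Hypothetical, simplified stand-in for AggResponse: either field may be null,
    // depending on which service produced this partial result.
    static final class Partial {
        final String d1;
        final List<String> response2s;

        Partial(String d1, List<String> response2s) {
            this.d1 = d1;
            this.response2s = response2s;
        }
    }

    // Takes each field from whichever side has it, mirroring the
    // Optional.ofNullable(...).orElse(...) calls in the reduce lambda.
    static Partial merge(Partial left, Partial right) {
        return new Partial(
                Optional.ofNullable(left.d1).orElse(right.d1),
                Optional.ofNullable(left.response2s).orElse(right.response2s));
    }

    public static void main(String[] args) {
        Partial fromService4 = new Partial("d1-a2", null);
        Partial fromService23 = new Partial(null, Arrays.asList("b1-a1", "b2-a1"));

        // Both arrival orders produce the same merged content.
        Partial first = merge(fromService4, fromService23);
        Partial second = merge(fromService23, fromService4);
        System.out.println(first.d1 + " " + first.response2s);
        System.out.println(second.d1 + " " + second.response2s);
    }
}
```

One thing to watch for in the Reactor version: a seedless reduce does not invoke its function when the sequence has only one element, so if service4 were to complete empty, the single partial result (with null a1 and a2) would likely be passed through unchanged.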
Also, be very careful with mutable collections in an asynchronous environment: avoid modifying them inside a reactive pipeline.
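To illustrate the point about mutable collections, here is a small JDK-only analogue that uses CompletableFuture rather than Reactor. `fetch` is a hypothetical stand-in for a remote call. Instead of having each asynchronous task append to a shared ArrayList, every task returns its own value and the list is assembled once, after all tasks have completed. This is roughly the role collectList() plays in the code above:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ImmutableAggregation {

    // Hypothetical stand-in for a remote call; runs on the common pool.
    static CompletableFuture<String> fetch(int id) {
        return CompletableFuture.supplyAsync(() -> "c" + id);
    }

    // No task touches a shared collection: each one returns its own value,
    // and the results are gathered in submission order once they are ready.
    static List<String> collectAll(int count) {
        List<CompletableFuture<String>> futures = IntStream.range(0, count)
                .mapToObj(ImmutableAggregation::fetch)
                .collect(Collectors.toList());

        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.collectingAndThen(
                        Collectors.toList(), Collections::unmodifiableList));
    }

    public static void main(String[] args) {
        System.out.println(collectAll(5));
    }
}
```

The returned list is unmodifiable, so later stages cannot change it by accident; an attempt to add to it throws UnsupportedOperationException.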
Upvotes: 3