Reputation: 4724
I am new to reactive programming and trying to simulate the below use case using project reactor but I see little bit difficult to pass the response from one service call to another dependent service. Any suggestions or references will be highly appreciated.
Response getDetails(Request inputRequest){
//Call two external services parallel based on the incoming request
Response1 = callExternalService1(inputRequest)
Response2 = callExternalService2(inputRequest)
//Call one external service based on the Response1 and Response2
Response3 = callExternalService3(Response1,Response2);
//Call three external service parallel based on the Response1, Response2 and Response3
Response4 = callExternalService4(Response1,Response2,Response3);
Response5 = callExternalService5(Response1,Response2,Response3);
Response6 = callExternalService6(Response1,Response2,Response3);
//Last service call
Response finalResponse= callLastExternalService(Response4,Response5,Response6);
return finalResponse;
}
I tried the below sample and it's working for one service call but not able to pass the response to other dependent service calls.
Updated answer:
Mono<Response> getDetails(Request inputRequest){
return Mono.just(inputRequest.getId())
.flatMap(id->{
DbResponse res = getDBCall(id).block();
if(res == null){
return Mono.error(new DBException(....));
}
return Mono.zip(callExternalService1(res),callExternalService2(inputRequest));
}).flatMap(response->{
Response extser1 = response.getT1();
Response extser2 = response.getT2();
//any exceptions?
return Mono.zip(Mono.just(extser1),Mono.just(extser2),callExternalService3();
}).flatMap(response->callExternalService4(response.getT1(),response.getT2(),response.getT3())
});
}
private Mono<DbResponse> getDBCall(String id) {
return Mono.fromCallable(()->dbservice.get(id))
.subscribeOn(Schedulers.boundedElastic());
}
Questions:
Upvotes: 1
Views: 1185
Reputation: 4642
If you have n calls and you want to move in steplock (that is, move ahead iff you have the response from all the calls), use zip
. For instance:
Mono.zip(call1, call2)
.flatMap(tuple2 -> {
ResponseEntity<?> r1 = tuple2.getT1(); //response from call1
ResponseEntity<?> r2 = tuple2.getT2(); //response from call2
return Mono.zip(Mono.just(r1), Mono.just(r2), call3);
})
.flatMap(tuple3 -> {
//at this point, you have r1, r2, r3. tuple3.getT1() response from call 1
return Mono.zip(call4, call5, call6); //tuple3.getT2() response from call 2, tuple3.getT3() response from call3
})
.flatMap(tuple3 -> callLastService);
Note: If is more of a pseudo-code, won't compile right away
You can extend the above to answer your own question. Note that since call1
and call2
are independent, you can run them parallely using subscribeOn(Schedulers.boundedElastic())
Edit: Answering the two follow-up questions:
No need to subscribe using block()
as flatMap
subscribes to your inner streams eagerly. You can do something like:
Mono.just(inputRequest.getId())
.flatMap(a -> getDBCall(a).switchIfEmpty(Mono.defer(() -> Mono.error(..))))
Note: Mono.callable(..)
returns an empty stream if the callable returns empty. That's why switchIfEmpty
onErrorResume
to provide a fallback stream. See: The difference between onErrorResume and doOnErrorUpvotes: 3
Reputation: 1230
if your services return Mono of Response (otherwise you have to transform them), you can make parllel calls using zip :
Mono.zip( callExternalService1( inputRequest ),
callExternalService2( inputRequest ) )
.flatMap( resp1AndResp2 -> this.callExternalService3( resp1AndResp2.getT1(),
resp1AndResp2.getT2() )
.flatMap( response3 -> Mono.zip( callExternalService4( resp1AndResp2.getT1(),
resp1AndResp2.getT2(),
response3 ),
callExternalService5( resp1AndResp2.getT1(),
resp1AndResp2.getT2(),
response3 ),
callExternalService6( resp1AndResp2.getT1(),
resp1AndResp2.getT2(),
response3 ) )
.flatMap( resp4AndResp5AndResp6 -> callLastExternalService( resp4AndResp5AndResp6.getT1(),
resp4AndResp5AndResp6.getT2(),
resp4AndResp5AndResp6.getT3() ) ) ) );
Upvotes: 2