Sivasakthi Jayaraman

Reputation: 4724

Handle multiple external services using project reactor

I am new to reactive programming and am trying to implement the use case below with Project Reactor, but I am finding it difficult to pass the response from one service call to another, dependent service call. Any suggestions or references would be highly appreciated.

Response getDetails(Request inputRequest) {

    // Call two external services in parallel, based on the incoming request
    Response1 = callExternalService1(inputRequest);
    Response2 = callExternalService2(inputRequest);

    // Call one external service, based on Response1 and Response2
    Response3 = callExternalService3(Response1, Response2);

    // Call three external services in parallel, based on Response1, Response2 and Response3
    Response4 = callExternalService4(Response1, Response2, Response3);
    Response5 = callExternalService5(Response1, Response2, Response3);
    Response6 = callExternalService6(Response1, Response2, Response3);

    // Last service call
    Response finalResponse = callLastExternalService(Response4, Response5, Response6);
    return finalResponse;
}

I tried the sample below. It works for one service call, but I am not able to pass the response to the other, dependent service calls.

Updated attempt:

Mono<Response> getDetails(Request inputRequest) {

    return Mono.just(inputRequest.getId())
            .flatMap(id -> {
                DbResponse res = getDBCall(id).block();
                if (res == null) {
                    return Mono.error(new DBException(....));
                }
                return Mono.zip(callExternalService1(res), callExternalService2(inputRequest));
            })
            .flatMap(response -> {
                Response extser1 = response.getT1();
                Response extser2 = response.getT2();
                // any exceptions?
                return Mono.zip(Mono.just(extser1), Mono.just(extser2), callExternalService3());
            })
            .flatMap(response -> callExternalService4(response.getT1(), response.getT2(), response.getT3()));
}

private Mono<DbResponse> getDBCall(String id) {
        return Mono.fromCallable(()->dbservice.get(id))
                .subscribeOn(Schedulers.boundedElastic());
}

Questions:

  1. How can I convert Mono<DbResponse> to DbResponse without using the block operation?
  2. If any of the external services fails, how can I build the failure response inside the flatMap and return it?

Upvotes: 1

Views: 1185

Answers (2)

Prashant Pandey

Reputation: 4642

If you have n calls and you want to proceed in lockstep (that is, move ahead only once you have the responses from all the calls), use zip. For instance:

Mono.zip(call1, call2)
    .flatMap(tuple2 -> {
         ResponseEntity<?> r1 = tuple2.getT1();    // response from call1
         ResponseEntity<?> r2 = tuple2.getT2();    // response from call2
         return Mono.zip(Mono.just(r1), Mono.just(r2), call3);
     })
    .flatMap(tuple3 -> {
         // At this point you have r1, r2 and r3:
         // tuple3.getT1() is the response from call1, tuple3.getT2() from call2, tuple3.getT3() from call3
         return Mono.zip(call4, call5, call6);
     })
    .flatMap(tuple3 -> callLastService);

Note: This is more of a pseudo-code sketch; it won't compile as is.

You can extend the above to answer your own question. Since call1 and call2 are independent, they can run in parallel; if the calls block, move them off the calling thread with subscribeOn(Schedulers.boundedElastic()).
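As a rough illustration, the pseudo-code above could be turned into something runnable along these lines. This is only a sketch under assumptions: the class ZipPipelineSketch and the helpers blockingCall and call are hypothetical stand-ins for the real services, each "service" just sleeps and returns a label built from its inputs, and reactor-core is assumed to be on the classpath.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class ZipPipelineSketch {

    // Hypothetical stand-in for a blocking external call: sleeps, then returns a label.
    static String blockingCall(String name, String... inputs) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name + "(" + String.join(",", inputs) + ")";
    }

    // Wraps the blocking call so that it runs on a boundedElastic worker once subscribed.
    static Mono<String> call(String name, String... inputs) {
        return Mono.fromCallable(() -> blockingCall(name, inputs))
                   .subscribeOn(Schedulers.boundedElastic());
    }

    static Mono<String> getDetails(String request) {
        // Stage 1: services 1 and 2 are subscribed together by zip.
        return Mono.zip(call("s1", request), call("s2", request))
            // Stage 2: service 3 needs both results; stage 3 needs all three.
            .flatMap(t12 -> call("s3", t12.getT1(), t12.getT2())
                .flatMap(r3 -> Mono.zip(
                    call("s4", t12.getT1(), t12.getT2(), r3),
                    call("s5", t12.getT1(), t12.getT2(), r3),
                    call("s6", t12.getT1(), t12.getT2(), r3))))
            // Final stage: combine the results of services 4, 5 and 6.
            .flatMap(t456 -> call("last", t456.getT1(), t456.getT2(), t456.getT3()));
    }

    public static void main(String[] args) {
        // block() appears only here, at the edge of the program, never inside the pipeline.
        System.out.println(getDetails("req").block(Duration.ofSeconds(5)));
    }
}
```

With these stubs the seven calls should finish in roughly four sleep intervals rather than seven, because the two zip stages run their members concurrently.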

Edit: Answering the two follow-up questions:

  1. No need to subscribe using block() as flatMap subscribes to your inner streams eagerly. You can do something like:

     Mono.just(inputRequest.getId())
         .flatMap(a -> getDBCall(a).switchIfEmpty(Mono.defer(() -> Mono.error(..))))
    

Note: Mono.fromCallable(..) completes empty if the callable returns null. That's why switchIfEmpty is used here.

  2. You can use operators like onErrorResume to provide a fallback stream. See: The difference between onErrorResume and doOnError
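The two points above can be combined in one small sketch. Everything here is illustrative rather than taken from the question: the in-memory map DB, the failing callExternalService stub and the "FAILED: ..." fallback string are assumptions chosen to keep the example self-contained, and reactor-core is assumed to be on the classpath.

```java
import java.util.Map;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class ErrorHandlingSketch {

    // Hypothetical in-memory "database"; a missing key yields null, like a row that is not found.
    static final Map<String, String> DB = Map.of("42", "row-42", "13", "row-13");

    static Mono<String> getDBCall(String id) {
        // fromCallable completes empty (no value, no error) when the callable returns null.
        return Mono.fromCallable(() -> DB.get(id))
                   .subscribeOn(Schedulers.boundedElastic());
    }

    // Hypothetical external service that fails for one particular row.
    static Mono<String> callExternalService(String row) {
        if (row.equals("row-13")) {
            return Mono.error(new IllegalStateException("service rejected " + row));
        }
        return Mono.just("ok:" + row);
    }

    static Mono<String> getDetails(String id) {
        return getDBCall(id)
            // Point 1: no block(); an empty result is turned into an error signal instead of a null check.
            .switchIfEmpty(Mono.defer(() -> Mono.error(new IllegalStateException("no row for id " + id))))
            .flatMap(row -> callExternalService(row))
            // Point 2: any error from the steps above is replaced by a fallback value.
            .onErrorResume(e -> Mono.just("FAILED: " + e.getMessage()));
    }

    public static void main(String[] args) {
        System.out.println(getDetails("42").block()); // prints "ok:row-42"
        System.out.println(getDetails("13").block()); // prints "FAILED: service rejected row-13"
        System.out.println(getDetails("99").block()); // prints "FAILED: no row for id 99"
    }
}
```

Placing onErrorResume at the end of the chain covers every upstream step; attaching it to a single inner call instead would limit the fallback to that call only.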

Upvotes: 3

Zrom

Reputation: 1230

If your services return a Mono of Response (otherwise you have to wrap them in one), you can make parallel calls using zip:

    Mono.zip( callExternalService1( inputRequest ),
              callExternalService2( inputRequest ) )
        .flatMap( resp1AndResp2 ->
            this.callExternalService3( resp1AndResp2.getT1(), resp1AndResp2.getT2() )
                .flatMap( response3 ->
                    Mono.zip( callExternalService4( resp1AndResp2.getT1(), resp1AndResp2.getT2(), response3 ),
                              callExternalService5( resp1AndResp2.getT1(), resp1AndResp2.getT2(), response3 ),
                              callExternalService6( resp1AndResp2.getT1(), resp1AndResp2.getT2(), response3 ) )
                        .flatMap( resp4AndResp5AndResp6 ->
                            callLastExternalService( resp4AndResp5AndResp6.getT1(),
                                                     resp4AndResp5AndResp6.getT2(),
                                                     resp4AndResp5AndResp6.getT3() ) ) ) );
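If the nesting gets hard to read, one possible variation is Mono's zipWhen operator, which pairs a value with the result of a call that depends on it and so keeps the chain flat. The sketch below is not part of the original answer: the integer "services" are hypothetical stand-ins chosen only so the result is easy to check by hand, and reactor-core is assumed to be on the classpath.

```java
import reactor.core.publisher.Mono;

class ZipWhenSketch {

    // Hypothetical services working on integers so the data flow is easy to follow.
    static Mono<Integer> service1(int req) { return Mono.just(req + 1); }
    static Mono<Integer> service2(int req) { return Mono.just(req * 2); }
    static Mono<Integer> service3(int a, int b) { return Mono.just(a + b); }
    static Mono<Integer> service4(int a, int b, int c) { return Mono.just(a + b + c); }
    static Mono<Integer> service5(int a, int b, int c) { return Mono.just(a * b * c); }
    static Mono<Integer> service6(int a, int b, int c) { return Mono.just(c - a - b); }
    static Mono<Integer> lastService(int d, int e, int f) { return Mono.just(d + e + f); }

    static Mono<Integer> getDetails(int req) {
        return Mono.zip(service1(req), service2(req))
            // zipWhen keeps (r1, r2) and pairs it with r3, giving Tuple2<Tuple2<r1, r2>, r3>.
            .zipWhen(t12 -> service3(t12.getT1(), t12.getT2()))
            .flatMap(t -> {
                int r1 = t.getT1().getT1();
                int r2 = t.getT1().getT2();
                int r3 = t.getT2();
                return Mono.zip(service4(r1, r2, r3), service5(r1, r2, r3), service6(r1, r2, r3));
            })
            .flatMap(t456 -> lastService(t456.getT1(), t456.getT2(), t456.getT3()));
    }

    public static void main(String[] args) {
        // For req = 3: r1 = 4, r2 = 6, r3 = 10, r4 = 20, r5 = 240, r6 = 0, so the sum is 260.
        System.out.println(getDetails(3).block()); // prints 260
    }
}
```

The trade-off is nested tuples (t.getT1().getT1()) in place of nested lambdas; which form reads better is largely a matter of taste.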

Upvotes: 2
