Manikdn84

Reputation: 425

Synchronous spring webflux call retaining the order of operation

I have a simple use case, but I'm not sure how to implement it, as I'm new to Spring WebFlux.

I'm using the Spring Boot WebFlux starter. I need to call two endpoints, let's say Endpoint1 and Endpoint2.

Whenever Endpoint1 receives a request, I need to call Endpoint2 first with the same request, use the response from Endpoint2 to enrich the original request, and only then continue processing. How do I enforce this order using Spring WebFlux? In my case, the original request object has not been enriched by the time it is used further. Any help on this is greatly appreciated!

FYI: the call to Endpoint2 is made using WebClient.

Pseudocode:

public Mono<Response1> endpoint1(Request1 request1){

  Flux<Response2> response2 = webclient.getEndpoint2(request1); // returns a Flux

  // use response2 above to enrich request1

  return webclient.getSomething(request1); //Returns Mono<Response1>

}

Actual code:


 public Mono<ApplicationResponse> save(ApplicationRequest request) {

        return Mono.subscriberContext().flatMap(ctx -> {

            Mono blockingWrapper =  Mono.fromCallable(() ->
                    service.getId(request)
                            .subscriberContext(ctx)
                            .subscribe(id -> request.setId(id))
            ).subscribeOn(Schedulers.elastic());

            return blockingWrapper.flatMap(o -> authService.getAccessToken()
                    .flatMap(token -> post("/save", request,
                            token.getAccessToken(),
                            ctx)
                            .bodyToMono(ApplicationResponse.class))
                    .log());
        });
    }

Upvotes: 1

Views: 5560

Answers (3)

HTN

Reputation: 3604

Your problem comes from the second .subscriberContext() call. The no-argument subscriberContext() is a static method on Mono that creates a new Mono, so calling it in the middle of a chain discards everything assembled before it. That upstream code is never subscribed to and never executes, which is why the request object does not change.

Anyway, your code is messy; make it simpler. As far as I can tell from your code, you do not need Flux at all: feesService.calculateApplicationFees(...) should return Mono<List<FeeItem>>. There are too many unnecessary .log() and Mono.subscriberContext() calls. Do you even need the context here?
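
To see why code that is not linked into the returned chain never runs, here is a minimal sketch using plain reactor-core and none of the services from the question. The class and method names (DroppedChainDemo, droppedChain, linkedChain) are made up for illustration. It only shows the general rule that an assembled Mono does nothing until something subscribes to it:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

public class DroppedChainDemo {

    // The enrichment step is assembled but never becomes part of the
    // Mono that is actually subscribed to, so its side effect never happens.
    static List<String> droppedChain() {
        List<String> feeItems = new ArrayList<>();
        Mono<Boolean> enrich = Mono.fromCallable(() -> feeItems.add("fee-1"));
        Mono<String> save = Mono.just("saved"); // fresh source, unrelated to 'enrich'
        save.block();                           // only 'save' is subscribed
        return feeItems;                        // still empty
    }

    // The enrichment step is the upstream of the save step, so subscribing
    // to the final Mono runs the enrichment first.
    static List<String> linkedChain() {
        List<String> feeItems = new ArrayList<>();
        Mono.fromCallable(() -> feeItems.add("fee-1"))
                .flatMap(added -> Mono.just("saved"))
                .block();
        return feeItems;                        // contains "fee-1"
    }

    public static void main(String[] args) {
        System.out.println(droppedChain().size());
        System.out.println(linkedChain().size());
    }
}
```

In the non-working service, the .map(...) that adds the fee items ends up in the same position as 'enrich' in droppedChain(): it is built, but the Mono that gets returned does not start from it.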

Upvotes: 0

Manikdn84

Reputation: 425

I see something interesting happening. It works as expected if I orchestrate the flow from the Controller class, but if the Controller calls a service that orchestrates the same flow, it does not work. What am I missing? Or is this how it is meant to work?

This is the working code:

@RestController
@RequestMapping("/applications")
@Slf4j
@RequiredArgsConstructor
public class ApplicationController {

    private final ApplicationService applicationService;
    private final ApplicationRequestMapper requestMapper;
    private final FeesService feesService;

    @PostMapping(value = "/save")
    public Mono<Application> saveApplication(@RequestBody ApplicationRequest request) {

        ApplicationRequest applicationRequest = requestMapper.apply(request);

        return Mono.subscriberContext()
                .flatMap(context -> feesService.calculateApplicationFees(applicationRequest)
                        .collectList())
                .map(feeItems -> applicationRequest.getFeeItems().addAll(feeItems))
                .flatMap(isRequestEnriched -> applicationService.saveApplication(applicationRequest)
                        .map(saveApplicationResponse -> {
                            Application application = new Application();
                            application.setLicenceId(saveApplicationResponse.getResponse().getLicenceNumber());
                            return application;
                        }))
                .onErrorMap(throwable -> new ApplicationException(String.format(SAVE_ERROR_MESSAGE,
                        request.getLicenceId()),
                        throwable, true, false))
                .log();
    }
}


@Service
@Slf4j
@RequiredArgsConstructor
public class ApplicationService extends ClientService{

     private final AuthenticationService authenticationService;  

         public Mono<SaveApplicationResponse> saveApplication(ApplicationRequest request) {

            return Mono.subscriberContext()
                .flatMap(context -> authenticationService.getAccessToken()
                        .flatMap(token -> post("/save",
                                request,
                                token.getAccessToken(),
                                context)
                                .bodyToMono(SaveApplicationResponse.class))
                        .log());
    }
}



@Service
@Slf4j
@RequiredArgsConstructor
public class FeesService extends ClientService{

     private final AuthenticationService authenticationService;  

        public Flux<FeeItem> calculateApplicationFees(ApplicationRequest request) {

        return Mono.subscriberContext()
                .flatMap(ctx -> authenticationService.getAccessToken()
                        .flatMap(token -> get("/fees", request, token.getAccessToken(), ctx)
                                .bodyToMono(FeeResponse.class))
                        .log())
                .flatMapMany(rsp -> Flux.fromIterable(rsp.getFeeItems()));
    }
}

It doesn't work if I do the following; the request is never enriched at all:



@RestController
@RequestMapping("/applications")
@Slf4j
@RequiredArgsConstructor
public class ApplicationController {

    private final ApplicationService applicationService;
    private final ApplicationRequestMapper requestMapper;

     @PostMapping(value = "/save")
        public Mono<Application> saveApplication(@RequestBody ApplicationRequest request) {
            return Mono.subscriberContext()
                    .flatMap(context -> applicationService.saveApplication(requestMapper.apply(request))
                            .map(saveApplicationResponse -> {
                                Application application = new Application();
                                application.setLicenceId(saveApplicationResponse.getResponse().getLicenceNumber());
                                return application;
                            }))
                    .onErrorMap(throwable -> new ApplicationException(String.format(SAVE_ERROR_MESSAGE,
                            request.getLicenceId()),
                            throwable, true, false))
                    .log();
        }

}

@Service
@Slf4j
@RequiredArgsConstructor
public class ApplicationService extends ClientService{

     private final AuthenticationService authenticationService;
     private final FeesService feesService;


         public Mono<SaveApplicationResponse> saveApplication(ApplicationRequest request) {

            return Mono.subscriberContext()
                    .flatMap(context -> feesService.calculateApplicationFees(request)
                            .collectList())
                    .map(feeItems -> request.getFeeItems().addAll(feeItems))
                    .subscriberContext()
                    .flatMap(context -> authenticationService.getAccessToken()
                            .flatMap(token -> post("/save",
                                    request,
                                    token.getAccessToken(),
                                    context)
                                    .bodyToMono(SaveApplicationResponse.class))
                            .log());
        }
}



@Service
@Slf4j
@RequiredArgsConstructor
public class FeesService extends ClientService{

     private final AuthenticationService authenticationService;  

        public Flux<FeeItem> calculateApplicationFees(ApplicationRequest request) {

        return Mono.subscriberContext()
                .flatMap(ctx -> authenticationService.getAccessToken()
                        .flatMap(token -> get("/fees", request, token.getAccessToken(), ctx)
                                .bodyToMono(FeeResponse.class))
                        .log())
                .flatMapMany(rsp -> Flux.fromIterable(rsp.getFeeItems()));
    }
}

Upvotes: 0

HTN

Reputation: 3604

If you are sure that getEndpoint2(request1) returns a Flux, you can use collectList() and do the enrichment inside the flatMap that follows it:

return webclient.getEndpoint2(request1) // Flux<Response2>
         .collectList() // Mono<List<Response2>>
         .flatMap(list -> {
            // handle an empty list here if needed
            // createRequest stands for whatever enriches request1 with the list
            Request1 finalRequest = createRequest(request1, list);
            return webclient.getSomething(finalRequest); // Mono<Response1>
         });
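
For something that can be run as-is, here is a small sketch of the same collectList() then flatMap() ordering with the remote calls stubbed out. Everything in it is hypothetical: the Request class, calculateFees and save stand in for the real request type and the WebClient calls, and it assumes only reactor-core is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class EnrichThenSave {

    // Hypothetical stand-in for the request object that needs enriching.
    static class Request {
        final List<String> feeItems = new ArrayList<>();
    }

    // Stand-in for the Endpoint2 call; a real version would use WebClient.
    static Flux<String> calculateFees(Request request) {
        return Flux.just("application-fee", "licence-fee");
    }

    // Stand-in for the final call; it reports how many fee items it saw,
    // evaluated only when the Mono is subscribed.
    static Mono<String> save(Request request) {
        return Mono.fromCallable(
                () -> "saved with " + request.feeItems.size() + " fee items");
    }

    // One chain: collect the fees, enrich the request, then save.
    static Mono<String> enrichThenSave(Request request) {
        return calculateFees(request)
                .collectList()                       // Mono<List<String>>
                .doOnNext(request.feeItems::addAll)  // enrich before moving on
                .flatMap(fees -> save(request));     // runs only after enrichment
    }

    public static void main(String[] args) {
        // block() is used here only to print the result in a demo.
        System.out.println(enrichThenSave(new Request()).block());
    }
}
```

Because save(request) is created inside flatMap, it cannot be subscribed to before the fee list has arrived and been added to the request, which is the ordering the question asks for.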

Upvotes: 2
