Hrishikesh Joshi

Reputation: 25

How to call a REST API in a loop using WebClient and process all the responses in a non-blocking way in Java

I am trying to use WebClient to call a REST API in a loop and process the responses in a non-blocking way.
The use case is as follows.
I need to call a REST API that returns data for at most 7 days per request (startDate, endDate), but the input date range I receive can be any period, such as 30 days, 6 months or 1 year, so I have to split it and call the API once per window of up to 7 days.
For example, an input date range of 30 days requires 5 calls to get the required data.
Let's say I need to find the people with the minimum loanAmount from 2022-09-01 to 2022-09-30. I can use the API getLoanDetails(startDate, endDate), which only accepts date ranges of up to 7 days. So I need to call this API 5 times (2022-09-01 to 2022-09-07, 2022-09-08 to 2022-09-14, 2022-09-15 to 2022-09-21, 2022-09-22 to 2022-09-28 and 2022-09-29 to 2022-09-30).
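
The splitting described above can be sketched as follows. This is only a minimal illustration, not the asker's actual getDates: the class name DateWindows, the Period record (a stand-in for Pair<LocalDate>) and the split method are hypothetical names, and both ends of each window are assumed to be inclusive.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

public class DateWindows {

    // Hypothetical stand-in for the question's Pair<LocalDate>; both dates are inclusive.
    record Period(LocalDate startDate, LocalDate endDate) {}

    // Splits [start, end] into consecutive windows of at most maxDays days each.
    static List<Period> split(LocalDate start, LocalDate end, int maxDays) {
        if (maxDays < 1) {
            throw new IllegalArgumentException("maxDays must be at least 1");
        }
        List<Period> periods = new ArrayList<>();
        LocalDate windowStart = start;
        while (!windowStart.isAfter(end)) {
            // A window of maxDays inclusive days ends maxDays - 1 days after its start.
            LocalDate windowEnd = windowStart.plusDays(maxDays - 1);
            if (windowEnd.isAfter(end)) {
                windowEnd = end; // the last window may be shorter
            }
            periods.add(new Period(windowStart, windowEnd));
            windowStart = windowEnd.plusDays(1);
        }
        return periods;
    }

    public static void main(String[] args) {
        // The 30-day range from the question yields the five windows listed there.
        split(LocalDate.of(2022, 9, 1), LocalDate.of(2022, 9, 30), 7)
                .forEach(p -> System.out.println(p.startDate() + " to " + p.endDate()));
    }
}
```

If the start date is after the end date, the method returns an empty list, so no API calls would be made.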

I have split the input date range into a list of periods of up to 7 days each, and I loop over that list, calling the API with WebClient on each iteration. I need to parse the JSON response, create Person objects from it, and collect all persons with the minimum loanAmount into a list (there can be several with the same loanAmount).
record Person(String name, BigDecimal loanAmount, LocalDate dateOfBirth) {}

Flux<Person> findPeopleWithMinimumLoan(LocalDate startDate, LocalDate endDate) {

  List<Pair<LocalDate>> dateList = getDates(startDate,endDate);
  for (Pair<LocalDate> periodToFindLoanDetails : dateList) {
    Mono<List<Person>> personsWithMinimumLoanAmount =
        callLoanApi(periodToFindLoanDetails.startDate(), periodToFindLoanDetails.endDate())
            .map(this::findPeopleWithMinimumLoanAmount);

    // TODO: Compare the results of all the loan API calls in a non-blocking way
    //       and return Flux<Person> or Mono<List<Person>>
  }
}

Mono<String> callLoanApi(LocalDate startDate, LocalDate endDate) { 
   return WebClient.create(apiURI)
                .get()
                .uri(uriBuilder -> uriBuilder.queryParam("aoi_key", apiKey)
                        .queryParam("start_date", startDate)
                        .queryParam("end_date", endDate).build())
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(String.class);
}

List<Person> findPeopleWithMinimumLoanAmount(String jsonFromLoanApi)


Now I am struggling to use reactive types to process the responses from all the API calls in a non-blocking way. I could of course call block() on each call, save the result in an intermediate list, compare it with the next result, and keep tracking the minimum until all the results are in. What is the ideal or better way to do this in a non-blocking and efficient way? Thank you in advance for going through my question, and apologies if it is long or unclear. Kindly let me know in the comments and I will rectify it. I have omitted some code that is not relevant, in my opinion.

Upvotes: 1

Views: 2174

Answers (2)

Hrishikesh Joshi

Reputation: 25

After some digging around, I found one possible solution. Instead of using a for loop to call the loan API multiple times, I used Flux.parallel to do the same, and the code looks like this:

ParallelFlux<List<Person>> findPeopleWithMinimumLoan(LocalDate startDate, LocalDate endDate) {

  List<Pair<LocalDate>> dateList = getDates(startDate,endDate);
  return Flux.fromIterable(dateList)
      .parallel()
      .flatMap(period -> callLoanApi(period.startDate(), period.endDate()))
      .map(this::parseJsonResponse)
      .flatMap(people -> Flux.fromIterable(people))
      .collect(ArrayList::new, collectPeopleFromAllCalls);
}

Mono<String> callLoanApi(LocalDate startDate, LocalDate endDate) { 
   return WebClient.create(apiURI)
                .get()
                .uri(uriBuilder -> uriBuilder.queryParam("aoi_key", apiKey)
                        .queryParam("start_date", startDate)
                        .queryParam("end_date", endDate).build())
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(String.class);
}

List<Person> parseJsonResponse(String jsonFromLoanApi)
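
The code above gathers people from all calls but does not show how the minimum is picked. One possible shape for that step, sketched under assumptions: each call's list is first reduced to its own minimum holders, and the partial results are then merged pairwise. The class name MinimumLoanMerge, the methods minimumOf and merge, and the sample people are hypothetical and only for illustration. A function shaped like merge could be passed to Reactor's Flux.reduce over the per-call lists, which would give a Mono<List<Person>> without block(); that wiring is not shown here, and the demo uses a plain Java stream in its place.

```java
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class MinimumLoanMerge {

    record Person(String name, BigDecimal loanAmount, LocalDate dateOfBirth) {}

    // Keeps only the people holding the smallest loanAmount within one list.
    // compareTo is used so that 300 and 300.00 count as a tie.
    static List<Person> minimumOf(List<Person> people) {
        List<Person> result = new ArrayList<>();
        for (Person p : people) {
            int cmp = result.isEmpty() ? -1 : p.loanAmount().compareTo(result.get(0).loanAmount());
            if (cmp < 0) {
                result.clear(); // strictly smaller amount found: drop previous holders
            }
            if (cmp <= 0) {
                result.add(p);
            }
        }
        return result;
    }

    // Merges two partial results, each assumed to contain only its own minimum holders.
    // Returns one of the inputs unchanged unless the two minimums tie.
    static List<Person> merge(List<Person> left, List<Person> right) {
        if (left.isEmpty()) return right;
        if (right.isEmpty()) return left;
        int cmp = left.get(0).loanAmount().compareTo(right.get(0).loanAmount());
        if (cmp < 0) return left;
        if (cmp > 0) return right;
        List<Person> tied = new ArrayList<>(left);
        tied.addAll(right);
        return tied;
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for three parsed API responses.
        LocalDate dob = LocalDate.of(1990, 1, 1);
        List<Person> week1 = List.of(new Person("Ann", new BigDecimal("500"), dob),
                                     new Person("Bob", new BigDecimal("300"), dob));
        List<Person> week2 = List.of(new Person("Cid", new BigDecimal("300.00"), dob),
                                     new Person("Dee", new BigDecimal("900"), dob));
        List<Person> week3 = List.of(new Person("Eve", new BigDecimal("450"), dob));

        List<Person> overall = Stream.of(week1, week2, week3)
                .map(MinimumLoanMerge::minimumOf)
                .reduce(List.of(), MinimumLoanMerge::merge);
        overall.forEach(p -> System.out.println(p.name() + " " + p.loanAmount()));
    }
}
```

Because merge only compares the first element of each side, it relies on both inputs having gone through minimumOf first. It only needs the two partial results it is given, so no intermediate list has to be shared between calls.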

Upvotes: 0

Andreas

Reputation: 5309

This is quite a broad question, and a concrete answer depends on your environment. I am assuming that calling the API in this way is OK for its owner and that it can cope with the load.

Chain onto the successfully returned Mono with subscribe, flatMap, or one of the then variations. If you want to execute a batch insert, combine the Monos and zip them together, for instance with zipWith or manually.

For a simple use case, this would result in something like the following (Kotlin used for brevity):

@Transactional
fun save(todo: Todo): Mono<Todo> {
  return todoRepository.save(mapToEntity(todo))
    .flatMap { saved -> updateTasks(todo, saved) }
}

https://github.com/AndreasKl/todo-reactive/blob/main/src/main/kotlin/todoservice/todo/TodoUseCases.kt#L36

Upvotes: 1
