Reputation: 25
I am trying to use WebClient to call a REST API in a loop and process the responses in a non-blocking way.
The use case is as follows.
I need to call a REST API that returns data for at most 7 days (startDate, endDate), but the input date range I receive can be any period, such as 30 days, 6 months or 1 year, and I need to split the calls accordingly.
For example, if the input date range is 30 days, I need to call the API 5 times to get the required data.
Let's say I need to find the people with the minimum loanAmount from 2022-09-01 to 2022-09-30.
I can use the API getLoanDetails(startDate, endDate), which only accepts date ranges of up to 7 days. So I need to call it 5 times (2022-09-01 to 2022-09-07, 2022-09-08 to 2022-09-14, 2022-09-15 to 2022-09-21, 2022-09-22 to 2022-09-28 and 2022-09-29 to 2022-09-30).
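The splitting described above could be sketched roughly as follows. This is only an illustration: DateRange and DateRanges.split are hypothetical names standing in for the Pair<LocalDate> type and the getDates helper, which are not shown in the question, and both bounds are assumed to be inclusive.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Pair<LocalDate> type used in the question.
record DateRange(LocalDate startDate, LocalDate endDate) {}

final class DateRanges {
    private DateRanges() {}

    // Splits [start, end] (both inclusive) into consecutive windows
    // that each cover at most maxDays days.
    static List<DateRange> split(LocalDate start, LocalDate end, int maxDays) {
        if (maxDays < 1) {
            throw new IllegalArgumentException("maxDays must be at least 1");
        }
        List<DateRange> windows = new ArrayList<>();
        LocalDate windowStart = start;
        while (!windowStart.isAfter(end)) {
            // A window of maxDays days ends maxDays - 1 days after its start.
            LocalDate windowEnd = windowStart.plusDays(maxDays - 1);
            if (windowEnd.isAfter(end)) {
                windowEnd = end; // the last window may be shorter
            }
            windows.add(new DateRange(windowStart, windowEnd));
            windowStart = windowEnd.plusDays(1);
        }
        return windows;
    }
}
```

Calling DateRanges.split(LocalDate.of(2022, 9, 1), LocalDate.of(2022, 9, 30), 7) produces the five windows listed above, the last one covering only 2022-09-29 to 2022-09-30.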
I have created a list of 7-day periods from the input date range, and I loop over it, calling the API with WebClient on each iteration. I need to parse each JSON response, create Person objects from it, and collect all persons with the minimum loanAmount into a list (there can be several with the same loanAmount).
record Person(String name, BigDecimal loanAmount, LocalDate dateOfBirth) {}
Flux<Person> findPeopleWithMinimumLoan(LocalDate startDate, LocalDate endDate) {
    List<Pair<LocalDate>> dateList = getDates(startDate, endDate);
    for (Pair<LocalDate> periodToFindLoanDetails : dateList) {
        Mono<List<Person>> personsWithMinimumLoanAmount =
            callLoanApi(periodToFindLoanDetails.startDate(), periodToFindLoanDetails.endDate())
                .map(this::findPeopleWithMinimumLoanAmount);
        // TODO: Compare the result of all the loan api calls in a non-blocking way and
        // return Flux<Person> or Mono<List<Person>>
    }
}
Mono<String> callLoanApi(LocalDate startDate, LocalDate endDate) {
    return WebClient.create(apiURI)
        .get()
        .uri(uriBuilder -> uriBuilder.queryParam("aoi_key", apiKey)
            .queryParam("start_date", startDate)
            .queryParam("end_date", endDate).build())
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(String.class);
}
List<Person> findPeopleWithMinimumLoanAmount(String jsonFromLoanApi)
Now I am struggling to use reactive types to process the responses from all the API calls in a non-blocking way. I could of course use block() to get the result of each call, save it in an intermediate list, compare it with the next result, and keep tracking the minimum until I have all the results. What is the ideal or better way to do this in a non-blocking and efficient way? Thank you in advance for going through my question, and apologies if it is long or unclear. Kindly let me know in the comments and I will clarify. I have omitted some code that is not relevant, in my opinion.
Upvotes: 1
Views: 2174
Reputation: 25
After some digging around I found one possible solution. Instead of using a for loop to call the loan API multiple times, I used Flux.parallel to do the same; the code looks like this:
ParallelFlux<List<Person>> findPeopleWithMinimumLoan(LocalDate startDate, LocalDate endDate) {
    List<Pair<LocalDate>> dateList = getDates(startDate, endDate);
    return Flux.fromIterable(dateList)
        .parallel()
        .flatMap(period -> callLoanApi(period.startDate(), period.endDate()))
        .map(this::parseJsonResponse)
        .flatMap(people -> Flux.fromIterable(people))
        .collect(ArrayList::new, collectPeopleFromAllCalls);
}
Mono<String> callLoanApi(LocalDate startDate, LocalDate endDate) {
    return WebClient.create(apiURI)
        .get()
        .uri(uriBuilder -> uriBuilder.queryParam("aoi_key", apiKey)
            .queryParam("start_date", startDate)
            .queryParam("end_date", endDate).build())
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(String.class);
}
List<Person> parseJsonResponse(String jsonFromLoanApi)
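The collectPeopleFromAllCalls accumulator is not shown above. As a rough sketch, and not necessarily what was actually used, it might keep only the people whose loanAmount ties for the smallest value seen so far. MinimumLoanCollector, accumulate and merge are hypothetical names introduced for this illustration; the code uses only the JDK, so it can be tried without Reactor.

```java
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

record Person(String name, BigDecimal loanAmount, LocalDate dateOfBirth) {}

// Hypothetical helper: one possible shape for the accumulator passed to collect(...).
final class MinimumLoanCollector {
    private MinimumLoanCollector() {}

    // Keeps only the people whose loanAmount equals the smallest amount seen so far.
    // compareTo is used instead of equals so that 50 and 50.00 count as the same amount.
    static void accumulate(List<Person> minimums, Person candidate) {
        if (minimums.isEmpty()) {
            minimums.add(candidate);
            return;
        }
        int cmp = candidate.loanAmount().compareTo(minimums.get(0).loanAmount());
        if (cmp < 0) {
            minimums.clear();        // a new, smaller minimum replaces everything
            minimums.add(candidate);
        } else if (cmp == 0) {
            minimums.add(candidate); // ties with the current minimum are kept
        }
        // larger amounts are ignored
    }

    // Combines two partial results, for example the lists produced by separate rails.
    static List<Person> merge(List<Person> left, List<Person> right) {
        List<Person> merged = new ArrayList<>(left);
        right.forEach(person -> accumulate(merged, person));
        return merged;
    }
}
```

With something like this, the last step could be written as .collect(ArrayList::new, MinimumLoanCollector::accumulate). Note that collect on a ParallelFlux works per rail, so the result is one list per rail; these would still need to be combined, for instance with sequential() followed by reduce(MinimumLoanCollector::merge), to end up with a single Mono<List<Person>>.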
Upvotes: 0
Reputation: 5309
This is quite a broad question, and a concrete answer depends on your environment. The following assumes that calling the API in this way is OK for its owner and that the API can cope with the load.
Chain into the successfully returned Mono with subscribe, flatMap or one of the then variations. If you want to execute a batch insert, combine the Monos and zip them together, for instance with zipWith, or manually.
For a simple use case, this would result in something like the following (Kotlin used for brevity):
@Transactional
fun save(todo: Todo): Mono<Todo> {
    return todoRepository.save(mapToEntity(todo))
        .flatMap { saved -> updateTasks(todo, saved) }
}
Upvotes: 1