Reputation: 113
I'm calling a REST endpoint multiple times and saving each response to a database with JPA. Once all responses have been received, I have to call a stored procedure in the database.
I'm trying to use WebClient for these requests, but I don't know how to wait until (or check that) all requests have finished.
This is the code I'm using to call the endpoint:
private Mono<DSGetDataResponse> GetData(DSGetDataRequest dataRequest) {
    try {
        return weblicent.post()
                .uri("GetData")
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .syncBody(dataRequest)
                .retrieve()
                .bodyToMono(DSGetDataResponse.class);
    } catch (Exception e) {
        log.info("Sono in catch (Exception");
        e.printStackTrace();
        return null;
    }
}
And this is the code I'm using to call the above method:
Items_to_request().forEach(x -> {
    String token = tokenResponse.getTokenValue();
    DSGetDataRequest dataRequest = new DSGetDataRequest(token, this.Richista_DS(x), null);
    try {
        this.GetData(dataRequest).subscribe(dataResponse -> this.handlerGetDataResponse(dataResponse));
    } catch (Exception e) {
        log.info("Sono in catch (Exception");
        e.printStackTrace();
    }
});
In handlerGetDataResponse I only save the response to the database.
How can I wait until all requests have finished before calling the stored procedure in the database?
I know I'm mixing non-blocking calls with blocking calls.
Do you have any suggestions on how to solve the problem?
Thanks, Mirko
Upvotes: 1
Views: 6982
Reputation: 239
I've rewritten your code using Flux:
Flux.fromIterable(Items_to_request())
    .flatMap(x -> {
        String token = tokenResponse.getTokenValue();
        DSGetDataRequest dataRequest = new DSGetDataRequest(token, this.Richista_DS(x), null);
        return this.GetData(dataRequest);
    })
    // Placeholder: replace Flux.empty() with your own error-handling logic
    .onErrorResume(e -> Flux.empty())
    .collectList()
    .subscribe((List<DSGetDataResponse> data) -> {
        // Do what you want here; every request has completed by this point
    });
You'll have to handle errors properly with onErrorResume, onErrorContinue, or a similar operator; otherwise the Flux terminates on the first error.
This code fires all the requests and collects the responses into a list. The subscribe callback runs once, after every request has completed, so that is the place to call the stored procedure.
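To show the "fire everything, wait for all, then run one final step" shape in isolation, here is a minimal sketch that uses the JDK's CompletableFuture instead of Reactor, so it runs without extra dependencies. It is an analogue of the pipeline above, not a replacement for it. The names WaitForAll, fetch, and fetchAll are hypothetical, and the saved list stands in for the JPA save done in handlerGetDataResponse.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

public class WaitForAll {

    // Hypothetical stand-in for the remote call; completes asynchronously.
    static CompletableFuture<String> fetch(String item) {
        return CompletableFuture.supplyAsync(() -> "response-" + item);
    }

    // Fires one request per item, records each response as it arrives, and
    // returns a future that completes only when every request has finished.
    static CompletableFuture<List<String>> fetchAll(List<String> items, List<String> saved) {
        List<CompletableFuture<String>> pending = items.stream()
                .map(item -> fetch(item).thenApply(response -> {
                    saved.add(response); // stand-in for saving the response with JPA
                    return response;
                }))
                .collect(Collectors.toList());

        return CompletableFuture
                .allOf(pending.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> pending.stream()
                        .map(CompletableFuture::join) // already complete, so this does not wait
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // Thread-safe list, because responses are added from pool threads.
        List<String> saved = new CopyOnWriteArrayList<>();
        List<String> all = fetchAll(List.of("a", "b", "c"), saved).join();

        // Every response has been saved here; this is where the
        // stored procedure call would go.
        System.out.println("saved " + saved.size() + " responses");
        System.out.println(all);
    }
}
```

In the Reactor version, collectList() plays the role that allOf plays here: nothing downstream runs until every inner request has completed.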
Upvotes: 2