Reputation: 973
I am using Flux to build my reactive pipeline. In the pipeline, I need to call the REST APIs of 3 different external systems, which are very strict about their access rates: if I breach the requests-per-second threshold, I am throttled exponentially. Each system has its own threshold.
I am using Spring WebClient to make the REST API calls; of the 3 APIs, 2 are GET and 1 is POST.
In my Reactor pipeline, the WebClient calls are wrapped in flatMap, as in the code below:
WebClient webClient = WebClient.builder().build();
Mono<String> getApiCall1 = webClient.get().uri("api-system-1").retrieve().bodyToMono(String.class); // actual return DTO is different from String
Mono<String> getApiCall2 = webClient.get().uri("api-system-2").retrieve().bodyToMono(String.class); // actual return DTO is different from String
Mono<String> getApiCall3 = webClient.get().uri("api-system-3").retrieve().bodyToMono(String.class); // actual return DTO is different from String
Flux.generate(generator) // generator pushes elements from the source one at a time
    // make call to 1st API service
    .flatMap(data -> getApiCall1)
    .map(api1Response -> api1ResponseModified)
    // make call to 2nd API service
    .flatMap(api1ResponseModified -> getApiCall2)
    .map(api2Response -> api2ResponseModified)
    // make call to 3rd API service
    .flatMap(api2ResponseModified -> getApiCall3)
    .map(api3Response -> api3ResponseModified)
    // rest of the pipeline operators
    // end
    .subscribe();
The problem is that if I don't set a concurrency value on flatMap, the pipeline breaches the threshold within a few seconds of service startup.
If I set concurrency to 1, 2, 5, or 10, the throughput becomes very low.
My question is: without setting a concurrency value, how can I achieve backpressure that honours the rate limits of each external system?
Upvotes: 1
Views: 4663
Reputation: 9997
Resilience4j has support for rate limiting with Reactor. See:
https://resilience4j.readme.io/docs/ratelimiter
https://resilience4j.readme.io/docs/examples-1#section-decorate-mono-or-flux-with-a-ratelimiter
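A minimal sketch of the Resilience4j approach, assuming the resilience4j-ratelimiter and resilience4j-reactor modules are on the classpath. The threshold (5 calls per 500 ms), the limiter name, and simulatedCall (a stand-in for a real WebClient call) are illustrative assumptions. As I understand the operator, RateLimiterOperator reserves a permit when the Mono is subscribed and delays the call until the permit is available; it only fails with RequestNotPermitted if the wait would exceed timeoutDuration. That lets flatMap keep its default concurrency.

```java
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Resilience4jRateLimitSketch {

    // Builds one limiter per external system: `permits` calls per `period`.
    static RateLimiter limiterFor(String name, int permits, Duration period) {
        RateLimiterConfig config = RateLimiterConfig.custom()
                .limitForPeriod(permits)                  // calls allowed per refresh period
                .limitRefreshPeriod(period)               // length of each period
                .timeoutDuration(Duration.ofSeconds(10))  // max wait for a permit before RequestNotPermitted
                .build();
        return RateLimiter.of(name, config);
    }

    // Hypothetical stand-in for a WebClient call: returns the elapsed ms at which it ran.
    static Mono<Long> simulatedCall(long start) {
        return Mono.fromCallable(() -> (System.nanoTime() - start) / 1_000_000);
    }

    public static List<Long> run(int calls, int permits, Duration period) {
        RateLimiter limiter = limiterFor("api-system-1", permits, period);
        long start = System.nanoTime();
        return Flux.range(1, calls)
                // flatMap keeps its default concurrency; the limiter delays calls over the limit
                .flatMap(i -> simulatedCall(start).transformDeferred(RateLimiterOperator.of(limiter)))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // Expect roughly five values near 0 ms and five near 500 ms.
        System.out.println(run(10, 5, Duration.ofMillis(500)));
    }
}
```

In the question's pipeline you would create one RateLimiter per external system and apply it to that system's Mono inside the corresponding flatMap.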
Upvotes: 1
Reputation: 72379
Given you have a "rate per second" requirement, I would explicitly window the flux and release one window per chosen time period. This gives you the maximum throughput the limit allows without being throttled.
I would use a helper function similar to:
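import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

public static <T> Flux<T> limitIntervalRate(Flux<T> flux, int ratePerInterval, Duration interval) {
    return flux
        .window(ratePerInterval)                          // split into windows of ratePerInterval elements
        .zipWith(Flux.interval(Duration.ZERO, interval))  // pair each window with one timer tick
        .flatMap(Tuple2::getT1);                          // emit each window's elements once its tick arrives
}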
which allows you to do:
sourceFlux
.transform(f -> limitIntervalRate(f, 2, Duration.ofSeconds(1))) //Limit to a rate of 2 per second
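To check the helper's timing in isolation, here is a small self-contained sketch assuming only reactor-core on the classpath; the class name and the 2-per-200 ms figures are illustrative. It records when each element leaves the limiter, which should show bursts of two roughly one interval apart.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

public class LimitIntervalRateDemo {

    // Helper from the answer: release at most ratePerInterval elements per interval.
    public static <T> Flux<T> limitIntervalRate(Flux<T> flux, int ratePerInterval, Duration interval) {
        return flux
                .window(ratePerInterval)                          // windows of ratePerInterval elements
                .zipWith(Flux.interval(Duration.ZERO, interval))  // one tick per window
                .flatMap(Tuple2::getT1);                          // flatten each window when its tick fires
    }

    // Returns the elapsed time (ms) at which each element was released by the limiter.
    public static List<Long> releaseTimes(int elements, int rate, Duration interval) {
        long start = System.nanoTime();
        return Flux.range(1, elements)
                .transform(f -> limitIntervalRate(f, rate, interval))
                .map(i -> (System.nanoTime() - start) / 1_000_000)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // 6 elements at 2 per 200 ms: expect bursts at roughly 0, 200 and 400 ms.
        System.out.println(releaseTimes(6, 2, Duration.ofMillis(200)));
    }
}
```

One thing worth verifying for your workload: Flux.interval keeps ticking whether or not a window is ready. If the source is slower than the limit, unused ticks queue up inside zipWith, so a later backlog can be released in quick succession, and if enough ticks go unconsumed, Flux.interval signals an overflow error because it cannot emit without demand. With a fast synchronous source such as Flux.generate this is less likely to matter.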
You can then map this as necessary onto your WebClient calls while respecting the limit in place for each API:
sourceFlux
//...assume API 1 has a limit of 10 calls per second
.transform(f -> limitIntervalRate(f, 10, Duration.ofSeconds(1)))
.flatMap(data -> getApiCall1)
.map(api1Response -> api1ResponseModified)
//...assume API 2 has a limit of 20 calls per second
.transform(f -> limitIntervalRate(f, 20, Duration.ofSeconds(1)))
.flatMap(api1ResponseModified -> getApiCall2)
.map(api2Response -> api2ResponseModified)
...and so on.
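A runnable sketch of the per-API pipeline above, with the WebClient calls replaced by a hypothetical simulatedCall (a Mono that records when it starts and adds 20 ms of fake latency). The limits of 4 and 2 calls per 200 ms are assumptions chosen so the demo finishes quickly. Each limitIntervalRate stage has its own Flux.interval, so each API's limit is enforced independently while both flatMap calls keep their default concurrency.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class PerApiRateLimitSketch {

    // Same helper as in the answer: at most ratePerInterval elements per interval.
    static <T> Flux<T> limitIntervalRate(Flux<T> flux, int ratePerInterval, Duration interval) {
        return flux
                .window(ratePerInterval)
                .zipWith(Flux.interval(Duration.ZERO, interval))
                .flatMap(Tuple2::getT1);
    }

    // Elapsed ms at which each simulated call was started, per API.
    static final ConcurrentLinkedQueue<Long> api1Starts = new ConcurrentLinkedQueue<>();
    static final ConcurrentLinkedQueue<Long> api2Starts = new ConcurrentLinkedQueue<>();

    // Hypothetical stand-in for a WebClient call: records its start time on subscription,
    // then emits the payload after 20 ms of simulated latency.
    static Mono<String> simulatedCall(ConcurrentLinkedQueue<Long> starts, long start, String payload) {
        return Mono.fromCallable(() -> {
                    starts.add((System.nanoTime() - start) / 1_000_000);
                    return payload;
                })
                .delayElement(Duration.ofMillis(20));
    }

    public static List<String> run() {
        long start = System.nanoTime();
        return Flux.range(1, 8)
                // assumed limit for API 1: 4 calls per 200 ms
                .transform(f -> limitIntervalRate(f, 4, Duration.ofMillis(200)))
                .flatMap(data -> simulatedCall(api1Starts, start, "api1-" + data))
                // assumed limit for API 2: 2 calls per 200 ms
                .transform(f -> limitIntervalRate(f, 2, Duration.ofMillis(200)))
                .flatMap(r -> simulatedCall(api2Starts, start, r + "/api2"))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println("API 1 starts (ms): " + api1Starts);
        System.out.println("API 2 starts (ms): " + api2Starts);
    }
}
```

Because the limiter gates when calls start, it does not need flatMap's concurrency setting at all. The limit is counted per tick-aligned interval, though, so two bursts can land closer together than one full interval; if an API enforces a sliding one-second window, it may be worth leaving some headroom below its published threshold.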
Upvotes: 12