Naveen Kumar

Reputation: 973

Controlling throughput/concurrency of the Reactor Flux flatMap operator and achieving backpressure

I am using Flux to build my reactive pipeline. In the pipeline, I need to call 3 different external REST APIs, each of which is very strict about its rate of access. I will be throttled exponentially if I breach the rate-per-second threshold. Each system has its own threshold.

I am using Spring WebClient to make the REST API calls; of the 3 APIs, 2 are GET and 1 is POST.

In my Reactor pipeline, the WebClient calls are wrapped in flatMap to perform the API calls, as in the code below:

Mono<String> getApiCall1 = WebClient.builder().build().get().uri("api-system-1").retrieve().bodyToMono(String.class); // actual return DTO is different from String
Mono<String> getApiCall2 = WebClient.builder().build().get().uri("api-system-2").retrieve().bodyToMono(String.class); // actual return DTO is different from String
Mono<String> getApiCall3 = WebClient.builder().build().get().uri("api-system-3").retrieve().bodyToMono(String.class); // actual return DTO is different from String

Flux.generate(generator) // Generator pushes the elements from the source one at a time

    // make call to 1st API Service
    .flatMap(data -> getApiCall1)
    .map(api1Response -> api1ResponseModified)

    // make call to 2nd API Service
    .flatMap(api1ResponseModified -> getApiCall2)
    .map(api2Response -> api2ResponseModified)

    // make call to 3rd API Service
    .flatMap(api2ResponseModified -> getApiCall3)
    .map(api3Response -> api3ResponseModified)

    // rest of the pipeline operators

    // end
    .subscribe();

The problem is that if I don't set a concurrency value on the flatMap, the pipeline breaches the threshold within a few seconds of service startup. If I set the concurrency to 1, 2, 5, or 10, the throughput becomes very low.

The question is: without setting a concurrency value, how can I achieve backpressure that honours the rate limits of the external systems?

Upvotes: 1

Views: 4663

Answers (2)

Michael Berry

Reputation: 72379

Given you have a "rate per second" requirement, I would explicitly window the flux into groups of at most the permitted number of elements and release no more than one window per chosen time period. This will give you the maximum throughput without being throttled.

I would use a helper function similar to:

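public static <T> Flux<T> limitIntervalRate(Flux<T> flux, int ratePerInterval, Duration interval) {
    return flux
            // split the source into windows of at most ratePerInterval elements
            .window(ratePerInterval)
            // pair each window with a tick: the first fires immediately, then one per interval
            .zipWith(Flux.interval(Duration.ZERO, interval))
            // drop the tick and flatten each window back into a single flux
            .flatMap(Tuple2::getT1);
}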

which allows you to do:

sourceFlux
        .transform(f -> limitIntervalRate(f, 2, Duration.ofSeconds(1))) //Limit to a rate of 2 per second
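To make the intended pacing concrete, here is a rough plain-JDK sketch (no Reactor involved) of the schedule the helper aims for. It assumes the source always has elements ready and that window k is paired with tick k, which fires at k times the interval. The class and method names (`RateSchedule`, `windowIndex`, `earliestRelease`) are made up for illustration only.

```java
import java.time.Duration;

class RateSchedule {
    // Index of the window that the element at position itemIndex (0-based) falls into.
    static long windowIndex(long itemIndex, int ratePerInterval) {
        return itemIndex / ratePerInterval;
    }

    // Earliest offset from subscription at which the element can be released,
    // assuming window k is paired with tick k, which fires at k * interval.
    static Duration earliestRelease(long itemIndex, int ratePerInterval, Duration interval) {
        return interval.multipliedBy(windowIndex(itemIndex, ratePerInterval));
    }

    public static void main(String[] args) {
        // Rate of 2 per second, as in the transform call above.
        for (long i = 0; i < 6; i++) {
            System.out.println("element " + i + " -> "
                    + earliestRelease(i, 2, Duration.ofSeconds(1)).toMillis() + " ms");
        }
    }
}
```

With a rate of 2 per second, elements 0 and 1 are released immediately, elements 2 and 3 no earlier than one second in, and so on. This is only the idealised schedule; a slow source will simply release elements later than these offsets.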

You can then map this as necessary onto your WebClient calls while respecting the limit in place for each API:

sourceFlux
        //...assume API 1 has a limit of 10 calls per second
        .transform(f -> limitIntervalRate(f, 10, Duration.ofSeconds(1)))
        .flatMap(data -> getApiCall1)
        .map(api1Response -> api1ResponseModified)

        //...assume API 2 has a limit of 20 calls per second
        .transform(f -> limitIntervalRate(f, 20, Duration.ofSeconds(1))) 
        .flatMap(api1ResponseModified -> getApiCall2)
        .map(api2Response -> api2ResponseModified)

...and so on.
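As a side note on why tuning flatMap's concurrency alone is a blunt tool here: concurrency caps the number of calls in flight, not the number of calls per second, so the resulting rate depends on how long each call takes. A back-of-the-envelope sketch, assuming a hypothetical fixed latency of 250 ms per call (the class and method names are illustrative, not part of Reactor):

```java
import java.time.Duration;

class ConcurrencyBound {
    // Upper bound on completed calls per second when at most `concurrency`
    // calls are in flight and each call takes `latency` to complete.
    static double maxCallsPerSecond(int concurrency, Duration latency) {
        return concurrency / (latency.toMillis() / 1000.0);
    }

    public static void main(String[] args) {
        Duration latency = Duration.ofMillis(250); // hypothetical per-call latency
        // 256 is flatMap's default concurrency unless it has been reconfigured.
        for (int concurrency : new int[] {1, 5, 10, 256}) {
            System.out.println("concurrency " + concurrency + " -> at most "
                    + maxCallsPerSecond(concurrency, latency) + " calls/s");
        }
    }
}
```

Under that assumed latency, a concurrency of 5 allows at most 20 calls per second, while the default concurrency allows far more than most rate limits permit. Because real latency varies, the same concurrency setting can be too slow one minute and too fast the next, which is why limiting by time window is the more direct fit for a per-second threshold.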

Upvotes: 12
