Phoste

Reputation: 1209

How to limit the request/second with WebClient?

I'm using a WebClient object to send HTTP POST requests to a server. It sends a huge number of requests very quickly (there are about 4000 messages in a QueueChannel). The problem is that the server doesn't seem to be able to respond fast enough, so I'm getting a lot of server errors (500) and "connection closed prematurely" errors.

Is there a way to limit the number of requests per second? Or to limit the number of threads it uses?

EDIT:

The message endpoint processes messages from a QueueChannel:

@MessageEndpoint
public class CustomServiceActivator {

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    IHttpService httpService;

    @ServiceActivator(
            inputChannel = "outputFilterChannel",
            outputChannel = "outputHttpServiceChannel",
            poller = @Poller( fixedDelay = "1000" )
    )
    public void processMessage(Data data) {
        httpService.push(data);
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

The WebClient service class:

@Service
public class HttpService implements IHttpService {

    private static final String URL = "http://www.blabla.com/log";

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    WebClient webClient;

    @Override
    public void push(Data data) {
        String body = constructString(data);
        Mono<ResponseEntity<Response>> res = webClient.post()
                .uri(URL + getLogType(data))
                .contentLength(body.length())
                .contentType(MediaType.APPLICATION_JSON)
                .syncBody(body)
                .exchange()
                .flatMap(response -> response.toEntity(Response.class));

        res.subscribe(new Consumer<ResponseEntity<Response>>() { ... });
    }
}

Upvotes: 22

Views: 24598

Answers (5)

Gurmin

Reputation: 43

We can customize the ConnectionProvider to limit the number of active connections used by WebClient.

You also need to set pendingAcquireMaxCount, the number of requests allowed to wait in the queue for a free connection, because by default that queue size is 2 * maxConnections.

This limits how many requests the WebClient serves at a time (a concurrency limit rather than a per-second rate).

ConnectionProvider provider = ConnectionProvider.builder("builder")
        .maxConnections(maxConnections)              // max concurrent connections (in-flight requests)
        .pendingAcquireMaxCount(maxPendingRequests)  // max requests waiting for a free connection
        .build();

// reactor.netty.http.client.HttpClient backed by the custom pool
HttpClient httpClient = HttpClient.create(provider);

WebClient client = WebClient.builder()
        .baseUrl("url")
        .clientConnector(new ReactorClientHttpConnector(httpClient))
        .build();
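To make the semantics of these two settings concrete, here is a rough plain-Java analogue (the class name BoundedConcurrencyLimiter is hypothetical, and this is not how Reactor Netty's pool is implemented): a Semaphore plays the role of maxConnections, and a counter caps how many callers may wait, like pendingAcquireMaxCount. Callers beyond that cap are rejected immediately, roughly mirroring how the pool refuses acquire requests once the pending queue is full. Unlike WebClient, this sketch blocks threads while waiting.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Hypothetical plain-Java analogue of maxConnections + pendingAcquireMaxCount:
 * at most maxConcurrent tasks run at once, at most maxPending callers wait,
 * and any further caller is rejected immediately.
 */
final class BoundedConcurrencyLimiter {
    private final Semaphore slots;                          // like maxConnections
    private final int maxPending;                           // like pendingAcquireMaxCount
    private final AtomicInteger pending = new AtomicInteger();

    BoundedConcurrencyLimiter(int maxConcurrent, int maxPending) {
        this.slots = new Semaphore(maxConcurrent, true);    // fair: blocked waiters are served FIFO
        this.maxPending = maxPending;
    }

    <T> T call(Supplier<T> task) throws InterruptedException {
        if (!slots.tryAcquire()) {
            // No free slot: join the wait queue unless it is already full
            if (pending.incrementAndGet() > maxPending) {
                pending.decrementAndGet();
                throw new IllegalStateException("Too many pending requests");
            }
            try {
                slots.acquire();                            // blocks until a slot is released
            } finally {
                pending.decrementAndGet();
            }
        }
        try {
            return task.get();
        } finally {
            slots.release();                                // always give the slot back
        }
    }
}
```

With `new BoundedConcurrencyLimiter(1, 0)`, a second caller arriving while the first task is still running gets an IllegalStateException instead of waiting.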

Upvotes: 3

Martin Tarjányi

Reputation: 9997

Resilience4j has excellent support for non-blocking rate limiting with Project Reactor.

Required dependencies (besides Spring WebFlux):

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-reactor</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>1.6.1</version>
</dependency>

Example:

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicInteger;

public class WebClientRateLimit
{
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    private final WebClient webClient;
    private final RateLimiter rateLimiter;

    public WebClientRateLimit()
    {
        this.webClient = WebClient.create();

        // enables 3 requests every 5 seconds
        this.rateLimiter = RateLimiter.of("my-rate-limiter",
                RateLimiterConfig.custom()
                                 .limitRefreshPeriod(Duration.ofSeconds(5))
                                 .limitForPeriod(3)
                                 .timeoutDuration(Duration.ofMinutes(1)) // max wait time for a request, if reached then error
                                 .build());
    }

    public Mono<?> call()
    {
        return webClient.get()
                        .uri("https://jsonplaceholder.typicode.com/todos/1")
                        .retrieve()
                        .bodyToMono(String.class)
                        .doOnSubscribe(s -> System.out.println(COUNTER.incrementAndGet() + " - " + LocalDateTime.now()
                                + " - call triggered"))
                        .transformDeferred(RateLimiterOperator.of(rateLimiter));
    }

    public static void main(String[] args)
    {
        WebClientRateLimit webClientRateLimit = new WebClientRateLimit();

        long start = System.currentTimeMillis();

        Flux.range(1, 16)
            .flatMap(x -> webClientRateLimit.call())
            .blockLast();

        System.out.println("Elapsed time in seconds: " + (System.currentTimeMillis() - start) / 1000d);
    }
}

Example output:

1 - 2020-11-30T15:44:01.575003200 - call triggered
2 - 2020-11-30T15:44:01.821134 - call triggered
3 - 2020-11-30T15:44:01.823133100 - call triggered
4 - 2020-11-30T15:44:04.462353900 - call triggered
5 - 2020-11-30T15:44:04.462353900 - call triggered
6 - 2020-11-30T15:44:04.470399200 - call triggered
7 - 2020-11-30T15:44:09.461199100 - call triggered
8 - 2020-11-30T15:44:09.463157 - call triggered
9 - 2020-11-30T15:44:09.463157 - call triggered
11 - 2020-11-30T15:44:14.461447700 - call triggered
10 - 2020-11-30T15:44:14.461447700 - call triggered
12 - 2020-11-30T15:44:14.461447700 - call triggered
13 - 2020-11-30T15:44:19.462098200 - call triggered
14 - 2020-11-30T15:44:19.462098200 - call triggered
15 - 2020-11-30T15:44:19.468059700 - call triggered
16 - 2020-11-30T15:44:24.462615 - call triggered
Elapsed time in seconds: 25.096

Docs: https://resilience4j.readme.io/docs/examples-1#decorate-mono-or-flux-with-a-ratelimiter
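The timings follow from the configuration: with limitForPeriod(3) and limitRefreshPeriod of 5 seconds, call number n (0-based) cannot start before window n / 3, so the 16th call (index 15) lands in window 5, about 25 s in, which matches the elapsed time printed above. Below is a simplified model of that arithmetic (FixedWindowSchedule is a hypothetical helper, not part of Resilience4j). It assumes the first window starts at the first call; in the output above, calls 4 to 6 started after roughly 3 s rather than a full 5 s, because the real limiter's cycles are not aligned to the first call.

```java
/**
 * Hypothetical model of a fixed-window limiter that allows `limitForPeriod`
 * calls per `refreshPeriodMillis`, assuming the first window opens at the first call.
 */
final class FixedWindowSchedule {
    private FixedWindowSchedule() {}

    /** Earliest start offset in ms of the call with 0-based index `callIndex`. */
    static long earliestStartMillis(int callIndex, int limitForPeriod, long refreshPeriodMillis) {
        if (callIndex < 0 || limitForPeriod <= 0 || refreshPeriodMillis <= 0) {
            throw new IllegalArgumentException("invalid arguments");
        }
        // Integer division gives the window the call falls into
        long window = callIndex / limitForPeriod;
        return window * refreshPeriodMillis;
    }
}
```

For example, `earliestStartMillis(15, 3, 5_000)` returns 25000, and calls 0 to 2 all return 0.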

Upvotes: 27

Dmitriy Grigoryev

Reputation: 41

I use this to limit the number of active requests:

public DemoClass(WebClient.Builder webClientBuilder) {
    AtomicInteger activeRequest = new AtomicInteger();
    this.webClient = webClientBuilder
            .baseUrl("http://httpbin.org/ip")
            .filter(
                    (request, next) -> Mono.defer(() -> {
                                // Reserve a slot atomically; give it back if the limit (3) is exceeded
                                if (activeRequest.incrementAndGet() > 3) {
                                    activeRequest.decrementAndGet();
                                    return Mono.error(new RuntimeException("Too many requests"));
                                }
                                return next.exchange(request)
                                        // Release the slot on success, error or cancellation
                                        .doFinally(signal -> activeRequest.decrementAndGet());
                            })
                            // Retry rejected attempts after a random delay (reactor-extra Retry)
                            .retryWhen(Retry.anyOf(RuntimeException.class)
                                    .randomBackoff(Duration.ofMillis(300), Duration.ofMillis(1000))
                                    .retryMax(50)
                            )
            )
            .build();
}

public Mono<String> call() {
    return webClient.get()
            .retrieve()
            .bodyToMono(String.class);
}
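The key requirement of this pattern is that checking the counter and reserving a slot happen as one atomic step (otherwise two requests can both pass the check), and that the slot is released on every outcome, including errors. A minimal plain-Java sketch of such a gate, using compareAndSet (ActiveRequestGate is a hypothetical name, not part of WebClient or Reactor):

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: admits at most `max` concurrent requests and rejects the rest. */
final class ActiveRequestGate {
    private final int max;
    private final AtomicInteger active = new AtomicInteger();

    ActiveRequestGate(int max) {
        this.max = max;
    }

    /** Atomically reserves a slot; returns false (reserving nothing) if all slots are taken. */
    boolean tryEnter() {
        while (true) {
            int current = active.get();
            if (current >= max) {
                return false;
            }
            // Only one thread can move the counter from `current` to `current + 1`
            if (active.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Frees a slot; call exactly once per successful tryEnter(), on success and on error. */
    void exit() {
        active.decrementAndGet();
    }

    int active() {
        return active.get();
    }
}
```

A rejected tryEnter() corresponds to the "Too many requests" error in the filter, which the retry then turns into a delayed new attempt.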

Upvotes: 3

ramirezag

Reputation: 564

I hope I'm not late to the party. Limiting the request rate was just one of the problems I faced a week ago while creating a crawler. Here are the issues:

  1. I have to make recursive, paginated, sequential requests. The pagination parameters are included in the API that I'm calling.
  2. Once a response is received, pause for 1 second before making the next request.
  3. For certain errors, retry the request.
  4. On retry, pause for a certain number of seconds.

Here's the solution:

private Flux<HostListResponse> sequentialCrawl() {
    AtomicLong pageNo = new AtomicLong(2);
    // Solution for #1 - Flux.expand
    return getHosts(1)
        .doOnRequest(value -> LOGGER.info("Start crawling."))
        .expand(hostListResponse -> { 
            final long totalPages = hostListResponse.getData().getTotalPages();
            long currPageNo = pageNo.getAndIncrement();
            if (currPageNo <= totalPages) {
                LOGGER.info("Crawling page " + currPageNo + " of " + totalPages);
                // Solution for #2
                return Mono.just(1).delayElement(Duration.ofSeconds(1)).then(
                    getHosts(currPageNo)
                );
            }
            return Flux.empty();
        })
        .doOnComplete(() -> LOGGER.info("End of crawling."));
}

private Mono<HostListResponse> getHosts(long pageNo) {
    final String uri = hostListUrl + pageNo;
    LOGGER.info("Crawling " + uri);

    return webClient.get()
        .uri(uri)
        .exchange()
        .flatMap(clientResponse -> clientResponse.toEntity(String.class))
        .map(responseEntity -> {
            HttpStatus statusCode = responseEntity.getStatusCode();
            if (statusCode != HttpStatus.OK) {
                Throwable exception;
                // Convert json string to Java POJO
                HostListResponse response = toHostListResponse(uri, statusCode, responseEntity.getBody());
                // The API that I'm calling will return error code of 06 if request interval is too short
                if (statusCode == HttpStatus.BAD_REQUEST && "06".equals(response.getError().getCode())) {
                    exception = new RequestIntervalTooShortException(uri);
                } else {
                    exception = new IllegalStateException("Request to " + uri + " failed. Reason: " + responseEntity.getBody());
                }
                throw Exceptions.propagate(exception);
            } else {
                return toHostListResponse(uri, statusCode, responseEntity.getBody());
            }
        })
        // Solution for #3: retryWhen comes after map() so that it also sees the
        // RequestIntervalTooShortException thrown there; each retry re-sends the request
        .retryWhen(companion -> companion
            .zipWith(Flux.range(1, RETRY + 1), (error, index) -> {
                String message = "Failed to crawl uri: " + error.getMessage();
                if (index <= RETRY && (error instanceof RequestIntervalTooShortException
                    || error instanceof ConnectTimeoutException
                    || "Connection reset by peer".equals(error.getMessage())
                )) {
                    LOGGER.info(message + ". Retries count: " + index);
                    return Tuples.of(error, index);
                } else {
                    LOGGER.warn(message);
                    throw Exceptions.propagate(error); //terminate the source with the 4th `onError`
                }
            })
            // flatMap (not map), so that the retry actually waits for the delay
            .flatMap(tuple -> {
                // Solution for #4
                Throwable e = tuple.getT1();
                int delaySeconds = tuple.getT2();
                // TODO: Adjust these values according to your needs
                if (e instanceof ConnectTimeoutException) {
                    delaySeconds = delaySeconds * 5;
                } else if ("Connection reset by peer".equals(e.getMessage())) {
                    // The API that this app is calling will sometimes think that the requests are SPAM. So let's rest longer before retrying the request.
                    delaySeconds = delaySeconds * 10;
                }
                LOGGER.info("Will retry crawling after " + delaySeconds + " seconds to " + uri + ".");
                return Mono.delay(Duration.ofSeconds(delaySeconds));
            })
            .doOnNext(s -> LOGGER.warn("Request is too short - " + uri + ". Retried at " + LocalDateTime.now()))
        );
}
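The back-off rule in step #4 is a small pure function of the error kind and the retry index, which makes it easy to test on its own. A sketch under stated assumptions: the RetryCause enum and RetryDelayPolicy class are hypothetical names introduced here, with the multipliers (1, 5, 10) taken from the crawler above.

```java
/** Hypothetical categories for the retryable errors handled by the crawler above. */
enum RetryCause { REQUEST_INTERVAL_TOO_SHORT, CONNECT_TIMEOUT, CONNECTION_RESET }

/** Hypothetical extraction of the crawler's delay rule (step #4). */
final class RetryDelayPolicy {
    private RetryDelayPolicy() {}

    /** Delay in seconds before retry number `attempt` (1-based), scaled per cause. */
    static long delaySeconds(RetryCause cause, int attempt) {
        switch (cause) {
            case CONNECT_TIMEOUT:
                return attempt * 5L;
            case CONNECTION_RESET:
                return attempt * 10L;   // the API may treat bursts as spam, so rest longer
            default:
                return attempt;         // e.g. request interval too short
        }
    }
}
```

For example, the second retry after a connect timeout waits 10 seconds, and the third retry after a connection reset waits 30 seconds.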

Upvotes: 5

Bartosz Bilicki

Reputation: 13275

The question "Limiting rate of requests with Reactor" provides two answers (one in a comment):

  1. zipWith another Flux that acts as a rate limiter:

     .zipWith(Flux.interval(Duration.of(1, ChronoUnit.SECONDS)))

  2. Just delay each web request, using the delayElements function.

Edit: the answer below is valid for the blocking RestTemplate but does not really fit the reactive pattern.

WebClient has no built-in ability to limit requests, but you can easily add this feature using composition.

You could throttle your client externally using the RateLimiter from Guava (https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html).

In this tutorial http://www.baeldung.com/guava-rate-limiter you will find how to use RateLimiter in a blocking way, or with timeouts.

I would wrap all calls that need to be throttled in a separate class that:

  1. limits the number of calls per second
  2. performs the actual web call using WebClient
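A minimal sketch of such a decorator without Guava, assuming blocking calls are acceptable (as with RestTemplate): ThrottledCaller is a hypothetical name, and it simply spaces calls at least 1/permitsPerSecond apart before delegating, similar in spirit to calling Guava's RateLimiter.acquire() before each request. The Supplier stands in for the actual web call.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Hypothetical blocking decorator: spaces calls at least 1/permitsPerSecond apart,
 * then runs the wrapped call. A plain-Java stand-in for a Guava RateLimiter.
 */
final class ThrottledCaller {
    private final long intervalNanos;
    private long nextFreeNanos = System.nanoTime();

    ThrottledCaller(double permitsPerSecond) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be > 0");
        }
        this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
    }

    /** Blocks until this caller's reserved slot arrives, then performs the call. */
    <T> T call(Supplier<T> webCall) throws InterruptedException {
        long waitNanos;
        synchronized (this) {
            long now = System.nanoTime();
            long slot = Math.max(now, nextFreeNanos);   // reserve the next free slot
            nextFreeNanos = slot + intervalNanos;
            waitNanos = slot - now;
        }
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);      // sleep outside the lock
        }
        return webCall.get();
    }
}
```

With `new ThrottledCaller(20.0)`, three back-to-back calls take at least about 100 ms in total: the first runs immediately and the next two wait 50 ms each.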

Upvotes: 9
