Reputation: 1209
I'm using a WebClient object to send HTTP POST requests to a server.
It sends a huge number of requests quite rapidly (there are about 4000 messages in a QueueChannel). The problem is that the server can't seem to respond fast enough, so I'm getting a lot of 500 server errors and "connection closed prematurely" errors.
Is there a way to limit the number of requests per second? Or limit the number of threads it's using?
EDIT:
The message endpoint processes messages from a QueueChannel:
@MessageEndpoint
public class CustomServiceActivator {
private static final Logger logger = LogManager.getLogger();
@Autowired
IHttpService httpService;
@ServiceActivator(
inputChannel = "outputFilterChannel",
outputChannel = "outputHttpServiceChannel",
poller = @Poller( fixedDelay = "1000" )
)
public void processMessage(Data data) {
httpService.push(data);
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
The WebClient service class:
@Service
public class HttpService implements IHttpService {
private static final String URL = "http://www.blabla.com/log";
private static final Logger logger = LogManager.getLogger();
@Autowired
WebClient webClient;
@Override
public void push(Data data) {
String body = constructString(data);
Mono<ResponseEntity<Response>> res = webClient.post()
.uri(URL + getLogType(data))
.contentLength(body.length())
.contentType(MediaType.APPLICATION_JSON)
.syncBody(body)
.exchange()
.flatMap(response -> response.toEntity(Response.class));
res.subscribe(new Consumer<ResponseEntity<Response>>() { ... });
}
}
Upvotes: 22
Views: 24598
Reputation: 43
We can customize the ConnectionProvider to limit the number of active connections the WebClient uses.
Set pendingAcquireMaxCount as well to bound the number of requests waiting in the queue; by default the queue size is 2 * maxConnections.
This limits how many requests the WebClient serves at a time.
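// Cap concurrent connections and the number of requests allowed to wait for one.
ConnectionProvider provider = ConnectionProvider.builder("builder")
.maxConnections(maxConnections)
.pendingAcquireMaxCount(maxPendingRequests)
.build();
// HttpClient.create(provider) replaces the older HttpClient.from(TcpClient.create(provider)).
HttpClient httpClient = HttpClient.create(provider);
WebClient client = WebClient.builder()
.baseUrl("url")
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();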
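To get a feel for how a cap on active work plus a bounded waiting queue behaves, here is a rough JDK-only analogy. It is not Reactor Netty code: worker threads stand in for connections, the queue stands in for pending acquisitions, and BoundedPoolDemo and countRejected are names made up for this sketch. It also assumes that work arriving when both are full is rejected rather than left waiting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical analogy: threads play the role of connections, the queue the pending acquires. */
final class BoundedPoolDemo {

    /** Submits `total` tasks that block until released and returns how many were rejected. */
    static int countRejected(int maxActive, int maxPending, int total) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxActive, maxActive, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(maxPending));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < total; i++) {
            try {
                // Each task holds its slot until the latch opens, like a slow server response.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // both the active slots and the waiting queue are full
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return rejected;
    }
}
```

With 2 active slots, 4 pending slots and 10 submissions, countRejected(2, 4, 10) returns 4: two tasks run, four wait, and the rest are turned away. So this kind of limit bounds concurrency, and callers still need to handle (or retry) the overflow.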
Upvotes: 3
Reputation: 9997
Resilience4j has excellent support for non-blocking rate limiting with Project Reactor.
Required dependencies (besides Spring WebFlux):
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-reactor</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
<version>1.6.1</version>
</dependency>
Example:
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicInteger;
public class WebClientRateLimit
{
private static final AtomicInteger COUNTER = new AtomicInteger(0);
private final WebClient webClient;
private final RateLimiter rateLimiter;
public WebClientRateLimit()
{
this.webClient = WebClient.create();
// enables 3 requests every 5 seconds
this.rateLimiter = RateLimiter.of("my-rate-limiter",
RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(5))
.limitForPeriod(3)
.timeoutDuration(Duration.ofMinutes(1)) // max wait time for a request, if reached then error
.build());
}
public Mono<?> call()
{
return webClient.get()
.uri("https://jsonplaceholder.typicode.com/todos/1")
.retrieve()
.bodyToMono(String.class)
.doOnSubscribe(s -> System.out.println(COUNTER.incrementAndGet() + " - " + LocalDateTime.now()
+ " - call triggered"))
.transformDeferred(RateLimiterOperator.of(rateLimiter));
}
public static void main(String[] args)
{
WebClientRateLimit webClientRateLimit = new WebClientRateLimit();
long start = System.currentTimeMillis();
Flux.range(1, 16)
.flatMap(x -> webClientRateLimit.call())
.blockLast();
System.out.println("Elapsed time in seconds: " + (System.currentTimeMillis() - start) / 1000d);
}
}
Example output:
1 - 2020-11-30T15:44:01.575003200 - call triggered
2 - 2020-11-30T15:44:01.821134 - call triggered
3 - 2020-11-30T15:44:01.823133100 - call triggered
4 - 2020-11-30T15:44:04.462353900 - call triggered
5 - 2020-11-30T15:44:04.462353900 - call triggered
6 - 2020-11-30T15:44:04.470399200 - call triggered
7 - 2020-11-30T15:44:09.461199100 - call triggered
8 - 2020-11-30T15:44:09.463157 - call triggered
9 - 2020-11-30T15:44:09.463157 - call triggered
11 - 2020-11-30T15:44:14.461447700 - call triggered
10 - 2020-11-30T15:44:14.461447700 - call triggered
12 - 2020-11-30T15:44:14.461447700 - call triggered
13 - 2020-11-30T15:44:19.462098200 - call triggered
14 - 2020-11-30T15:44:19.462098200 - call triggered
15 - 2020-11-30T15:44:19.468059700 - call triggered
16 - 2020-11-30T15:44:24.462615 - call triggered
Elapsed time in seconds: 25.096
Docs: https://resilience4j.readme.io/docs/examples-1#decorate-mono-or-flux-with-a-ratelimiter
Upvotes: 27
Reputation: 41
I use this to limit the number of active requests:
public DemoClass(WebClient.Builder webClientBuilder) {
AtomicInteger activeRequest = new AtomicInteger();
this.webClient = webClientBuilder
.baseUrl("http://httpbin.org/ip")
.filter(
(request, next) -> Mono.just(next)
.flatMap(a -> {
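// Reserve a slot atomically; release it on success, error, or cancellation.
if (activeRequest.incrementAndGet() <= 3) {
return next.exchange(request)
.doFinally(signal -> activeRequest.decrementAndGet());
}
// No slot available: undo the reservation and let retryWhen back off.
activeRequest.decrementAndGet();
return Mono.error(new RuntimeException("Too many requests"));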
})
.retryWhen(Retry.anyOf(RuntimeException.class)
.randomBackoff(Duration.ofMillis(300), Duration.ofMillis(1000))
.retryMax(50)
)
)
.build();
}
public Mono<String> call() {
return webClient.get()
.retrieve()
.bodyToMono(String.class);
}
Upvotes: 3
Reputation: 564
I hope I'm not late to the party. Anyway, limiting the request rate was just one of the problems I faced a week ago while creating a crawler. Here are the issues:
Here's the solution:
private Flux<HostListResponse> sequentialCrawl() {
AtomicLong pageNo = new AtomicLong(2);
// Solution for #1 - Flux.expand
return getHosts(1)
.doOnRequest(value -> LOGGER.info("Start crawling."))
.expand(hostListResponse -> {
final long totalPages = hostListResponse.getData().getTotalPages();
long currPageNo = pageNo.getAndIncrement();
if (currPageNo <= totalPages) {
LOGGER.info("Crawling page " + currPageNo + " of " + totalPages);
// Solution for #2
return Mono.just(1).delayElement(Duration.ofSeconds(1)).then(
getHosts(currPageNo)
);
}
return Flux.empty();
})
.doOnComplete(() -> LOGGER.info("End of crawling."));
}
private Mono<HostListResponse> getHosts(long pageNo) {
final String uri = hostListUrl + pageNo;
LOGGER.info("Crawling " + uri);
return webClient.get()
.uri(uri)
.exchange()
// Solution for #3
.retryWhen(companion -> companion
.zipWith(Flux.range(1, RETRY + 1), (error, index) -> {
String message = "Failed to crawl uri: " + error.getMessage();
if (index <= RETRY && (error instanceof RequestIntervalTooShortException
|| error instanceof ConnectTimeoutException
|| "Connection reset by peer".equals(error.getMessage())
)) {
LOGGER.info(message + ". Retries count: " + index);
return Tuples.of(error, index);
} else {
LOGGER.warn(message);
throw Exceptions.propagate(error); //terminate the source with the 4th `onError`
}
})
.map(tuple -> {
// Solution for #4
Throwable e = tuple.getT1();
int delaySeconds = tuple.getT2();
// TODO: Adjust these values according to your needs
if (e instanceof ConnectTimeoutException) {
delaySeconds = delaySeconds * 5;
} else if ("Connection reset by peer".equals(e.getMessage())) {
// The API that this app is calling will sometimes think that the requests are SPAM. So let's rest longer before retrying the request.
delaySeconds = delaySeconds * 10;
}
LOGGER.info("Will retry crawling after " + delaySeconds + " seconds to " + uri + ".");
return Mono.delay(Duration.ofSeconds(delaySeconds));
})
.doOnNext(s -> LOGGER.warn("Request is too short - " + uri + ". Retried at " + LocalDateTime.now()))
)
.flatMap(clientResponse -> clientResponse.toEntity(String.class))
.map(responseEntity -> {
HttpStatus statusCode = responseEntity.getStatusCode();
if (statusCode != HttpStatus.OK) {
Throwable exception;
// Convert json string to Java POJO
HostListResponse response = toHostListResponse(uri, statusCode, responseEntity.getBody());
// The API that I'm calling will return error code of 06 if request interval is too short
if (statusCode == HttpStatus.BAD_REQUEST && "06".equals(response.getError().getCode())) {
exception = new RequestIntervalTooShortException(uri);
} else {
exception = new IllegalStateException("Request to " + uri + " failed. Reason: " + responseEntity.getBody());
}
throw Exceptions.propagate(exception);
} else {
return toHostListResponse(uri, statusCode, responseEntity.getBody());
}
});
}
Upvotes: 5
Reputation: 13275
The question "Limiting rate of requests with Reactor" provides two answers (one in a comment):
1. zipWith another Flux that acts as a rate limiter:
.zipWith(Flux.interval(Duration.of(1, ChronoUnit.SECONDS)))
2. Just delay each web request, using the delayElements function.
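Both options come down to releasing one item per time slot. Here is a rough sketch of that pacing idea on the plain JDK, without Reactor, so it is not the linked answer's code. PacedSender and drain are hypothetical names, and the sender callback stands in for whatever issues the HTTP request.

```java
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: hands one queued item to the sender per tick. */
final class PacedSender {

    /** Sends every queued item, one per period, and blocks until the queue is empty. */
    static <T> void drain(Queue<T> items, long periodMillis, Consumer<T> sender) {
        ScheduledExecutorService ticker = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(1);
        // Fixed delay (not fixed rate), so a slow send never triggers a catch-up burst.
        ticker.scheduleWithFixedDelay(() -> {
            T next = items.poll();
            if (next == null) {
                done.countDown(); // queue drained
                return;
            }
            try {
                sender.accept(next);
            } catch (RuntimeException e) {
                // A failed send must not cancel the schedule; real code would log or retry.
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ticker.shutdownNow();
        }
    }
}
```

For the roughly 4000 queued messages in the question, a period of 20 ms would cap throughput at about 50 requests per second. The right figure depends on what the server can actually handle.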
Edit: the answer below is valid for a blocking RestTemplate but does not really fit the reactive pattern.
WebClient has no built-in ability to limit requests, but you can easily add this feature using composition.
You can throttle your client externally using RateLimiter from Guava (https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html)
The tutorial at http://www.baeldung.com/guava-rate-limiter shows how to use the rate limiter in a blocking way, or with timeouts.
I would decorate all calls that need to be throttled in separate class that
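A minimal sketch of what such a decorating class could look like, written against the JDK only instead of Guava's RateLimiter. ThrottledCaller and its methods are hypothetical names for illustration. It blocks the calling thread, so it suits blocking clients rather than a reactive pipeline.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical decorator: spaces calls evenly by blocking the caller until its slot arrives. */
final class ThrottledCaller {
    private final long intervalNanos;
    private long nextFreeNanos = System.nanoTime();

    ThrottledCaller(double callsPerSecond) {
        if (callsPerSecond <= 0) {
            throw new IllegalArgumentException("callsPerSecond must be positive");
        }
        this.intervalNanos = (long) (1_000_000_000L / callsPerSecond);
    }

    /** Reserves the next free slot and returns how long the caller has to wait for it. */
    private synchronized long reserveSlot() {
        long now = System.nanoTime();
        long wait = Math.max(0L, nextFreeNanos - now);
        nextFreeNanos = now + wait + intervalNanos;
        return wait;
    }

    /** Runs the task once its slot arrives; the task would wrap the actual HTTP call. */
    <T> T call(Supplier<T> task) {
        long waitNanos = reserveSlot();
        if (waitNanos > 0) {
            try {
                TimeUnit.NANOSECONDS.sleep(waitNanos);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a slot", e);
            }
        }
        return task.get();
    }
}
```

Usage would be something like `new ThrottledCaller(50.0).call(() -> restTemplate.postForObject(url, body, String.class))`, where restTemplate, url and body are placeholders. Every throttled call goes through the same instance so they share one schedule.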
Upvotes: 9