Reputation: 555
A service I am using starts blocking requests after 5 are made within 1 second.
Using Java with Spring, I am looking for a way to queue threads so that up to 5 threads can access the critical section within a second, and any further threads are queued up and released once there is capacity for them to continue.
Currently I've attempted this with a lock, but it makes every thread wait a fifth of a second, even when we wouldn't be at the maximum calls per second without sleeping.
Lock l = new ReentrantLock();
l.lock();
try {
    // critical section
} finally {
    try {
        Thread.sleep(200);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    l.unlock();
}
With this implementation I never exceed 5 per second, but I also delay the response by 200 milliseconds after everything is ready to be returned to the user.
I need a solution that only delays threads when a delay is needed. In this case the 6th+ call within a second should be delayed, but the first 5 do not need to be. Likewise, calls 6-10 could all go through at the same time once capacity frees up.
Upvotes: 4
Views: 1560
Reputation: 2952
I think solving it using the Semaphore API would be the best approach.
import java.util.Queue;
import java.util.concurrent.*;

public class BulkheadSemaphore {

    // Completion timestamps of recent calls, drained by the scheduled task below.
    // ConcurrentLinkedQueue because caller threads and the executor thread both touch it.
    private final Queue<Long> enterQueue = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Semaphore semaphore;

    public BulkheadSemaphore(final long timeLimit, final int concurrentThreadsLimit) {
        this.semaphore = new Semaphore(concurrentThreadsLimit);
        // Every 200 ms, hand back the permits of calls that completed more than
        // timeLimit milliseconds ago, releasing any threads queued in acquire().
        executor.scheduleAtFixedRate(() -> {
            final long now = now();
            while (!enterQueue.isEmpty() && now - enterQueue.peek() >= timeLimit) {
                enterQueue.poll();
                semaphore.release();
            }
        }, timeLimit, 200, TimeUnit.MILLISECONDS);
    }

    private long now() {
        return System.currentTimeMillis();
    }

    // Blocks until a permit is free, i.e. until this call is allowed to go out.
    public void acquire() {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Does not free the permit right away: it records the completion time and lets the
    // scheduled task return the permit once timeLimit has elapsed, so at most
    // concurrentThreadsLimit calls are admitted per time window.
    public void release() {
        enterQueue.add(now());
    }

    public void shutdown() {
        executor.shutdown();
    }
}
The API is quite simple: call bulkheadSemaphore.acquire() before making the request and bulkheadSemaphore.release() once it has completed.
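A minimal usage sketch, assuming a shared bulkheadSemaphore field and a hypothetical callExternalService() method standing in for the rate-limited call:
// shared limiter: 5 permits, each handed back 1.5 s after its call completes
private final BulkheadSemaphore bulkheadSemaphore = new BulkheadSemaphore(1500L, 5);

public String fetch() {
    bulkheadSemaphore.acquire();       // blocks the 6th+ caller until a permit frees up
    try {
        return callExternalService();  // hypothetical call to the rate-limited service
    } finally {
        bulkheadSemaphore.release();   // permit is returned timeLimit ms after this point
    }
}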
Why does it solve the problem? Permits are not returned the moment a request completes: release() only records the completion time, and the scheduled task hands the permit back once timeLimit has elapsed. So at most 5 requests can start per time window, the 6th+ thread simply blocks in acquire() until a permit frees up, and no thread is delayed unless the limit would actually be exceeded. As requests take time, I'd set timeLimit to 1.5 seconds to stay comfortably within your 1-second limitation.
P.S. Don't forget to shut down the executor service (shutdown() above) once the limiter is no longer needed.
Upvotes: 0
Reputation: 3980
This sort of rate-limiting is quite a common problem in microservice architectures, as it is part of the broader issue of addressing cascading failures. There are many libraries around to deal with this issue, and one of the most widely-used modern ones is called Resilience4j, which provides a RateLimiter
implementation. You probably want something pretty close to this:
Create the limiter:
RateLimiterConfig config = RateLimiterConfig.custom()
    .limitRefreshPeriod(Duration.ofSeconds(1))
    .limitForPeriod(5)
    .timeoutDuration(Duration.ofSeconds(4)) // or however long you want to wait before failing
    .build();

// Create registry
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);

// Use registry
RateLimiter rateLimiter = rateLimiterRegistry
    .rateLimiter("someServiceLimiter", config);
Use it:
// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
    .decorateCheckedRunnable(rateLimiter, backendService::doSomething);

// Or, you can use an annotation:
@RateLimiter(name = "someServiceLimiter")
public void doSomething() {
    // backend call
}
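To execute the decorated call you invoke the CheckedRunnable yourself; if no permit becomes available within timeoutDuration, the call fails with Resilience4j's RequestNotPermitted exception. A rough sketch (the fallback handling here is illustrative):
try {
    restrictedCall.run();
} catch (RequestNotPermitted e) {
    // no permit was granted within timeoutDuration (4 seconds above):
    // fall back, retry later, or surface the error to the caller
} catch (Throwable t) {
    // CheckedRunnable.run() declares Throwable, so failures of the backend call itself land here
}
If you prefer the annotation route, you would typically pull in the Resilience4j Spring Boot starter and configure the "someServiceLimiter" instance in application properties instead of building the registry in code.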
Upvotes: 3