CSjunkie

Reputation: 555

How to limit number of threads within a time period

A service I am using starts blocking requests after 5 are made within 1 second.

Using Java with Spring, I am looking for a way to queue threads so that up to 5 threads can enter the critical section within a second, and any other threads are queued and released once there is capacity for them to continue.

Currently I've attempted this with a lock, but it always makes the thread wait 1/5 of a second, even when we would not reach the maximum calls per second without sleeping.

    Lock l = new ReentrantLock();
    l.lock(); // taken before try, so unlock() only runs if the lock is actually held
    try {
        // critical section
    } finally {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        l.unlock();
    }

With this implementation I never exceed 5 calls per second, but I also delay the response by 200 ms after everything is ready to be returned to the user.

I need a solution that only delays threads when a delay is needed. In this case the 6th and later calls in a second should be delayed, but the first 5 do not need to be. Likewise, calls 6-10 could all go through at the same time.

Upvotes: 4

Views: 1560

Answers (2)

Sergei Voitovich

Reputation: 2952

I think solving it with the Semaphore API would be the best approach.

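import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class BulkheadSemaphore {

    // Entry timestamps: added by caller threads, drained by the scheduler thread,
    // so the queue has to be thread-safe.
    private final Queue<Long> enterQueue = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Semaphore semaphore;

    public BulkheadSemaphore(final long timeLimit, final int concurrentThreadsLimit) {
        this.semaphore = new Semaphore(concurrentThreadsLimit);

        // Every 200 ms, return the permits of entries older than timeLimit (in ms).
        executor.scheduleAtFixedRate(() -> {
            final long now = now();
            Long entered;
            while ((entered = enterQueue.peek()) != null && now - entered >= timeLimit) {
                releaseOldest();
            }
        }, timeLimit, 200, TimeUnit.MILLISECONDS);
    }

    private long now() {
        return System.currentTimeMillis();
    }

    // Returns one permit per recorded entry, so a permit is never handed back twice.
    private void releaseOldest() {
        if (enterQueue.poll() != null) {
            semaphore.release();
        }
    }

    public void acquire() {
        try {
            semaphore.acquire();
            enterQueue.add(now()); // record the entry time so the scheduler can expire it
        } catch (InterruptedException e) {
            // Do not continue into the critical section without a permit.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a permit", e);
        }
    }

    // Hands a permit back as soon as the call finishes. A fast call therefore frees
    // a slot before timeLimit has passed: this caps concurrency, not the exact call rate.
    public void release() {
        releaseOldest();
    }

    public void shutdown() {
        executor.shutdownNow();
    }
}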

The API is quite simple:

  1. Before a thread enters the critical section, call bulkheadSemaphore.acquire()
  2. After the external call finishes, call bulkheadSemaphore.release()

Why does it solve the problem?

  • The scheduled task returns the permits of threads that entered the critical section more than timeLimit ago.
  • It returns those permits at a fixed rate (I set it to 200 ms; it can be smaller). Calling release() also means that if a work unit finishes quickly, the next thread can start a new one right away.
  • Some threads will still wait longer than strictly necessary, but not every time, and by 200 ms at most.

As requests take time, I'd set timeLimit to 1.5 seconds (1500, since the value is compared with System.currentTimeMillis()) to stay safely within your 1-second limit.

P.S. Don't forget to shut down the executor service.
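
If the limit has to hold for call starts no matter how quickly each call returns, one option is to remember when the most recent calls started and block a caller only when the window is already full. The sketch below is a minimal illustration of that sliding-window idea using only the JDK. The names SlidingWindowLimiter, maxCalls and windowMillis are made up for this example, and it is one possible variant rather than a drop-in replacement for the class above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not taken from any library.
final class SlidingWindowLimiter {
    private final int maxCalls;
    private final long windowNanos;
    // Start times (System.nanoTime) of the calls still inside the window, oldest first.
    private final Deque<Long> starts = new ArrayDeque<>();

    public SlidingWindowLimiter(int maxCalls, long windowMillis) {
        if (maxCalls <= 0 || windowMillis <= 0) {
            throw new IllegalArgumentException("maxCalls and windowMillis must be positive");
        }
        this.maxCalls = maxCalls;
        this.windowNanos = TimeUnit.MILLISECONDS.toNanos(windowMillis);
    }

    // Returns immediately while fewer than maxCalls started within the window;
    // otherwise waits until the oldest recorded start has left the window.
    public synchronized void acquire() throws InterruptedException {
        while (true) {
            long now = System.nanoTime();
            // Forget calls that started at least one full window ago.
            while (!starts.isEmpty() && now - starts.peekFirst() >= windowNanos) {
                starts.pollFirst();
            }
            if (starts.size() < maxCalls) {
                starts.addLast(now);
                return;
            }
            // Window is full: wait (releasing the monitor) until the oldest entry expires,
            // then loop and re-check, which also covers spurious wake-ups.
            long waitNanos = windowNanos - (now - starts.peekFirst());
            TimeUnit.NANOSECONDS.timedWait(this, waitNanos);
        }
    }
}
```

With a single `new SlidingWindowLimiter(5, 1000)` shared by all callers, each thread would call `acquire()` right before the external request: the first five pass without delay and the sixth waits only until the oldest start is one second old. The timestamps are taken on the client, so network jitter can still bunch calls together at the server; a slightly longer window leaves some margin.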

Upvotes: 0

kenny_k

Reputation: 3980

This sort of rate limiting is a common problem in microservice architectures, as it is part of the broader issue of preventing cascading failures. There are many libraries that deal with it, and one of the most widely used modern ones is Resilience4j, which provides a RateLimiter implementation. You probably want something pretty close to this:

Create the limiter:

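import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
import java.time.Duration;

RateLimiterConfig config = RateLimiterConfig.custom()
  .limitRefreshPeriod(Duration.ofSeconds(1)) // permits are refreshed once per second
  .limitForPeriod(5)                         // 5 permits per refresh period
  .timeoutDuration(Duration.ofSeconds(4))    // or however long you want to wait before failing
  .build();

// Create registry
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);

// Use registry
RateLimiter rateLimiter = rateLimiterRegistry
  .rateLimiter("someServiceLimiter", config);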

Use it:

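// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
    .decorateCheckedRunnable(rateLimiter, backendService::doSomething);

// The limiter only takes effect when the decorated call is run
// (run() is declared to throw Throwable)
restrictedCall.run();

// Or, on a Spring bean, you can use the annotation
// io.github.resilience4j.ratelimiter.annotation.RateLimiter:
@RateLimiter(name = "someServiceLimiter")
public void doSomething() {
    // backend call
}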
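
To make the three settings above more concrete, here is a rough plain-JDK model of what they mean: a fixed number of permits, topped up once per refresh period, with callers waiting up to a timeout. This is only a sketch for intuition. FixedWindowLimiter and its methods are invented names, and it is not Resilience4j's code or API.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical model of limitForPeriod / limitRefreshPeriod / timeoutDuration.
final class FixedWindowLimiter implements AutoCloseable {
    private final int limitForPeriod;
    private final Semaphore permits;
    private final ScheduledExecutorService refresher;

    public FixedWindowLimiter(int limitForPeriod, long refreshPeriodMillis) {
        this.limitForPeriod = limitForPeriod;
        this.permits = new Semaphore(limitForPeriod);
        // Daemon thread so a forgotten limiter does not keep the JVM alive.
        this.refresher = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "limiter-refresh");
            t.setDaemon(true);
            return t;
        });
        refresher.scheduleAtFixedRate(this::refill,
                refreshPeriodMillis, refreshPeriodMillis, TimeUnit.MILLISECONDS);
    }

    // Tops the permits back up to the limit; unused permits do not accumulate.
    private void refill() {
        int missing = limitForPeriod - permits.availablePermits();
        if (missing > 0) {
            permits.release(missing);
        }
    }

    // Waits up to timeoutMillis for a permit; false means the caller should give up.
    public boolean tryAcquire(long timeoutMillis) throws InterruptedException {
        return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        refresher.shutdownNow();
    }
}
```

In this model the first 5 callers in a period get a permit at once and later callers wait for the next refresh, which matches the "only delay when needed" requirement. One thing to keep in mind with any fixed-period scheme like this sketch: 5 calls at the very end of one period and 5 at the start of the next can land within the same second, so if the remote service counts over a sliding window, a slightly longer refresh period may be the safer choice.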

Upvotes: 3
