Mandroid

Reputation: 7468

How to limit number of virtual threads to avoid large number of DB connections?

I am currently using Executors.newFixedThreadPool to run a large number of tasks in parallel.

try (ExecutorService executorService = Executors.newFixedThreadPool(10)) {
    List<Future<Integer>> resultFutures = executorService.invokeAll(tasks);
    for (Future<Integer> rf : resultFutures) {
        ....
    }
    // No explicit shutdown() needed: close() at the end of the try block shuts the executor down.
}

Each task opens a DB connection.

I want to use virtual threads for the same work (Executors.newVirtualThreadPerTaskExecutor()).

But that may lead to a large number of DB connections.

I searched, but couldn't find a way to limit the number of virtual threads.

What is a better way of dealing with this situation?

Upvotes: 3

Views: 403

Answers (2)

Werner Altewischer

Reputation: 10466

Switching to virtual threads will indeed shift the congestion to the next bottleneck, which is normally the database connection pool.

To implement connection throttling properly you would have to identify the priority of requests that access the database, e.g.:

  • scheduled executions/batch tasks: LOW
  • retryable direct requests (non-critical): LOW
  • ordinary direct requests: MEDIUM
  • critical requests (authentication, error handling, management endpoints): HIGH

With this priority in mind you can decorate a connection pool with a priority queue to avoid overloading the connection pool with lower priority requests.

When connection pool usage rises above some critical level, you would have to start throttling requests, and perhaps even drop low-priority requests from the queue altogether and throw a ServiceUnavailable exception for them.

See the following code for inspiration (imports and helper types such as ThrowingSupplier and ServiceUnavailableException are not shown):

/**
 * Datasource which uses a blocking priority queue to order connection requests based on priority.
 */
public class PriorityAwareDataSource implements DataSource, Closeable {
    /**
     * Should return the priority for the current request.
     * Defaults to the thread priority, but some other scheme may be used,
     * such as a ThreadLocal holding the priority based on some annotation/aspect.
     */
    public interface PriorityLocator {
        default int getPriority() {
            return Thread.currentThread().getPriority();
        }
    }

    /**
     * Asynchronous connection request.
     */
    @Getter
    private static final class ConnectionRequest implements Comparable<ConnectionRequest> {
        /**
         * Priority for the request
         */
        private final int priority;
        private final CompletableFuture<Connection> future = new CompletableFuture<>();
        private final ThrowingSupplier<Connection> supplier;

        public ConnectionRequest(int priority, @Nonnull ThrowingSupplier<Connection> supplier) {
            this.priority = priority;
            this.supplier = supplier;
        }

        @Override
        public int compareTo(@Nonnull ConnectionRequest other) {
            return Integer.compare(other.priority, this.priority);
        }

        public void complete() {
            try {
                if (!future.isDone()) {
                    future.complete(supplier.get());
                }
            } catch (Throwable t) {
                future.completeExceptionally(t);
            }
        }
    }

    private final DataSource delegate;
    private final PriorityBlockingQueue<ConnectionRequest> queue = new PriorityBlockingQueue<>(16);
    private final Duration timeout = Duration.ofSeconds(10);
    private final Thread dequeueThread = new Thread(this::dequeue);
    private final PriorityLocator priorityLocator;
    private static final PriorityLocator DEFAULT_PRIORITY_LOCATOR = new PriorityLocator() {};

    public PriorityAwareDataSource(DataSource delegate, @Nullable PriorityLocator priorityLocator) {
        this.delegate = delegate;
        this.priorityLocator = Optional.ofNullable(priorityLocator).orElse(DEFAULT_PRIORITY_LOCATOR);
    }

    @PostConstruct
    protected void start() {
        dequeueThread.start();
    }

    @PreDestroy
    protected void stop() {
        dequeueThread.interrupt();
    }

    @Override
    public Connection getConnection() throws SQLException {
        return getConnection(priorityLocator.getPriority(), delegate::getConnection);
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return getConnection(priorityLocator.getPriority(), () -> delegate.getConnection(username, password));
    }

    // Forward all other DataSource calls to delegate (not shown here)

    private void dequeue() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                queue.take().complete();
            } catch (InterruptedException e) {
                break;
            }
        }
        while (!queue.isEmpty()) {
            queue.poll().getFuture().cancel(true);
        }
    }

    private boolean canQueue(int priority) {
        // isInterrupted() is cleared once take() throws InterruptedException,
        // so check whether the dequeue thread is still running instead.
        if (!dequeueThread.isAlive()) {
            return false;
        }
        // TODO: Maybe also check delegate dataSource levels to decide if we should drop this request altogether,
        // which we may want at least for low priority. This would be datasource implementation dependent though.
        return true;
    }

    private Future<Connection> enqueue(int priority, ThrowingSupplier<Connection> supplier) {
        final var request = new ConnectionRequest(priority, supplier);
        queue.add(request);
        return request.getFuture();
    }

    private Connection getConnection(int priority, ThrowingSupplier<Connection> supplier) throws SQLException {
        if (!canQueue(priority)) {
            throw new ServiceUnavailableException();
        }
        var future = enqueue(priority, supplier);
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof SQLException sqlException) {
                throw sqlException;
            } else {
                throw new IllegalStateException("Failed getting database connection", e);
            }
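        } catch (TimeoutException e) {
            future.cancel(true); // Abandon the request so the dequeue thread skips it.
            throw new ServiceUnavailableException();
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // Preserve the interrupt status for the caller.
            throw new ServiceUnavailableException();
        }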
    }
}
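
As a rough illustration of the load shedding described above (and of what the TODO in canQueue might consult), here is a minimal sketch. Everything in it is an assumption made for illustration: the LoadSheddingGate class, its Priority enum, and the idea of a fixed cutoff are hypothetical and not part of the data source above. The usage check is only approximate under concurrency, which is usually acceptable for shedding decisions.

```java
import java.util.concurrent.Semaphore;

// Hypothetical admission gate: rejects LOW priority work once usage reaches a cutoff.
class LoadSheddingGate {
    enum Priority { LOW, MEDIUM, HIGH }

    private final int capacity;          // total number of slots (e.g. pool size)
    private final int lowPriorityCutoff; // slots in use at which LOW requests are shed
    private final Semaphore permits;

    LoadSheddingGate(int capacity, int lowPriorityCutoff) {
        this.capacity = capacity;
        this.lowPriorityCutoff = lowPriorityCutoff;
        this.permits = new Semaphore(capacity);
    }

    // Returns true if the caller was admitted; an admitted caller must call release().
    boolean tryEnter(Priority priority) {
        // Snapshot only: another thread may acquire or release right after this read.
        int inUse = capacity - permits.availablePermits();
        if (priority == Priority.LOW && inUse >= lowPriorityCutoff) {
            return false; // Shed low-priority work while the resource is busy.
        }
        return permits.tryAcquire(); // Non-blocking; false when every slot is taken.
    }

    void release() {
        permits.release();
    }

    public static void main(String[] args) {
        LoadSheddingGate gate = new LoadSheddingGate(4, 2);
        System.out.println("LOW admitted:  " + gate.tryEnter(Priority.LOW));
        System.out.println("HIGH admitted: " + gate.tryEnter(Priority.HIGH));
        // Two slots are now in use, so the next LOW request is shed.
        System.out.println("LOW admitted:  " + gate.tryEnter(Priority.LOW));
    }
}
```

A caller that gets false from tryEnter would translate that into the ServiceUnavailable response mentioned above instead of waiting in the queue.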

Upvotes: 1

Basil Bourque

Reputation: 338181

tl;dr

static final Semaphore mySemaphore = new Semaphore ( … ) ;
mySemaphore.acquire() ;  // Blocks until a permit becomes available from the semaphore.
try
{
    … do your work …
}
finally
{
    mySemaphore.release() ;  // Return permit to the semaphore’s pool of permits.
}

Details

Virtual threads should not be pooled.

So we need to directly throttle our use of expensive resources.

By the way, be aware that enterprise-oriented database servers such as Postgres offer various ways of limiting connections. These have the advantage of being managed by the DBA/SysAdmin rather than the Java programmer. But here I focus on your Question asking about limiting Java virtual threads.

java.util.concurrent.Semaphore

Semaphore is a common way to limit the number of objects simultaneously accessing a resource.

Instantiate a Semaphore, specifying a limited number of permits. You likely want to mark the object reference static and final.

static final Semaphore databaseConnectionLimitSemaphore = new Semaphore ( 42 ) ;

Your code grabs one of the permits by calling acquire. If no permit is available, the acquire call blocks until one becomes available. Blocked virtual threads are very efficient, being set aside (“parked”) to allow other virtual threads to execute.

When done using that resource, return the permit by calling release. Be sure to use try-finally syntax to ensure no permit is lost. Call acquire just before the try block, so that release runs only when a permit was actually obtained (acquire can throw InterruptedException).

databaseConnectionLimitSemaphore.acquire() ;  // Blocks until a permit becomes available.
try
{
    … get your database connection …
    … do your database work …
}
finally
{
    databaseConnectionLimitSemaphore.release() ;  // Return permit to the semaphore.
}
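
To see the permit limit working together with virtual threads, here is a minimal self-contained sketch, assuming Java 21 or later. The class name, the limit of 10, and fakeDatabaseWork (a stand-in that sleeps instead of opening a real connection) are assumptions made for illustration. The counters exist only to show that no more than 10 tasks are ever inside the guarded section at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class ThrottledVirtualThreads {
    // Hypothetical limit; size it to what the database can comfortably serve.
    static final int MAX_CONNECTIONS = 10;
    static final Semaphore DB_PERMITS = new Semaphore(MAX_CONNECTIONS);

    // Demo bookkeeping only: tracks how many tasks hold a permit at the same time.
    static final AtomicInteger inUse = new AtomicInteger();
    static final AtomicInteger peak = new AtomicInteger();

    // Stand-in for "open a connection, run a query, close the connection".
    static int fakeDatabaseWork(int taskId) throws InterruptedException {
        peak.accumulateAndGet(inUse.incrementAndGet(), Math::max);
        try {
            Thread.sleep(20); // Simulated query latency; parks the virtual thread.
            return taskId * 2;
        } finally {
            inUse.decrementAndGet();
        }
    }

    static int runThrottled(int taskId) throws InterruptedException {
        DB_PERMITS.acquire(); // Acquire outside try: release only what was obtained.
        try {
            return fakeDatabaseWork(taskId);
        } finally {
            DB_PERMITS.release();
        }
    }

    // Runs taskCount tasks, one virtual thread each, and returns the sum of results.
    static int runAll(int taskCount) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            tasks.add(() -> runThrottled(id));
        }
        int sum = 0;
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (Future<Integer> result : executor.invokeAll(tasks)) {
                sum += result.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Task execution failed", e);
        }
        return sum;
    }

    public static void main(String[] args) {
        int sum = runAll(200);
        System.out.println("sum=" + sum + ", peak concurrent=" + peak.get());
    }
}
```

All 200 virtual threads start immediately, but at most MAX_CONNECTIONS of them run the guarded section at any moment; the rest wait parked in acquire.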

Upvotes: 2
