Reputation: 7468
I am currently using Executors.newFixedThreadPool to run a large number of tasks in parallel.
try(ExecutorService executorService = Executors.newFixedThreadPool(10)) {
List<Future<Integer>> resultFutures = executorService.invokeAll(tasks);
for (Future<Integer> rf: resultFutures ) {
....
}
executorService.shutdown();
}
Each task opens a DB connection.
I want to use virtual threads for same(Executors.newVirtualThreadPerTaskExecutor()).
But it may lead to large number of DB connections.
I searched, but couldn't find a way to limit number of virtual threads.
What is a better way of dealing with this situation?
Upvotes: 3
Views: 403
Reputation: 10466
Implementing virtual threads will indeed shift the congestion to the next point which is normally the database connection pool.
To implement connection throttling properly you would have to identify the priority of requests that access the database, e.g.:
With this priority in mind you can decorate a connection pool with a priority queue to avoid overloading the connection pool with lower priority requests.
When the connection pool usage goes above some critical level you would have to start throttling requests and maybe even drop low priority requests from the queue all together and start throwing a ServiceUnavailable exception.
See the following code for inspiration:
/**
* Datasource which uses a blocking priority queue to order connection requests based on priority.
*/
public class PriorityAwareDataSource implements DataSource, Closeable {
/**
* Should return the priority for the current request.
* Defaults to the thread priority, but some other scheme may be used,
* such as a ThreadLocal holding the priority based on some annotation/aspect.
*/
public interface PriorityLocator {
default int getPriority() {
return Thread.currentThread().getPriority();
}
}
/**
* Asynchronous connection request.
*/
@Getter
private static final class ConnectionRequest implements Comparable<ConnectionRequest> {
/**
* Priority for the request
*/
private final int priority;
private final CompletableFuture<Connection> future = new CompletableFuture<>();
private final ThrowingSupplier<Connection> supplier;
public ConnectionRequest(int priority, @Nonnull ThrowingSupplier<Connection> supplier) {
this.priority = priority;
this.supplier = supplier;
}
@Override
public int compareTo(@Nonnull ConnectionRequest other) {
return Integer.compare(other.priority, this.priority);
}
public void complete() {
try {
if (!future.isDone()) {
future.complete(supplier.get());
}
} catch (Throwable t) {
future.completeExceptionally(t);
}
}
}
private final DataSource delegate;
private final PriorityBlockingQueue<ConnectionRequest> queue = new PriorityBlockingQueue<>(16);
private final Duration timeout = Duration.ofSeconds(10);
private final Thread dequeueThread = new Thread(this::dequeue);
private final PriorityLocator priorityLocator;
private static final PriorityLocator DEFAULT_PRIORITY_LOCATOR = new PriorityLocator() {};
public PriorityAwareDataSource(DataSource delegate, @Nullable PriorityLocator priorityLocator) {
this.delegate = delegate;
this.priorityLocator = Optional.ofNullable(priorityLocator).orElse(DEFAULT_PRIORITY_LOCATOR);
}
@PostConstruct
protected void start() {
dequeueThread.start();
}
@PreDestroy
protected void stop() {
dequeueThread.interrupt();
}
@Override
public Connection getConnection() throws SQLException {
return getConnection(priorityLocator.getPriority(), delegate::getConnection);
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
return getConnection(priorityLocator.getPriority(), () -> delegate.getConnection(username, password));
}
// Forward all other DataSource calls to delegate (not shown here)
private void dequeue() {
while (!Thread.currentThread().isInterrupted()) {
try {
queue.take().complete();
} catch (InterruptedException e) {
break;
}
}
while (!queue.isEmpty()) {
queue.poll().getFuture().cancel(true);
}
}
private boolean canQueue(int priority) {
if (dequeueThread.isInterrupted()) {
return false;
}
// TODO: Maybe also check delegate dataSource levels to decide if we should drop this request all together,
// which we may want at least for low priority. This would be datasource implementation dependent though.
return true;
}
private Future<Connection> enqueue(int priority, ThrowingSupplier<Connection> supplier) {
final var request = new ConnectionRequest(priority, supplier);
queue.add(request);
return request.getFuture();
}
private Connection getConnection(int priority, ThrowingSupplier<Connection> supplier) throws SQLException {
if (!canQueue(priority)) {
throw new ServiceUnavailableException();
}
var future = enqueue(priority, supplier);
try {
return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
if (e.getCause() instanceof SQLException sqlException) {
throw sqlException;
} else {
throw new IllegalStateException("Failed getting database connection", e);
}
} catch (TimeoutException | InterruptedException e) {
throw new ServiceUnavailableException();
}
}
}
Upvotes: 1
Reputation: 338181
static final mySemaphore = new Semaphore ( … ) ;
try
{
mySemaphore() ; // Blocks until a permit becomes available from the semaphore.
… do your work …
}
finally
{
mySemaphore.release() ; // Return permit to the semaphore’s pool of permits.
}
Virtual threads should not be pooled.
So we need to directly throttle our use of expensive resources.
By the way, be aware that enterprise-oriented database servers such as Postgres offer various ways of limiting connections. These have the advantage of being managed by the DBA/SysAdmin rather than the Java programmer. But here I focus on your Question asking about limiting Java virtual threads.
java.util.concurrent.Semaphore
Semaphore
is a common way to limit the number of objects simultaneously accessing a resource.
Instantiate a Semaphore
, specifying a limited number of permits. You likely want to mark the object reference static
and final
.
static final databaseConnectionLimitSemaphore = new Semaphore ( 42 ) ;
Your code grabs one of the permits by calling acquire
. If no permit is available, the acquire
call blocks until one becomes available. Blocked virtual threads are very efficient, being set aside (“parked”) to allow other virtual threads to execute.
When done using that resource, return the permit by calling release
. Be sure to use try-finally
syntax to ensure no permit is lost.
try
{
databaseConnectionLimitSemaphore.acquire() ; // Blocks until a permit becomes available.
… get your database connection …
… do your database work …
}
finally
{
databaseConnectionLimitSemaphore.release() ; // Return permit to the semaphore.
}
Upvotes: 2