Reputation: 71
I have a large data extraction job written with Dask, in which each task queries one table out of a large number of tables spread across several dozen databases. For each database instance, I would like to limit the number of tasks that connect at one time (i.e. throttling). For example, I may have 100 tasks that connect to database A, 100 for database B, 100 for database C, and so on, and I would like to ensure that no more than 20 tasks are connected to any one database at any given time.
I see that Dask provides constraints based on worker resources (CPU, MEM, GPU, etc.); however, the database resources are "global" and therefore not tied to any particular Dask worker. Does Dask provide any means of modeling such constraints on task concurrency?
Upvotes: 2
Views: 886
Reputation: 71
After reading through the docs for several hours, I found the answer to my own question. Dask offers distributed Semaphores, which can limit concurrent access to resources such as databases. For more information, see:
https://docs.dask.org/en/latest/futures.html#id1
import time
from dask.distributed import Client, Semaphore

client = Client(...)

def do_task(x, sem):
    # Block until a lease is available, hold it for the duration
    # of the work, and release it on exit.
    with sem:
        time.sleep(5)
        return x

# allow no more than 5 tasks to hold a lease at any one time
sem = Semaphore(max_leases=5, name="Limiter")

# submit jobs that share the semaphore
futures = client.map(do_task, range(20), sem=sem)

# collect results
results = client.gather(futures)
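To cover the multi-database scenario from the question, you can create one named semaphore per database; because semaphores are identified by name on the scheduler, every task that acquires a given name draws from the same global pool of leases no matter which worker it runs on. Below is a minimal sketch using the question's numbers (databases A, B, and C, 100 tables each, at most 20 concurrent connections per database); the time.sleep(1) is a hypothetical stand-in for the real per-table query.

import time
from dask.distributed import Client, Semaphore

client = Client(...)

# One named semaphore per database: at most 20 leases per name
# may be held cluster-wide at any moment.
databases = ["A", "B", "C"]
sems = {db: Semaphore(max_leases=20, name=f"db-{db}") for db in databases}

def extract(table, db, sem):
    with sem:
        time.sleep(1)  # hypothetical stand-in for querying `table` in `db`
        return (db, table)

# 100 tables per database, as in the question's example.
futures = [
    client.submit(extract, table, db, sems[db])
    for db in databases
    for table in range(100)
]
results = client.gather(futures)

With this layout, no more than 20 of the 300 tasks can be inside a given database's with block at once, while tasks targeting the other databases proceed independently under their own limits.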
Upvotes: 5