Youcef

Reputation: 233

Dask Distributed - how to run one task per worker, making that task run on all cores available in the worker?

I'm very new to using the Dask distributed Python library. I have 4 workers, and I have successfully launched some parallel runs using 14 cores (among the 16 available) on each worker, resulting in 4*14 = 56 tasks running in parallel.

But how should I proceed if I want only one task at a time on each worker? That way, I expect each task to use the 14 cores in parallel on its worker.
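For concreteness, a setup like mine can be reproduced with something along these lines; scheduler-address, my_function, and inputs are placeholders, not my real names:

dask-worker scheduler-address:8786 --nthreads 14

run on each of the 4 machines, then:

from dask.distributed import Client
client = Client('scheduler-address:8786')
futures = client.map(my_function, inputs)  # up to 4 * 14 = 56 concurrent tasks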

Upvotes: 6

Views: 3205

Answers (2)

leej3

Reputation: 83

Here is an example of assigning resource constraints when starting workers in Python rather than at the command line:

from dask.distributed import Client
from dask import delayed
import time

# Start a local cluster where each worker advertises one unit of a 'foo' resource.
client_with_foo = Client(processes=False,
                         n_workers=2,
                         threads_per_worker=10,
                         resources={'foo': 1})

@delayed
def do_work(cmd=None, interval=2):
    time.sleep(interval)
    return None


task_graph = [do_work() for _ in range(10)]

start = time.time()
# Each task requires one 'foo', so at most one task runs on each worker at a time.
result = client_with_foo.compute(task_graph, resources={'foo': 1})
output = client_with_foo.gather(result)
end = time.time()
print(end - start)

Ten 2-second tasks distributed across two workers run two at a time (one per worker), so they complete in five rounds of 2 seconds each; the output of the above code is therefore approximately 10.

Upvotes: 3

MRocklin

Reputation: 57261

Dask workers maintain a single thread pool that they use to launch tasks. Each task always consumes one thread from this pool. You cannot tell a task to take many threads from this pool.
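A blunt consequence of this model: a worker started with a single thread will only ever run one task at a time, for example:

dask-worker scheduler-address:8786 --nthreads 1

The task body would then have to do its own multi-core work internally.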

However, there are other ways to control and restrict concurrency within dask workers. In your case you might consider defining worker resources. This lets you prevent several big tasks from running at the same time on the same worker.

In the following example we define that each worker has one Foo resource and that each task requires one Foo to run. This will stop any two tasks from running concurrently on the same worker.

dask-worker scheduler-address:8786 --resources Foo=1
dask-worker scheduler-address:8786 --resources Foo=1

Then, on the client side, request one Foo for each task:

from dask.distributed import Client
client = Client('scheduler-address:8786')
# Each task consumes the worker's single Foo, so no two run concurrently on one worker.
futures = client.map(my_expensive_function, ..., resources={'Foo': 1})
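Note that each such task still occupies only one thread of the worker's pool, so to actually use the 14 cores the body of the task has to parallelize itself. Here is a minimal sketch of what my_expensive_function could look like, assuming the heavy work releases the GIL the way large NumPy operations do; the chunking scheme and sizes are invented for illustration:

from concurrent.futures import ThreadPoolExecutor
import numpy as np

def expensive_chunk(seed):
    # NumPy releases the GIL during large array operations, so these
    # threads can genuinely run on separate cores.
    rng = np.random.default_rng(seed)
    a = rng.standard_normal((2000, 2000))
    return float(np.linalg.norm(a @ a.T))

def my_expensive_function(n_chunks=14):
    # One Dask task, internally fanned out over 14 threads.
    with ThreadPoolExecutor(max_workers=14) as pool:
        return sum(pool.map(expensive_chunk, range(n_chunks)))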

Upvotes: 6
