Reputation: 1071
I have the following scenario that I need to solve with Dask scheduler and workers:
The Dask program has N functions called in a loop (N is defined by the user)
Each function is started with delayed(func)(args) to run in parallel.
When each function from the previous point starts, it triggers W workers. This is how I invoke the workers:
futures = client.map(worker_func, worker_args)
worker_responses = client.gather(futures)
That means that I need N * W workers to run everything in parallel. The problem is that this is not optimal: it's too much resource allocation, I run it on the cloud, and it's expensive. Also, N is defined by the user, so I don't know beforehand how much processing capability I need.
Is there a way to queue up the workers in such a way that if I define that Dask has X workers, when a worker ends then the next one starts?
Upvotes: 1
Views: 1158
Reputation: 18796
First, define the number of workers you need; treat them as ephemeral, but static for the entire duration of your processing
You can create them dynamically (when you start, or later on), but you probably want to have them all ready right at the beginning of your processing
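For instance, here is a minimal local sketch of a fixed-size pool of X workers (LocalCluster stands in for whatever cloud cluster class you actually use, and worker_func is a placeholder):

from dask.distributed import Client, LocalCluster

def worker_func(x):
    return x * 2

cluster = LocalCluster(n_workers=4)   # X = 4 workers, fixed for the whole run
client = Client(cluster)

# 100 tasks on 4 workers: Dask queues the tasks and starts the next one
# as soon as a worker frees up, so you never need N * W workers at once
futures = client.map(worker_func, range(100))
results = client.gather(futures)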
From your view, the client is an executor (so when you refer to workers and running in parallel, you probably mean the same thing):
This class resembles executors in concurrent.futures but also allows Future objects within submit/map calls. When a Client is instantiated it takes over all dask.compute and dask.persist calls by default.
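For example, a small sketch of passing Futures straight into submit (connection details elided):

from dask.distributed import Client

client = Client(...)  # connect to distributed cluster

def inc(x):
    return x + 1

a = client.submit(inc, 10)  # returns a Future immediately
b = client.submit(inc, a)   # the Future itself can be an argument to another call
b.result()                  # 12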
Once your workers are available, Dask will distribute work given to them via the scheduler
You should make any tasks that depend on each other do so by calling dask.delayed() with the preceding function's result (which is a Future, and not yet the actual result)
Passing Futures as arguments like this allows Dask to build a task graph of your work
Example use https://examples.dask.org/delayed.html
Future reference https://docs.dask.org/en/latest/futures.html#distributed.Future
Here's a complete example from the Delayed docs (it actually combines several successive examples into one result)
import dask
from dask.distributed import Client
client = Client(...) # connect to distributed cluster
def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)    # depends on a and b
    output.append(c)

total = dask.delayed(sum)(output)  # depends on everything
total.compute()                    # 45
You can call total.visualize() to see the task graph (image from the Dask Delayed docs)
If you're already using .map(..) to map function and argument pairs, you can keep creating Futures and then .gather(..) them all at once, even if they're in a collection (which is convenient for you here). The .gather()'ed results will be in the same arrangement as they were given (a list of lists):
[[fn1(args11), fn1(args12)], [fn2(args21)], [fn3(args31), fn3(args32), fn3(args33)]]
https://distributed.dask.org/en/latest/api.html#distributed.Client.gather
import dask
from dask.distributed import Client
client = Client(...) # connect to distributed cluster
collection_of_futures = []
for worker_func, worker_args in iterable_of_pairs_of_fn_args:
    futures = client.map(worker_func, worker_args)
    collection_of_futures.append(futures)

results = client.gather(collection_of_futures)
notes
worker_args must be some iterable of arguments to map over worker_func, which can be a source of error
.gather()ing will block until all the futures are completed or raise
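For instance (a tiny sketch with an assumed worker_func, reusing the client from above):

def worker_func(x):
    return x * 10

futures = client.map(worker_func, [1, 2, 3])  # one future per element of the iterable
# client.map(worker_func, 1) would fail: a single value is not an iterable of arguments
results = client.gather(futures)              # blocks until all complete (or one raises)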
.as_completed()
If you need the results as quickly as possible, you could use .as_completed(..), but note the results will be in a non-deterministic order, so I don't think this makes sense for your case .. if you find it does, you'll need some extra guarantees
Also note that the yielded futures are complete, but are still Futures, so you still need to call .result() or .gather() them
https://distributed.dask.org/en/latest/api.html#distributed.as_completed
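A minimal sketch of that pattern, reusing the assumed client and worker_func from above:

from dask.distributed import as_completed

futures = client.map(worker_func, [1, 2, 3, 4])
for future in as_completed(futures):  # yields each future as soon as it finishes
    print(future.result())            # the future is done, but you still need .result()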
Upvotes: 2