ps0604

Reputation: 1071

Queueing up workers in Dask

I have the following scenario that I need to solve with Dask scheduler and workers:

That means that I need N * W workers to run everything in parallel. The problem is that this is not optimal, as it's too much resource allocation; I run it on the cloud and it's expensive. Also, N is defined by the user, so I don't know beforehand how much processing capability I need to have.

Is there a way to queue up the workers in such a way that, if I define that Dask has X workers, when a worker finishes the next one starts?

Upvotes: 1

Views: 1158

Answers (1)

ti7

Reputation: 18796

  • First, define the number of workers you need; treat them as ephemeral, but static for the entire duration of your processing (see the sketch below)
  • You can create them dynamically (when you start or later on), but you probably want to have them all ready right at the beginning of your processing
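A minimal sketch of that, assuming a LocalCluster for illustration (on the cloud you would use your provider's cluster class instead, but the idea of fixing the worker count up front is the same; the worker counts here are made up):

from dask.distributed import Client, LocalCluster

# fix the number of workers up front and keep them for the whole run
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)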

From your point of view, the client is an executor (so when you refer to workers and running in parallel, you probably mean the same thing):

This class resembles executors in concurrent.futures but also allows Future objects within submit/map calls. When a Client is instantiated it takes over all dask.compute and dask.persist calls by default.

Once your workers are available, Dask will distribute work given to them via the scheduler
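This is the queueing behaviour you're after: if you submit more tasks than there are worker slots, the scheduler runs what fits and starts the next task as soon as a slot frees up. A rough sketch, with a made-up slow_task and worker count:

import time

from dask.distributed import Client, LocalCluster

client = Client(LocalCluster(n_workers=2, threads_per_worker=1))

def slow_task(i):
    time.sleep(1)
    return i

# 10 tasks on 2 single-threaded workers: 2 run at a time,
# the rest wait in the scheduler's queue
futures = client.map(slow_task, range(10))
results = client.gather(futures)  # [0, 1, 2, ..., 9]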

You should make any tasks that depend on each other do so by passing the preceding function's result into the next dask.delayed() call (that result is a Future, not yet the concrete value)
Passing Futures as arguments like this allows Dask to build a task graph of your work

Example use https://examples.dask.org/delayed.html
Future reference https://docs.dask.org/en/latest/futures.html#distributed.Future

Dependent Futures with dask.delayed

Here's a complete example from the Delayed docs (it actually combines several successive examples into the same result)

import dask
from dask.distributed import Client

client = Client(...)  # connect to distributed cluster

def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)    # depends on a and b
    output.append(c)

total = dask.delayed(sum)(output)  # depends on everything
total.compute()  # 50

You can call total.visualize() to see the task graph

Task graph visualization (image from the Dask Delayed docs)

Collections of Futures

If you're already using .map(..) to map function and argument pairs, you can keep creating Futures and then .gather(..) them all at once, even if they're in a collection (which is convenient for you here)

The .gather()'ed results will be in the same arrangement as they were given (a list of lists)

[[fn1(args11), fn1(args12)], [fn2(args21)], [fn3(args31), fn3(args32), fn3(args33)]]

https://distributed.dask.org/en/latest/api.html#distributed.Client.gather

import dask
from dask.distributed import Client

client = Client(...)  # connect to distributed cluster

collection_of_futures = []

# iterable_of_pairs_of_fn_args is your own collection of (function, arguments) pairs,
# e.g. [(fn1, args1), (fn2, args2), (fn3, args3)] as in the layout above
for worker_func, worker_args in iterable_of_pairs_of_fn_args:
    futures = client.map(worker_func, worker_args)
    collection_of_futures.append(futures)

results = client.gather(collection_of_futures)

Notes

  • worker_args must be an iterable of arguments to map onto worker_func, which can be a source of error (see the sketch below)
  • .gather()ing will block until all of the futures are completed or one of them raises
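A tiny sketch of that first point (square is just a placeholder function):

from dask.distributed import Client

client = Client()  # or connect to your distributed cluster

def square(x):
    return x ** 2

futures = client.map(square, [3, 4, 5])  # .map() wants an iterable of arguments
single = client.submit(square, 3)        # .submit() takes the bare argument instead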

.as_completed()

If you need the results as quickly as possible, you could use .as_completed(..), but note that the results will arrive in a non-deterministic order, so I don't think this makes sense for your case. If you find it does, you'll need some extra guarantees:

  • include information about what to do with the result in the result
  • keep a reference to each and check them
  • only combine groups where it doesn't matter (i.e. all the Futures have the same purpose)

Also note that the yielded futures are complete, but they are still Future objects, so you still need to call .result() on them or .gather() them

https://distributed.dask.org/en/latest/api.html#distributed.as_completed
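A minimal sketch of the pattern (work is just a placeholder function):

from dask.distributed import Client, as_completed

client = Client()  # or connect to your distributed cluster

def work(x):
    return x * x

futures = client.map(work, range(5))

# futures are yielded as each one finishes, not in submission order
for future in as_completed(futures):
    print(future.result())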

Upvotes: 2
