Reputation: 553
In dask distributed, tasks are distributed over the nodes of a cluster via the scheduler. I'm looking to introduce a per-node dependency on the tasks submitted to a node. Briefly, the compute operation that I'm seeking to perform needs to:

1. initialize some state on each worker in the cluster, and
2. run tasks on those workers that read and update this per-worker state.

I also want to enqueue (1) and (2) multiple times, on differing datasets.
I've tried to set this up as a minimal example:
from __future__ import print_function

import dask.array as da
from dask.base import tokenize
from distributed import (Client, LocalCluster,
                         get_worker, as_completed)
import numpy as np

cluster = LocalCluster(n_workers=0)
cluster.start_worker(name="Alice")
cluster.start_worker(name="Bob")
cluster.start_worker(name="Eve")

with cluster, Client(cluster.scheduler_address) as client:
    workers = client.scheduler_info()['workers'].values()
    workers = [v['name'] for v in workers]
    print("Workers {}".format(workers))

    def init_worker():
        get_worker()._stuff = 0
        return "OK"

    # Call init_worker on each worker. pure=False is needed so that
    # each submission is treated as a distinct task rather than
    # being deduplicated into a single one.
    init_futures = [client.submit(init_worker, pure=False,
                                  workers=[w])
                    for w in workers]

    assert all(f.result() == "OK" for f in as_completed(init_futures))

    A = da.arange(0, 20, chunks=(5,), dtype=np.float64)

    def inc_worker(A):
        w = get_worker()
        w._stuff += 1
        print("{}[{}]".format(w.name, w._stuff))
        return A + w._stuff

    def increment(A):
        """ Call inc_worker on each chunk of A """
        name = 'increment-' + tokenize(A)
        dsk = {(name, i): (inc_worker, (A.name, i))
               for n, i in A.dask.keys()}
        dsk.update(A.dask)
        return da.Array(dsk, name, A.chunks, A.dtype)

    print(increment(A).compute())
    print(increment(A).compute())
I want to find some way to make the increment-* tasks submitted to each worker dependent on the init-worker-* task submitted to the same worker.

Put another way, I want to avoid waiting for the init_futures to complete in the client. The problem this introduces is that, while we know which init-worker-* task is associated with each worker, there is no obvious way to know beforehand which worker the increment-* tasks will be assigned to.
One possible approach: within each inc_worker call, spawn a local_client() that submits a task with the init-worker-* key in get_worker().data as a dependency. I don't like this because the overhead seems quite large.

Any suggestions on how to do this?
EDIT 1: Actually, this works without waiting for the init_futures to complete, presumably because they're submitted to the workers before any of the increment-* tasks are submitted. It still feels like I'm making an assumption that may not always be true, though...
EDIT 2: Mentioned that the 2 steps should be run multiple times on different datasets.
Upvotes: 3
Views: 367
Reputation: 57281
Some options:
Use client.run and wait. This does what your submit trick above does, but more explicitly and with less pain. It does however block, which you've said you don't want to do.
client.run(init_worker)
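For reference, a minimal sketch of what that looks like with your example (client.run blocks and returns a dict of results keyed by worker address):

results = client.run(init_worker)   # blocks until every worker has run init_worker
assert all(v == "OK" for v in results.values())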
Use a worker --preload script to run arbitrary code as a worker starts up. See http://distributed.readthedocs.io/en/latest/setup.html?highlight=preload#customizing-initialization
cluster.start_worker(..., preload=['myscript.py'])
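As a sketch, and assuming the preload module exposes a dask_setup(worker) hook as described in the linked docs, myscript.py might look like:

# myscript.py -- executed on each worker as it starts up
def dask_setup(worker):
    worker._stuff = 0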
Make init_worker idempotent (it can safely be run many times) and always call it within inc_worker:

def init_worker():
    if not hasattr(get_worker(), '_stuff'):
        get_worker()._stuff = 0

def inc_worker(...):
    init_worker()
    ... do other things ...
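Applied to the inc_worker from your example, that might look something like this (a sketch reusing your _stuff attribute):

def inc_worker(A):
    w = get_worker()
    if not hasattr(w, '_stuff'):   # idempotent init, inlined
        w._stuff = 0
    w._stuff += 1
    return A + w._stuff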
Also, I notice that you're constructing dask.arrays by hand. You might want to look at x.map_blocks(my_func) and x.to_delayed / da.from_delayed.
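For example, the hand-written increment graph above could be replaced with something like this (a sketch using map_blocks):

B = A.map_blocks(inc_worker, dtype=A.dtype)   # applies inc_worker to every chunk of A
print(B.compute())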
Upvotes: 3