Simon

Reputation: 553

Dask Distributed: Introduce graph dependencies on per-worker initialisation tasks

In dask distributed, tasks are distributed over the cluster's nodes by the scheduler. I'm looking to introduce a per-node dependency on tasks submitted to a node. Briefly, the compute operation that I'm seeking to perform needs to:

  1. Preload data onto a GPU on each node.
  2. Perform GPU compute on each node with other data in chunked dask arrays.

I also want to enqueue (1) and (2) multiple times on differing datasets.

I've tried to set this up as a minimal example:

from __future__ import print_function

import dask.array as da
from dask.base import tokenize
from distributed import (Client, LocalCluster,
                         get_worker, as_completed)
import numpy as np

cluster = LocalCluster(n_workers=0)
cluster.start_worker(name="Alice")
cluster.start_worker(name="Bob")
cluster.start_worker(name="Eve")

with cluster, Client(cluster.scheduler_address) as client:
    workers = client.scheduler_info()['workers'].values()
    workers = [v['name'] for v in workers]

    print("Workers {}".format(workers))

    def init_worker():
        get_worker()._stuff = 0
        return "OK"

    # Call init_worker once on each worker. pure=False ensures each
    # submission is a distinct task rather than being de-duplicated
    # by the scheduler.
    init_futures = [client.submit(init_worker, pure=False, workers=[w])
                    for w in workers]

    assert all(f.result() == "OK" for f in as_completed(init_futures))

    A = da.arange(0, 20, chunks=(5,), dtype=np.float64)

    def inc_worker(A):
        w = get_worker()
        w._stuff += 1
        print("{}[{}]".format(w.name, w._stuff))
        return A + w._stuff

    def increment(A):
        """ Call inc_worker """
        name = 'increment-' + tokenize(A)
        # One inc_worker task per chunk of A
        dsk = { (name, i): (inc_worker, (A.name, i))
                for i in range(A.numblocks[0]) }

        dsk.update(A.dask)
        return da.Array(dsk, name, A.chunks, A.dtype)

    print(increment(A).compute())
    print(increment(A).compute())

I want to find some way to make the increment-* tasks submitted to each worker depend on the init-worker-* task submitted to that worker. Put another way, I want to avoid waiting for the init_futures to complete in the client. The problem this introduces is that, while we know which init-worker-* tasks are associated with each worker, there is no obvious way to know beforehand which worker the increment-* tasks will land on.

One Possible Approach:

  1. For each inc_worker call, spawn a local_client() that submits a task with the init-worker-* key found in get_worker().data as a dependency, as sketched below. I don't like this because the overhead seems quite large.
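
A rough sketch of the shape I have in mind (heavily hypothetical: local_client was later renamed worker_client, and the init_worker key prefix is my assumption about how distributed names submitted tasks):

    from distributed import get_worker, local_client

    def increment_chunk(chunk):
        # Inside the task: check that this worker's init result is in its
        # local data, then resubmit the real work through a client running
        # on the worker itself. One extra client round-trip per chunk is
        # where the overhead comes from.
        w = get_worker()
        init_key = next(k for k in w.data
                        if str(k).startswith('init_worker'))
        assert w.data[init_key] == "OK"
        with local_client() as c:
            return c.submit(inc_worker, chunk,
                            workers=[w.address]).result()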

Any suggestions on how to do this?

EDIT 1: Actually this works without waiting for the init_futures to complete, presumably because they're forwarded to the workers before any of the increment-* tasks are submitted. It still feels like I'm relying on an assumption that may not always hold, though...

EDIT 2: Mentioned that the 2 steps should be run multiple times on different datasets.

Upvotes: 3

Views: 367

Answers (1)

MRocklin

Reputation: 57281

Some options:

  1. Use client.run and wait. This does what your submit trick above does, but more explicitly and with less pain. It does, however, block, which you've said you don't want.

    # Blocks until every worker finishes; returns {worker_address: result}
    client.run(init_worker)
    
  2. Use a worker --preload script to run arbitrary code as a worker starts up (see the sketch after this list). See http://distributed.readthedocs.io/en/latest/setup.html?highlight=preload#customizing-initialization

    cluster.start_worker(..., preload=['myscript.py'])
    
  3. Make init_worker idempotent (it can run many times without further effect) and always call it within inc_worker

    def init_worker():
        if not hasattr(get_worker(), '_stuff'):
            get_worker()._stuff = 0
    
    def inc_worker(A):
        init_worker()
        ... do other things ...
    
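For option 2, a minimal sketch of such a preload script (the myscript.py name comes from the call above; dask_setup is the hook that distributed calls with the worker instance as it starts):

    # myscript.py
    def dask_setup(worker):
        # Runs once per worker at startup, before any tasks arrive
        worker._stuff = 0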

Also, I notice that you're constructing dask arrays by hand. You might want to look at x.map_blocks(my_func) and x.to_delayed/da.from_delayed.
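
For example, a sketch of the question's increment using map_blocks instead of a hand-built graph (inc_block is just the question's inc_worker under another name):

    def inc_block(block):
        # Runs once per chunk, on whichever worker holds that chunk
        w = get_worker()
        w._stuff += 1
        return block + w._stuff

    B = A.map_blocks(inc_block, dtype=A.dtype)
    print(B.compute())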

Upvotes: 3
