wl2776

Reputation: 4327

How to call Executor.map in custom dask graph?

I've got a computation consisting of three "map" steps, where the last step depends on the results of the first two. I am performing this task using dask.distributed, running on several PCs.

The dependency graph looks like the following:

map(func1, list1) -> res_list1-\
                                | -> create_list_3(res_list1, res_list2)-> list3 -> map(func3, list3)
map(func2, list2) -> res_list2-/

If these computations were independent, it would be straightforward to call the map function three times:

from distributed import Executor, progress

def process(jobid):
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
                                           port=config('SERVER_PORT')))
    futures = []
    futures.append(e.map(func1, list1))
    futures.append(e.map(func2, list2))
    futures.append(e.map(func3, list3))

    return futures

if __name__ == '__main__':
    jobid = 'blah-blah-blah'
    r = process(jobid)
    progress(r)

However, list3 is constructed from the results of func1 and func2, and its creation is not easily mappable: list1, list2, res_list1 and res_list2 are stored in a PostgreSQL database, and creating list3 is a JOIN query that takes some time.

I've tried adding a call to submit to the list of futures, but it did not work as expected:

def process(jobid):
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
                                           port=config('SERVER_PORT')))
    futures = []
    futures.append(e.map(func1, list1))
    futures.append(e.map(func2, list2))
    futures.append(e.submit(create_list_3))
    futures.append(e.map(func3, list3))

    return futures

In this case one dask-worker received the task to execute create_list_3, but the others simultaneously received tasks calling func3, which failed because list3 did not exist yet.

The obvious problem is that I'm missing synchronization: the workers must stop and wait until the creation of list3 has finished.

The dask documentation describes custom task graphs, which can provide synchronization.

However, the examples in the documentation do not include map functions, only simple calculations such as calls to add and inc.
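For context, a dask custom graph is just a dict mapping keys to task tuples of the form (callable, *args). The naive recursive evaluator below is NOT dask's scheduler, only a toy illustration of that structure:

```python
# Toy illustration of a dask-style custom graph: a dict mapping keys to
# task tuples (callable, *args). Arguments that are themselves keys in
# the graph are resolved recursively.
def get(dsk, key):
    task = dsk[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        resolved = [get(dsk, a) if (isinstance(a, str) and a in dsk) else a
                    for a in args]
        return func(*resolved)
    return task

def inc(x):
    return x + 1

def add(x, y):
    return x + y

dsk = {'x': 1,
       'y': (inc, 'x'),
       'z': (add, 'y', 10)}

result = get(dsk, 'z')
print(result)  # 12
```

The real scheduler additionally runs independent keys in parallel, which is what would make such a graph act as a synchronization point between the map steps.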

Is it possible to use map together with a custom dask graph in my case, or should I implement synchronization by some other means not included in dask?

Upvotes: 1

Views: 140

Answers (1)

MRocklin

Reputation: 57281

If you want to link dependencies between tasks, then you should pass the outputs of previous tasks as the inputs of the next:

futures1 = e.map(func1, list1)
futures2 = e.map(func2, list2)
futures3 = e.map(func3, futures1, futures2)

For any invocation of func3, Dask will handle waiting until the inputs are ready and will send the appropriate results to that function from wherever they were computed.
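The chaining above works because dask resolves futures passed into map before calling the function. A rough stand-alone sketch of the same idea using the standard library's concurrent.futures instead of dask (with toy func1/func2/func3 bodies, and explicit waiting via .result() since plain futures are not auto-resolved):

```python
from concurrent.futures import ThreadPoolExecutor

def func1(x):
    return x + 1

def func2(x):
    return x * 2

def func3(f1, f2):
    # Block until both upstream futures are done, then combine them;
    # dask performs this waiting and data movement automatically.
    return f1.result() + f2.result()

with ThreadPoolExecutor(max_workers=4) as e:
    futures1 = [e.submit(func1, x) for x in [1, 2, 3]]
    futures2 = [e.submit(func2, x) for x in [1, 2, 3]]
    # Each func3 task depends on one future from each upstream "map".
    futures3 = [e.submit(func3, a, b) for a, b in zip(futures1, futures2)]
    results = [f.result() for f in futures3]

print(results)  # [4, 7, 10]
```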

However, it looks like you want to handle data transfer and synchronization through some other custom means. If so, it may be useful to pass a token along to the call to func3:

futures1 = e.map(func1, list1)
futures2 = e.map(func2, list2)

def do_nothing(*args):
    return None

token1 = e.submit(do_nothing, futures1)
token2 = e.submit(do_nothing, futures2)

list3 = e.submit(create_list_3)

def func3(arg, tokens=None):
    ...

futures3 = e.map(func3, list3, tokens=[token1, token2])

This is a bit of a hack, but it forces every func3 call to wait until it can obtain the token results from the previous map calls.
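The token trick can also be sketched outside of dask with concurrent.futures; note that, unlike dask, the standard library does not wait on futures passed as arguments, so the barrier has to call wait() itself (the create_list_3 and func3 bodies below are hypothetical stand-ins for the JOIN query and the real work):

```python
from concurrent.futures import ThreadPoolExecutor, wait

def barrier(futures):
    # Plays the role of do_nothing above: finishes only after every
    # upstream future has completed.
    wait(futures)
    return None

def create_list_3():
    return [1, 2, 3]  # stand-in for the JOIN query

def func3(item, tokens=None):
    # The tokens carry no data; they exist only to enforce ordering.
    return item * 10

with ThreadPoolExecutor(max_workers=4) as e:
    futures1 = [e.submit(lambda x: x + 1, x) for x in range(3)]
    futures2 = [e.submit(lambda x: x * 2, x) for x in range(3)]

    token1 = e.submit(barrier, futures1)
    token2 = e.submit(barrier, futures2)
    wait([token1, token2])          # everything upstream is now done

    list3 = create_list_3()
    futures3 = [e.submit(func3, i, tokens=(token1, token2)) for i in list3]
    results = [f.result() for f in futures3]

print(results)  # [10, 20, 30]
```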

However, I recommend trying something like the first option. It allows dask to be much smarter about when it runs tasks and when it can release resources. Barriers like token1/token2 result in sub-optimal scheduling.

Upvotes: 1
