Reputation: 4327
I've got a computation consisting of three "map" steps, where the last step depends on the results of the first two. I am performing this task using dask.distributed, running on several PCs.
The dependency graph looks like the following:
map(func1, list1) -> res_list1 --\
                                  +-> create_list_3(res_list1, res_list2) -> list3 -> map(func3, list3)
map(func2, list2) -> res_list2 --/
If these computations were independent, it would be straightforward to call the map function three times:
from distributed import Executor, progress

def process(jobid):
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
                                           port=config('SERVER_PORT')))
    futures = []
    futures.append(e.map(func1, list1))
    futures.append(e.map(func2, list2))
    futures.append(e.map(func3, list3))
    return futures

if __name__ == '__main__':
    jobid = 'blah-blah-blah'
    r = process(jobid)
    progress(r)
However, list3 is constructed from the results of func1 and func2, and its creation is not easily mappable (list1, list2, res_list1 and res_list2 are stored in a PostgreSQL database, and the creation of list3 is a JOIN query that takes some time).
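For context, here is a minimal sketch of what create_list_3 does; psycopg2 and all table, column and config names below are hypothetical placeholders, not my actual schema:

# Hypothetical sketch only: table/column names and connection
# settings are placeholders, not the actual setup.
import psycopg2

def create_list_3():
    conn = psycopg2.connect(host=config('DB_ADDR'),
                            dbname=config('DB_NAME'),
                            user=config('DB_USER'),
                            password=config('DB_PASS'))
    try:
        with conn.cursor() as cur:
            # The JOIN over the two result tables is what takes time.
            cur.execute("""
                SELECT r1.result, r2.result
                FROM res_list1 AS r1
                JOIN res_list2 AS r2 ON r1.item_id = r2.item_id
            """)
            return cur.fetchall()   # this becomes list3
    finally:
        conn.close()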
I've tried adding a call to submit to the list of futures, but that did not work as expected:
def process(jobid):
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
                                           port=config('SERVER_PORT')))
    futures = []
    futures.append(e.map(func1, list1))
    futures.append(e.map(func2, list2))
    futures.append(e.submit(create_list_3))
    futures.append(e.map(func3, list3))
    return futures
In this case one dask-worker received the task to execute create_list_3, but the others simultaneously received tasks to call func3, which erred because list3 did not exist yet.
The obvious problem: I'm missing synchronization. The workers must stop and wait until the creation of list3 is finished.
The dask documentation describes custom task graphs, which can provide synchronization. However, the examples in the documentation do not include map functions, only simple calculations like calls to add and inc.
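For reference, those documentation examples build a plain dict mapping keys to task tuples, roughly like this (a minimal sketch using the docs' toy inc and add functions):

from operator import add

def inc(x):
    return x + 1

# A dask graph is a plain dict mapping keys to task tuples;
# 'z' runs only after its inputs 'x' and 'y' have been computed.
dsk = {'x': 1,
       'y': (inc, 'x'),
       'z': (add, 'y', 'x')}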
Is it possible to use map and a custom dask graph in my case, or should I implement synchronization with some other means that are not included in dask?
Upvotes: 1
Views: 140
Reputation: 57281
If you want to link dependencies between tasks, then you should pass the outputs from previous tasks into the inputs of the next.
futures1 = e.map(func1, list1)
futures2 = e.map(func2, list2)
futures3 = e.map(func3, futures1, futures2)
For any invocation of func3, Dask will handle waiting until the inputs are ready and will send the appropriate results to that function from wherever they get computed.
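Note that map with two iterables zips them pairwise, like the built-in map, so under this option func3 would take one element from each result list (a sketch, assuming func3 can be written with a two-argument signature):

# e.map(func3, futures1, futures2) calls func3 pairwise, so each
# invocation receives one result from func1 and one from func2.
def func3(res1_item, res2_item):
    ...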
However, it looks like you want to handle data transfer and synchronization through some other custom means. If this is so, then perhaps it would be useful to pass along some token to the call to func3:
futures1 = e.map(func1, list1)
futures2 = e.map(func2, list2)

def do_nothing(*args):
    return None

token1 = e.submit(do_nothing, futures1)
token2 = e.submit(do_nothing, futures2)

list3 = e.submit(create_list_3)

def func3(arg, tokens=None):
    ...

futures3 = e.map(func3, list3, tokens=[token1, token2])
This is a bit of a hack, but it will force all func3 calls to wait until they are able to get the token results from the previous map calls.
However, I recommend trying to do something like the first option. That will allow dask to be much smarter about when it runs tasks and when it can release resources. Barriers like token1/token2 result in sub-optimal scheduling.
Upvotes: 1