Reputation: 3716
I have a simple dask workflow. When I print it out, it looks like this:
workflow = {
    'a_task': (<function a_func at 0x7f1dc5ded598>,),
    'b_task': (<function b_func at 0x7f1dc5ded620>,),
    'c_task': (<function c_func at 0x7f1ddb07aea0>,),
    'd_task': (<function d_func at 0x7f1dc5ded6a8>,),
    'workflow_end_task': (<function aggregate_func at 0x7f1dc5ded730>,
                          'a_task', 'b_task', 'c_task', 'd_task')
}
It turns out that b_func is a little function with a for loop that runs about 1000 iterations and takes about an hour to finish. It basically looks like this:
def b_func(args...):
    data = []
    for this in that:
        data.append(...)
    return data
That for loop doesn't have to be done in order though. It could be done in parallel.
So the question is: how should I handle this? Should I convert that for loop into a workflow and put another call to dask inside of b_func? Or should I pull this process out and expand the original workflow?
Basically, can I nest dask workflows or is that a bad idea?
Also, you should know that I'm using from dask.distributed import Client and Client.get in order to distribute the workflow across an entire cluster of computers. I don't know if that complicates things beyond dask.threaded.get, but maybe it makes a difference. I guess it would mean that one of the dask workers has to set up a new scheduler and workers on all the machines of the cluster, and then pass its workflow to them. Maybe, I don't know.
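For context, my setup looks roughly like this (the scheduler address is just a placeholder):

from dask.distributed import Client

# Connect to the cluster's scheduler (address is a placeholder)
client = Client('tcp://scheduler:8786')

# Run the workflow dict and fetch the final aggregated result
result = client.get(workflow, 'workflow_end_task')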
Has anyone dealt with this issue before?
Upvotes: 1
Views: 1097
Reputation: 2445
Should I convert that for loop into a workflow and put another call to dask inside of b_func? Or should I pull this process out and expand the original workflow?
Basically, can I nest dask workflows or is that a bad idea?
In the general case, no, you shouldn't have tasks in dask also call compute. However, you can do this with the distributed scheduler, and things should just work. If you don't specify a scheduler when calling compute in the task, the current scheduler will be used. The downside is that the submitting task ('b_task' in your case) will still be blocked the whole time, which takes up a thread on a worker (less efficient).
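For illustration, nesting might look something like this, where process_one and items are hypothetical stand-ins for your actual loop body and iterable:

import dask

def process_one(item):
    # Hypothetical stand-in for one iteration of b_func's loop
    return item * 2

def b_func(items):
    # One delayed task per iteration
    pieces = [dask.delayed(process_one)(item) for item in items]
    # compute() inside a task: the current (distributed) scheduler is
    # reused, but this blocks the worker thread running b_func until
    # every nested task finishes.
    return list(dask.compute(*pieces))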
In your case I'd instead build the whole graph up beforehand using dask.delayed (http://docs.dask.org/en/latest/delayed.html). This allows you to write loopy, normal Python code and have dask build up the graph for you. See the delayed documentation for more information.
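For example, here's a rough sketch of that approach; process_one and aggregate are hypothetical stand-ins for your real per-iteration work and aggregate_func:

from dask import delayed

@delayed
def process_one(item):
    # Hypothetical stand-in for one iteration of b_func's loop
    return item * 2

@delayed
def aggregate(results):
    # Hypothetical stand-in for your aggregate_func
    return list(results)

# Normal loopy Python; each call only adds a task to the graph
pieces = [process_one(item) for item in range(1000)]
final = aggregate(pieces)

# One top-level compute; with the distributed scheduler the ~1000
# per-iteration tasks can run in parallel across the cluster.
result = final.compute()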
Upvotes: 3