MetaStack

Reputation: 3716

nested dask workflows in python?

I have a simple dask workflow. When I print it out, it looks like this:

workflow = {
    'a_task': (<function a_func at 0x7f1dc5ded598>,), 
    'b_task': (<function b_func at 0x7f1dc5ded620>,), 
    'c_task': (<function c_func at 0x7f1ddb07aea0>,), 
    'd_task': (<function d_func at 0x7f1dc5ded6a8>,), 
    'workflow_end_task': (<function aggregate_func at 0x7f1dc5ded730>, 
        'a_task', 'b_task', 'c_task', 'd_task')
}

It turns out that b_func is a little function with a for loop that does about 1000 iterations and takes about an hour to finish. It basically looks like this:

def b_func(*args):
    data = []
    for this in that:
        data.append(...)  # one slow, independent iteration
    return data

That for loop doesn't have to be done in order, though. It could be done in parallel.

So the question is: how should I handle this? Should I convert that for loop into a workflow and put another call to dask inside b_func? Or should I pull this process out and expand the original workflow?

Basically, can I nest dask workflows or is that a bad idea?

Also, you should know that I'm using from dask.distributed import Client and Client.get in order to distribute the workflow across an entire cluster of computers. I don't know if that complicates things beyond dask.threaded.get, but maybe it makes a difference. I guess it would mean that one of the dask workers has to set up a new scheduler and workers on all the machines of the cluster and then pass its workflow to them. Maybe, I don't know.
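For context, I kick things off roughly like this (the scheduler address here is made up):

from dask.distributed import Client

# connect to the cluster's scheduler (address is hypothetical)
client = Client('tcp://scheduler:8786')

# Client.get executes a raw dask graph across the cluster
result = client.get(workflow, 'workflow_end_task')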

Has anyone dealt with this issue before?

Upvotes: 1

Views: 1097

Answers (1)

jiminy_crist

Reputation: 2445

Should I convert that for loop into a workflow and put another call to dask inside b_func? Or should I pull this process out and expand the original workflow?

Basically, can I nest dask workflows or is that a bad idea?

In the general case, no, you shouldn't have tasks in dask call compute themselves. However, you can do this with the distributed scheduler, and things should just work. If you don't specify a scheduler when calling compute inside the task, the current scheduler will be used. The downside is that the submitting task ('b_task' in your case) stays blocked the whole time, which ties up a thread on a worker (less efficient).
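As a minimal sketch of that pattern (process_one here is a hypothetical stand-in for one iteration of your loop):

import dask

def process_one(this):
    # hypothetical stand-in for one iteration of the original loop
    return this

def b_func(that):
    # build a sub-graph inside the task itself
    subtasks = [dask.delayed(process_one)(this) for this in that]
    # no scheduler is specified, so the current (distributed)
    # scheduler is reused; b_func blocks until every subtask
    # finishes, holding its worker thread the whole time
    return list(dask.compute(*subtasks))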

In your case I'd instead build the whole graph up front using dask.delayed (http://docs.dask.org/en/latest/delayed.html). This lets you write normal, loopy Python code and have dask build the graph for you. See the delayed documentation for more information.
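A rough sketch of what that might look like for your b_func (the per-iteration body is a placeholder):

import dask
from dask import delayed

@delayed
def one_iteration(item):
    # placeholder for the body of the original for loop
    return item

@delayed
def collect(results):
    # dask traverses the list, so each element arrives computed
    return list(results)

# one delayed call per loop iteration; the fan-out is now part of
# the main graph instead of hidden inside b_func
b_task = collect([one_iteration(i) for i in range(1000)])

# b_task.compute() runs the 1000 iterations in parallel on whatever
# scheduler is active (e.g. your distributed Client)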

Upvotes: 3
