MetaStack

Reputation: 3716

Why doesn't dask parallelize this workflow?

I have 2 very simple functions:

import time

def sleepy(a=1):
    time.sleep(a)
    print(a)

def ending(*args):
    print(args)
    print('finished')

I also have a dask workflow that uses these functions:

workflow = {'task_0': (sleepy, 1), 
            'task_1': (sleepy, 2), 
            'task_2': (sleepy, 3), 
            'ending': (ending, 'task_0', 'task_1', 'task_2')}

This workflow can be visualized like this:

dask.visualize(workflow)
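
And I'm executing the graph with dask.get, roughly like this (reconstructed; 'ending' is the final key in the graph):

import dask

dask.get(workflow, 'ending')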

The three sleepy tasks are supposed to run in parallel, but they don't.

Instead they run serially: I wait 1 second and sleepy prints 1, then 2 more seconds and it prints 2, then 3 more seconds and it prints 3:

1
2
3
(None, None, None)
finished

What am I doing wrong?

Upvotes: 1

Views: 77

Answers (2)

MetaStack

Reputation: 3716

Changing dask.get( to dask.threaded.get( fixed my problem, but I also really liked mdurant's answer.
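
That is, roughly this (the threaded scheduler runs independent tasks in a thread pool, so the sleepy tasks overlap):

import dask.threaded

dask.threaded.get(workflow, 'ending')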

Upvotes: 0

mdurant

Reputation: 28684

This is how I would write your workflow, and the sleep operations do indeed run in parallel:

import dask.delayed
import time

@dask.delayed   # each call to sleepy now builds a lazy task instead of running immediately
def sleepy(a=1):
    time.sleep(a)
    print(a)

@dask.delayed
def ending(*args):
    print(args)
    print('finished')

# ending depends on all three sleepy tasks, so dask is free to run them concurrently
d = ending(*[sleepy(i) for i in [1, 2, 3]])
d.compute()

Note that the @ decorator is just syntactic sugar; you can also write dask.delayed(sleepy), etc.
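
For example, wrapping after the fact (a minimal sketch, assuming the undecorated sleepy from the question):

import dask
import time

def sleepy(a=1):
    time.sleep(a)
    print(a)

# equivalent to decorating the definition with @dask.delayed
sleepy = dask.delayed(sleepy)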

Upvotes: 2
