muammar
muammar

Reputation: 967

How to parallelize a nested loop with dask.distributed?

I am trying to parallelize a nested loop using dask distribute that looks this way:

@dask.delayed
def delayed_a(e):
    a = do_something_with(e)
    return something

@dask.delayed
def delayed_b(element):
    computations = []
    for e in element:
        computations.add(delayed_a(e))

    b = dask.compute(*computations, scheduler='distributed',
                    num_workers=4)
    return b

list = [some thousands of elements here]
computations = []
for element in list:
    computations.append(delayed_b(element))
    results = dask.compute(*computations, scheduler='distributed',
                           num_workers=4)

As you see, I am using the distributed scheduler. First, I create a computations list with a lazy delayed_b function that takes as an argument one element from list. Then, delayed_b creates a new set of computations that are calling a delayed_a function and everything is executed in distributed. This pseudo code is working but I have found out that is faster if delayed_a is not there. Then my question is -- what would be the correct way of doing distributed parallel for loops?

At the end of the history what I am trying to do is:

list = [some thousands of elements here]
for element in list:
    for e in element:
        do_something_with(e)

I would really appreciate any suggestions about the best way of accomplishing doing nested loops with dask.distributed.

Upvotes: 0

Views: 2905

Answers (1)

mdurant
mdurant

Reputation: 28673

Simple:

something = dask.delayed(do_something_with_e
list = [some thousands of elements here]

# this could be written as a one-line comprehension
computations = []
for element in list:
    part = []
    computations.append(part)
    for e in element:
        part.append(something(e))

results = dask.compute(*computations, scheduler='distributed',
                       num_workers=4)

You should never be calling a delayed function or compute() within a delayed function.

(note that the distributed scheduler would be used by default, so long as you've created a Client)

Upvotes: 1

Related Questions