Reputation: 967
I am trying to parallelize a nested loop using dask.distributed that looks like this:
import dask

@dask.delayed
def delayed_a(e):
    a = do_something_with(e)
    return a

@dask.delayed
def delayed_b(element):
    computations = []
    for e in element:
        computations.append(delayed_a(e))
    b = dask.compute(*computations, scheduler='distributed',
                     num_workers=4)
    return b
list = [some thousands of elements here]
computations = []
for element in list:
    computations.append(delayed_b(element))

results = dask.compute(*computations, scheduler='distributed',
                       num_workers=4)
As you can see, I am using the distributed scheduler. First, I create a computations list with the lazy delayed_b function, which takes one element from list as its argument. Then, delayed_b creates a new set of computations that call a delayed_a function, and everything is executed on the distributed scheduler. This pseudocode works, but I have found that it is faster if delayed_a is not there. So my question is: what would be the correct way of doing distributed parallel for loops?
In the end, what I am trying to do is:
list = [some thousands of elements here]
for element in list:
    for e in element:
        do_something_with(e)
I would really appreciate any suggestions about the best way of doing nested loops with dask.distributed.
Upvotes: 0
Views: 2905
Reputation: 28673
Simple:
import dask

something = dask.delayed(do_something_with)

list = [some thousands of elements here]

# this could be written as a one-line comprehension
computations = []
for element in list:
    part = []
    computations.append(part)
    for e in element:
        part.append(something(e))

results = dask.compute(*computations, scheduler='distributed',
                       num_workers=4)
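For reference, the one-line comprehension mentioned in the comment above would look something like this (a sketch, still assuming your real do_something_with):

computations = [[something(e) for e in element] for element in list]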
You should never call a delayed function or compute() from within a delayed function.
(note that the distributed scheduler would be used by default, so long as you've created a Client)
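A minimal sketch of that, assuming a local cluster (on a real cluster you would pass the scheduler's address to Client instead):

from dask.distributed import Client

client = Client()  # starts a local cluster; subsequent dask.compute() calls use it by default
results = dask.compute(*computations)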
Upvotes: 1