Reputation: 4327
I am running a dask.distributed cluster.
My task consists of chained computations, where the last step is to process a list in parallel with the Executor.map
method. The length of that list is not known in advance, because it is generated from intermediate results during the computation.
The code looks like the following:
from distributed import Executor, progress

def process():
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
                                           port=config('SERVER_PORT')))
    futures = []

    gen_list1 = get_list_1()
    gen_f1 = e.map(generate_1, gen_list1)
    futures.append(gen_f1)

    gen_list2 = get_list_2()
    gen_f2 = e.map(generate_2, gen_list2)
    futures.append(gen_f2)

    m_list = e.submit(create_m_list)  # m_list is created from gen_list1 and gen_list2;
                                      # some results of processing are stored in the database
                                      # and create_m_list doesn't need additional arguments
    futures.append(m_list)

    m_result = e.map(process_m_list, m_list)
    futures.append(m_result)

    return futures

if __name__ == '__main__':
    r = process()
    progress(r)
However, I'm getting the error TypeError: zip argument #1 must support iteration:
File "F:/wl/under_development/database/jobs.py", line 366, in start_job
match_result = e.map(process_m_list, m_list)
File "C:\Anaconda\lib\site-packages\distributed\executor.py", line 672, in map
iterables = list(zip(*zip(*iterables)))
TypeError: zip argument #1 must support iteration
gen_list1 and gen_list2 are computed independently, but m_list is created from gen_list1 and gen_list2 and therefore depends on them.
I've also tried calling the .result() method of m_list; however, it blocked the function process until the computations of gen_list1 and gen_list2 had finished.
I've also tried calling the asynchronous method ._result of m_list, but it produced the same error "zip argument #1 must support iteration". The same error occurred with dask.delayed (m_result = e.map(process_m_list, delayed(m_list))).
The documentation of dask.distributed is vague on this point; its examples mention only real list objects that already exist. However, other posts here on SO, as well as Google, suggest that it should be possible.
Here is the version string of my Python distribution:
Python 2.7.11 |Anaconda custom (64-bit)| (default, Feb 16 2016, 09:58:36) [MSC v.1500 64 bit (AMD64)] on win32
Upvotes: 4
Views: 464
Reputation: 57281
The crux of your problem seems to be here:
m_list = e.submit(create_m_list)
m_result = e.map(process_m_list, m_list)
You are correct that you cannot map a function over an individual future. You need to pass map a sequence. Dask doesn't know how many functions to submit without knowing more about your data. Calling .result() on the future would be a fine solution:
m_list = e.submit(create_m_list)
m_result = e.map(process_m_list, m_list.result())
I've also tried calling .result() method of m_list, however, it has blocked the function process until computations of gen_list1 and gen_list2 have finished.
That's correct. Without any additional information the scheduler will prefer computations that were submitted earlier. You could resolve this problem by submitting your create_m_list function first, then submitting your extra computations, then waiting on the create_m_list result.
m_list = e.submit(create_m_list)     # give this highest priority
f1 = e.map(generate_1, get_list_1())
f2 = e.map(generate_2, get_list_2())
L = m_list.result()                  # block on m_list until done
m_result = e.map(process_m_list, L)  # submit more tasks
return [f1, f2, m_result]
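For context, here is a minimal sketch of how that reordering might slot back into the process function from your question (assuming the same config helper, Executor setup, and task functions shown there). Since e.map returns a list of futures, the three result lists can simply be concatenated before being handed to progress:

from distributed import Executor, progress

def process():
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
                                           port=config('SERVER_PORT')))

    # submit create_m_list first so the scheduler starts it as early as possible
    m_list = e.submit(create_m_list)

    # then submit the independent computations
    f1 = e.map(generate_1, get_list_1())
    f2 = e.map(generate_2, get_list_2())

    # block until create_m_list has produced the concrete list,
    # then map over it as an ordinary sequence
    L = m_list.result()
    m_result = e.map(process_m_list, L)

    # e.map returns lists of futures, so concatenate them for progress()
    return f1 + f2 + m_result

if __name__ == '__main__':
    r = process()
    progress(r)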
Upvotes: 1