wl2776

Reputation: 4327

How to use a Future with the map method of the Executor from dask.distributed (Python library)?

I am running a dask.distributed cluster.

My task involves chained computations, where the last step is parallel processing of a list, created in previous steps, using the Executor.map method. The length of the list is not known in advance, because it is generated from intermediate results during the computation.

The code looks like the following:

from distributed import Executor, progress


def process():
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
                                           port=config('SERVER_PORT')))
    futures = []
    gen_list1 = get_list_1()
    gen_f1 = e.map(generate_1, gen_list1)
    futures.append(gen_f1)

    gen_list2 = get_list_2()
    gen_f2 = e.map(generate_2, gen_list2)
    futures.append(gen_f2)

    m_list = e.submit(create_m_list)  # m_list is created from gen_list1 and gen_list2
                                      # some results of processing are stored in the database
                                      # and create_m_list doesn't need additional arguments
    futures.append(m_list)

    m_result = e.map(process_m_list, m_list)  # <-- raises the TypeError shown below
    futures.append(m_result)

    return futures

if __name__ == '__main__':
    r = process()
    progress(r)

However, I'm getting the error TypeError: zip argument #1 must support iteration:

File "F:/wl/under_development/database/jobs.py", line 366, in start_job
  match_result = e.map(process_m_list, m_list)
File "C:\Anaconda\lib\site-packages\distributed\executor.py", line 672, in map
  iterables = list(zip(*zip(*iterables)))
TypeError: zip argument #1 must support iteration
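
The failure can be reproduced in isolation: map expands its arguments with zip, and a single Future does not support iteration. A minimal sketch (the scheduler address is an assumption):

from distributed import Executor

e = Executor('127.0.0.1:8786')       # assumed scheduler address
fut = e.submit(lambda: [1, 2, 3])    # a single Future, not a list
e.map(lambda x: x + 1, fut)          # TypeError: zip argument #1 must support iteration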

gen_list1 and gen_list2 are computed independently, but m_list is created from gen_list1 and gen_list2 and therefore depends on them.

I've also tried calling the .result() method of m_list; however, it blocked the process function until the computations of gen_list1 and gen_list2 had finished.

I've also tried calling the asynchronous ._result method of m_list, but it produced the same "zip argument #1 must support iteration" error. The same error occurred with dask.delayed (m_result = e.map(process_m_list, delayed(m_list))).

The dask.distributed documentation is vague on this point; its examples mention only real list objects that already exist. However, other posts here on SO, as well as Google results, suggest that it should be possible.

Here is the version string of my Python distribution:

Python 2.7.11 |Anaconda custom (64-bit)| (default, Feb 16 2016, 09:58:36) [MSC v.1500 64 bit (AMD64)] on win32

Upvotes: 4

Views: 464

Answers (1)

MRocklin

Reputation: 57281

The crux of your problem seems to be here:

m_list = e.submit(create_m_list)
m_result = e.map(process_m_list, m_list)

You are correct that you cannot map a function over an individual future; you need to pass map a sequence. Dask doesn't know how many tasks to submit without knowing more about your data. Calling .result() on the future is a fine solution:

m_list = e.submit(create_m_list)
m_result = e.map(process_m_list, m_list.result())

I've also tried calling .result() method of m_list, however, it has blocked the function process until computations of gen_list1 and gen_list2 have finished.

That's correct. Without any additional information the scheduler will prefer computations that were submitted earlier. You can resolve this by submitting your create_m_list function first, then submitting your extra computations, and then waiting on the create_m_list result.

m_list = e.submit(create_m_list)                   # give this highest priority
f1 = e.map(generate_1, get_list_1())
f2 = e.map(generate_2, get_list_2())

L = m_list.result()                                # block on m_list until done
m_result = e.map(process_m_list, L)                # submit more tasks

return [f1, f2, m_result]
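
For completeness, a usage sketch of how a caller might consume these futures. The scheduler address and the helper functions (create_m_list, generate_1, and so on) are assumptions carried over from the question; Executor.gather fetches the concrete values:

from distributed import Executor, progress

e = Executor('127.0.0.1:8786')       # assumed scheduler address

m_list = e.submit(create_m_list)     # submit the dynamically sized step first
f1 = e.map(generate_1, get_list_1())
f2 = e.map(generate_2, get_list_2())

# .result() blocks only this client process; f1 and f2 keep
# running on the cluster in the meantime.
m_result = e.map(process_m_list, m_list.result())

progress(f1 + f2 + m_result)         # map returns plain lists of futures
values = e.gather(m_result)          # block until done, fetch concrete results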

Upvotes: 1
