Vladimir

Reputation: 145

Simplest way to create a complex dask graph

There is a complex system of calculations over some objects. The difficulty is that some calculations are group calculations.

This can be demonstrated by the following example:

from dask.distributed import Client

def load_data_from_db(id):
    # load some data
    ...
    return data

def task_a(data):
    # some calculations
    ...
    return result

def group_task(*args):
    # some calculations
    ...
    return result

def task_b(data, group_data):
    # some calculations
    ...
    return result

def task_c(data, task_a_result):
    # some calculations
    ...
    return result

ids = [1, 2]
dsk = {'id_{}'.format(i): id for i, id in enumerate(ids)}

dsk['data_0'] = (load_data_from_db, 'id_0')
dsk['data_1'] = (load_data_from_db, 'id_1')

dsk['task_a_result_0'] = (task_a, 'data_0')
dsk['task_a_result_1'] = (task_a, 'data_1')

dsk['group_result'] = (
    group_task,
    'data_0', 'task_a_result_0',
    'data_1', 'task_a_result_1')

dsk['task_b_result_0'] = (task_b, 'data_0', 'group_result')
dsk['task_b_result_1'] = (task_b, 'data_1', 'group_result')

dsk['task_c_result_0'] = (task_c, 'data_0', 'task_a_result_0')
dsk['task_c_result_1'] = (task_c, 'data_1', 'task_a_result_1')

client = Client(scheduler_address)
result = client.get(
    dsk,
    ['task_a_result_0',
     'task_b_result_0',
     'task_c_result_0',
     'task_a_result_1',
     'task_b_result_1',
     'task_c_result_1'])

The list of objects numbers in the thousands, and there are dozens of tasks (including several group tasks).

With this method of graph creation it is difficult to modify the graph (add new tasks, change dependencies, etc.). Is there a more efficient way of doing distributed computing with dask in this context?
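For what it's worth, the hand-written dict above can also be generated in a loop over the ids, which makes adding a task type or changing a dependency a one-line change. A sketch, using hypothetical stub implementations of the task functions (the real `load_data_from_db` etc. would be dropped in):

```python
# Hypothetical stubs standing in for the real task functions,
# so the graph construction is runnable on its own.
def load_data_from_db(id):
    return {"id": id}

def task_a(data):
    return ("a", data["id"])

def group_task(*args):
    return len(args)

def task_b(data, group_data):
    return ("b", data["id"], group_data)

def task_c(data, task_a_result):
    return ("c", data["id"], task_a_result)

def build_graph(ids):
    """Build the same task graph as the hand-written example, for any number of ids."""
    dsk = {}
    group_args = []
    for i, id in enumerate(ids):
        dsk['id_{}'.format(i)] = id
        dsk['data_{}'.format(i)] = (load_data_from_db, 'id_{}'.format(i))
        dsk['task_a_result_{}'.format(i)] = (task_a, 'data_{}'.format(i))
        group_args += ['data_{}'.format(i), 'task_a_result_{}'.format(i)]
    # The group task depends on every (data, task_a_result) pair.
    dsk['group_result'] = (group_task,) + tuple(group_args)
    for i in range(len(ids)):
        dsk['task_b_result_{}'.format(i)] = (task_b, 'data_{}'.format(i), 'group_result')
        dsk['task_c_result_{}'.format(i)] = (task_c, 'data_{}'.format(i), 'task_a_result_{}'.format(i))
    return dsk

dsk = build_graph([1, 2])
```

The resulting dict can be handed to client.get exactly as before; only the construction changes.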

Added

With futures, the graph is:

from itertools import chain

client = Client(scheduler_address)

ids = [1, 2]
data = client.map(load_data_from_db, ids)

result_a = client.map(task_a, data)

group_args = list(chain(*zip(data, result_a)))
result_group = client.submit(group_task, *group_args)

result_b = client.map(task_b, data, [result_group] * len(ids))

result_c = client.map(task_c, data, result_a)

result = client.gather(result_a + result_b + result_c)

Inside the task functions, when an input argument is a Future instance, arg.result() is called before it is used.
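Another way to get the same dependency structure without hand-writing either the dict or the futures plumbing is dask.delayed, which records ordinary function calls into a task graph. A minimal runnable sketch with stub task functions (the real ones would be substituted):

```python
import dask
from dask import delayed

# Stub task functions standing in for the real ones above.
@delayed
def load_data_from_db(id):
    return id * 10

@delayed
def task_a(data):
    return data + 1

@delayed
def group_task(*args):
    return sum(args)

@delayed
def task_b(data, group_data):
    return data + group_data

@delayed
def task_c(data, task_a_result):
    return data - task_a_result

ids = [1, 2]
data = [load_data_from_db(i) for i in ids]
result_a = [task_a(d) for d in data]

# The group task depends on every (data, task_a_result) pair.
group_args = [x for pair in zip(data, result_a) for x in pair]
group_result = group_task(*group_args)

result_b = [task_b(d, group_result) for d in data]
result_c = [task_c(d, a) for d, a in zip(data, result_a)]

# dask.compute walks the recorded graph; with a distributed Client,
# the same collections can be passed to client.compute instead.
results = dask.compute(*result_a, *result_b, *result_c)
```

Because dependencies come from the call structure itself, adding a task or changing a dependency only touches the normal-looking Python code.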

Upvotes: 0

Views: 77

Answers (1)

MRocklin

Reputation: 57281

If you want to modify the computation during computation then I recommend the futures interface.
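To illustrate what "modify the computation during computation" looks like with futures: new tasks can be submitted against futures that are still running, and as_completed lets you react to results as they arrive. A minimal sketch with hypothetical stub tasks and an in-process cluster (point Client at a real scheduler address in production):

```python
from dask.distributed import Client, as_completed

def task_a(x):
    # hypothetical first-stage task
    return x + 1

def task_b(x):
    # hypothetical follow-up task, decided on at runtime
    return x * 2

# In-process (threaded) cluster, for illustration only.
client = Client(processes=False)

futures = client.map(task_a, range(4))

# Submit follow-up work as each first-stage result completes;
# the graph grows while earlier tasks are still running.
follow_ups = [client.submit(task_b, f) for f in as_completed(futures)]
results = sorted(client.gather(follow_ups))

client.close()
```

as_completed yields futures in completion order, so the follow-up results are sorted here to make the output deterministic.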

Upvotes: 1
