I created a custom DAG dictionary according to https://docs.dask.org/en/latest/custom-graphs.html which takes one input file. Now I want to feed it a list of files without repeating the items in the DAG for every file, as is done in the above link. I'm aware of the delayed functionality at https://docs.dask.org/en/latest/delayed.html, but I need to use get to evaluate the DAG, right? If so, how can I combine it with delayed or futures?
Dask graphs contain a key/value pair for each task.
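Because a graph is just a dictionary, the repeated per-file entries can be generated in a loop instead of written out by hand. The sketch below is one way to do that, assuming the same toy "read" and "process" steps as the example in this answer; load, process and build_graph are hypothetical names. Each file gets its own tuple keys, since reusing plain string keys such as 'data' for every file would make the entries overwrite each other in a single dictionary. All results are then requested from one dask.get call.

```python
import dask


def load(filename):
    # Hypothetical placeholder: a real task would open and parse the file.
    return filename


def process(data):
    # Toy processing step: keep only the last character.
    return data[-1]


def build_graph(filenames):
    """Build ONE graph containing a separate pair of tasks for each file."""
    graph = {}
    for name in filenames:
        # Tuple keys keep the tasks of different files from colliding.
        graph[("data", name)] = (load, name)
        # Referencing the key ("data", name) makes this task depend on it.
        graph[("result", name)] = (process, ("data", name))
    return graph


filenames = [f"file_{i}" for i in range(3)]
graph = build_graph(filenames)
result_keys = [("result", name) for name in filenames]

# dask.get (the synchronous scheduler) accepts a list of keys.
results = list(dask.get(graph, result_keys))
print(results)  # ['0', '1', '2']
```

Since everything lives in one graph, a multi-threaded or distributed scheduler could also run the per-file tasks in parallel without the outer dask.delayed wrapper.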
Without knowing the precise context of your question: you could have one function build a Dask graph for each input file, and a second function that executes that graph with dask.get. Calls to the second function can then be parallelised across files with the dask.delayed framework. Here's a code example:
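import dask

def produce_dask_graph_for_filename(filename):
    graph = dict()
    # placeholder "read" task: a real graph would load the file here;
    # this toy version just passes the filename string through
    graph['data'] = (lambda x: x, filename)
    # "process" task: takes the output of 'data' and keeps its last character
    graph['result'] = (lambda x: x[-1], 'data')
    return graph

def compute_result(filename):
    # create the filename-specific graph
    graph = produce_dask_graph_for_filename(filename)
    # evaluate it synchronously with dask.get and return the value of 'result'
    result = dask.get(graph, 'result')
    return result

filename_list = ['file_%s' % i for i in range(5)]
# one lazy call per file (these are dask.delayed objects, not futures)
delayed_results = [dask.delayed(compute_result)(filename) for filename in filename_list]
# dask.compute returns a tuple with one entry per argument, hence the [0]
results = dask.compute(delayed_results)[0]
print(results)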
Output:
['0', '1', '2', '3', '4']
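If the per-file graph is this simple, another option is to skip the hand-written dictionary and express the same two steps with dask.delayed directly. The sketch below assumes hypothetical load and process functions standing in for the real read and processing steps. dask.delayed derives a unique key for each call from the function and its arguments, so the per-file pipelines do not clash when one dask.compute call merges and runs them together.

```python
import dask


def load(filename):
    # Hypothetical placeholder for reading a file.
    return filename


def process(data):
    # Toy processing step: keep only the last character.
    return data[-1]


filenames = [f"file_{i}" for i in range(5)]

# One lazy two-step pipeline per file; nothing runs yet.
lazy_results = [dask.delayed(process)(dask.delayed(load)(name)) for name in filenames]

# A single compute call merges all per-file graphs and executes them,
# returning one value per Delayed argument.
results = list(dask.compute(*lazy_results))
print(results)  # ['0', '1', '2', '3', '4']
```

The trade-off is that Dask builds the graph for you, so you lose direct control over the key names used in a custom dictionary.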
You can then visualise an individual graph with the following line (this requires graphviz and python-graphviz to be installed):
dask.visualize(produce_dask_graph_for_filename(filename_list[0]))