Reputation: 770
I have a custom file format that I want to load and process lazily, and it would be useful to have it in the form of a dataframe.
I already have a function that creates pandas dataframes, but it is costly to run.
dask.dataframe.from_delayed logically merges all the dataframes from an iterable as if they were one dataframe. When invoked, from_delayed iterates over the iterable (a generator in the example code) and creates delayed(pd.DataFrame) objects.
My issue is that generating the dataframes requires reading the dataset, but I want to read the file (that is, iterate over the generator) as late as possible.
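from time import sleep
import dask.dataframe
import pandas as pd
from dask import delayed

def gen():
    for i in range(10):
        sleep(10)  # Looping over this generator is costly
        yield delayed(pd.DataFrame([[i, i + 1]]))

ddf = dask.dataframe.from_delayed(gen())
# Trigger the delayed calculations
ddf.compute()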
Is it possible with dask? Am I solving the correct problem?
Upvotes: 0
Views: 232
Reputation: 770
As of the end of May 2022 I am still struggling with this. Sultan's solution works for evaluating the generators, but it doesn't parallelize afterwards. I have posted a related question about the parallelization issue: "dask bag embarrassingly parallel but with a generator".
The issue I was facing was related to reading compressed files (larger than memory when uncompressed) with dask.
dask does not parallelize reading of compressed files, so some form of manual computation or a wrapper class around dask's constructs is required.
My current solution is essentially "recompressing" the files into a database instead of using them raw. This allows metadata, indexing and querying over the "compressed" data in parallel rather than file by file.
Upvotes: 0
Reputation: 16581
At the time the dask.dataframe is created, dask needs to know the columns and dtypes, so it will try to evaluate the delayed. If you want to avoid this behaviour, the best option is to state explicitly what the delayed data will evaluate to and pass it as the meta kwarg to from_delayed:
meta = pd.DataFrame([[0, 1]])  # one row, two columns, matching what gen() yields
ddf = dask.dataframe.from_delayed(gen(), meta=meta)
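Since meta has to describe the columns and dtypes of every partition, it can be worth comparing a candidate meta against one real partition before handing it to dask. The following is a minimal sketch using only pandas; the helper name matches is hypothetical and is not part of dask or pandas. It shows that a nested list gives the two-column layout produced by pd.DataFrame([[i, i + 1]]), while a flat list gives a single column.

```python
import pandas as pd

# One real partition, built the same way as in the question's generator.
partition = pd.DataFrame([[3, 4]])

# Candidate meta objects: a nested list gives one row with two columns,
# a flat list gives two rows with a single column.
meta_two_columns = pd.DataFrame([[0, 1]])
meta_one_column = pd.DataFrame([0, 1])


def matches(meta, frame):
    """Hypothetical helper: True when column labels and dtypes agree."""
    same_columns = list(meta.columns) == list(frame.columns)
    same_dtypes = list(meta.dtypes) == list(frame.dtypes)
    return same_columns and same_dtypes


print(matches(meta_two_columns, partition))
print(matches(meta_one_column, partition))
```

Only the two-column candidate describes the partitions from the question, so that is the shape to pass as meta.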
Note that the snippet you provided will still incur a 10 second delay for each delayed object, because by design your generator sleeps for 10 seconds before yielding each one. If possible, refactor the code so that the delayed decorator is applied earlier and the costly work happens inside the delayed function (so the list of delayed computations is formed faster). Something like this:
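from time import sleep

import dask.dataframe
import pandas as pd
from dask import delayed

@delayed
def costly_read(i):
    sleep(10)  # the costly work now runs only when the graph is computed
    return pd.DataFrame([[i, i + 1]])

def gen():
    for i in range(2):
        yield costly_read(i)  # returns immediately with a lazy Delayed object

meta = pd.DataFrame([[0, 1]])  # one row, two columns, matching costly_read's output
ddf = dask.dataframe.from_delayed(gen(), meta=meta)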
Upvotes: 1