Ilya

Reputation: 31

Computing multiple dask.dataframe.from_delayed() from one source

How can I compute several dataframes built with dd.from_delayed() in parallel when they all come from a single sequence of delayed objects?

import dask
import dask.dataframe as dd

def foo():
    df1, df2 = ...  # prepare two pd.DataFrame() in one foo() call
    return df1, df2

dds = [dask.delayed(foo)() for _ in range(5)]  # 5 delayed pairs (df1, df2)
df1 = dd.from_delayed([d[0] for d in dds], meta=...)
df2 = dd.from_delayed([d[1] for d in dds], meta=...)
# client is a dask.distributed.Client; file1 and file2 are the output paths
client.compute([
    df1.to_parquet(file1, write_index=True, engine='fastparquet', compute=False),
    df2.to_parquet(file2, write_index=True, engine='fastparquet', compute=False)
], sync=True)

Here foo() is called 10 times. Is it possible to build the graph so that it is called only 5 times?

Thanks

Upvotes: 3

Views: 614

Answers (1)

MRocklin

Reputation: 57311

Thank you for the clear example. In principle you're correct that foo should only be called five times. My guess is that graph optimizations (task fusion in particular) are misbehaving here. As a short-term workaround, I recommend trying the following with a recent release:

import dask
dask.config.set({"optimization.fuse.active": False})

# ... your code follows

Upvotes: 1
