Reputation: 187
I want to speed up a task on a DataFrame that, for each row, saves an image to a local folder, so the function doesn't return anything. I tried to run it with Dask, but Dask seems to require the function to return something, and I can't get .apply to work.
Is there any other way to make this work?
import dask.dataframe as dd
import pandas as pd

doc = pd.DataFrame({'file_name': ['Bob', 'Jane', 'Alice', 'Allan'],
                    'text': ['text1', 'text2', 'text3', 'text4']})

def func(row):
    # Side effect only: write the row's text to <file_name>.txt; returns None
    with open(row['file_name'] + '.txt', 'w') as f:
        f.write(row['text'])

ddf = dd.from_pandas(doc, npartitions=2)
k = ddf.apply(func, axis=1, meta=(None, 'object'))
k.compute()
I set meta to (None, 'object') only because Dask itself suggested it when I ran similar code without a meta argument.
This code runs correctly without errors. I can no longer reproduce my original mistake, because I fixed it yesterday using Michael Delgado's answer.
Upvotes: 1
Views: 548
Reputation: 15432
One way to do this is to use the function dask.dataframe.DataFrame.map_partitions. This calls the passed function on each partition, each of which is itself a pandas.DataFrame. Within that function, you can call apply if you'd like.
For example, the following defines a dataframe with the columns i and n, then maps a function across each row that writes a file based on the row's values:
import os
import tempfile

import dask.dataframe
import numpy as np
import pandas as pd

def do_something_with_side_effects(row, dirname):
    # Write one file per row, named after the row's values
    fp = os.path.join(dirname, f"{row.i}_{row.n}.txt")
    with open(fp, 'w+') as f:
        f.write("file contents!")

def do_something_in_partitions(df, dirname):
    # df is a single pandas partition; apply the row function to each of its rows
    df.apply(do_something_with_side_effects, axis=1, dirname=dirname)

df = dask.dataframe.from_pandas(
    pd.DataFrame({'i': ['A'] * 10 + ['B'] * 5, 'n': np.arange(15)}),
    chunksize=5,
)
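The trick to getting dask.dataframe.DataFrame.map_partitions right is providing the meta argument, which tells dask what the result looks like (its type, column names, and dtypes) without having to run your function. If you leave it out, dask infers the output by calling the function on a small partition of fake data, which for a function with side effects means an extra, unwanted call. If you're not returning anything, you can simply provide an empty dictionary: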
In [18]: tempdir = tempfile.mkdtemp()  # creates a new empty directory and returns its path
...: f = df.map_partitions(do_something_in_partitions, meta={}, dirname=tempdir)
...: f.compute()
...: os.listdir(tempdir)
Out[18]:
['A_7.txt',
'A_6.txt',
'A_4.txt',
'A_5.txt',
'A_1.txt',
'A_0.txt',
'A_2.txt',
'A_3.txt',
'B_12.txt',
'B_13.txt',
'B_11.txt',
'B_10.txt',
'A_8.txt',
'B_14.txt',
'A_9.txt']
Upvotes: 1