Avatrin

Reputation: 187

Parallelizing a task with Dask that for each row in a Dataframe returns None

I want to speed up a task on a DataFrame that, for each row, saves an image to a local folder, so the function doesn't return anything. I tried to run that function using Dask, but Dask seems to require that the function return something, and I cannot make .apply work.

Is there any other way to make this work?

Update: Minimal reproducible example

import dask.dataframe as dd
import pandas as pd

doc = pd.DataFrame({'file_name': ['Bob', 'Jane', 'Alice','Allan'], 
               'text': ['text1','text2', 'text3','text4']})

def func(row):
    with open(row['file_name']+'.txt', 'w') as f:
        f.write(row['text'])

ddf = dd.from_pandas(doc, npartitions=2)
k = ddf.apply(func,axis=1,meta=(None,'object'))
k.compute()

meta is (None, 'object') only because that's what Dask itself suggested when I ran similar code without a meta argument.

This doesn't produce any errors, and it runs correctly. I can no longer reproduce my original mistake, since I fixed it yesterday with Michael Delgado's answer.

Upvotes: 1

Views: 548

Answers (1)

Michael Delgado

Reputation: 15432

One way to do this is to use dask.dataframe.DataFrame.map_partitions. This calls the passed function on each partition, each of which is itself a pandas.DataFrame. Within that function, you can use the regular pandas apply if you'd like.

For example, the following defines a dataframe with the columns i and n, then maps a function across each row which writes a file based on the row's values:

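import os
import tempfile

import dask.dataframe
import numpy as np
import pandas as pd

def do_something_with_side_effects(row, dirname):
    # Called once per row: write a file named after the row's values.
    fp = os.path.join(dirname, f"{row.i}_{row.n}.txt")
    with open(fp, 'w+') as f:
        f.write("file contents!")

def do_something_in_partitions(df, dirname):
    # df is a plain pandas.DataFrame (one partition); nothing is returned.
    df.apply(do_something_with_side_effects, axis=1, dirname=dirname)

# 15 rows split into partitions of 5 rows each
df = dask.dataframe.from_pandas(
    pd.DataFrame({'i': ['A'] * 10 + ['B'] * 5, 'n': np.arange(15)}),
    chunksize=5,
)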

The trick with getting dask.dataframe.DataFrame.map_partitions right is providing the meta argument, which tells dask what the output looks like (column names and dtypes) so it can build the task graph without running the function first. If you're not returning anything, you can simply provide an empty dictionary:


In [18]: tempdir = tempfile.mkdtemp()
    ...: f = df.map_partitions(do_something_in_partitions, meta={}, dirname=tempdir)
    ...: f.compute()
    ...: os.listdir(tempdir)
Out[18]:
['A_7.txt',
 'A_6.txt',
 'A_4.txt',
 'A_5.txt',
 'A_1.txt',
 'A_0.txt',
 'A_2.txt',
 'A_3.txt',
 'B_12.txt',
 'B_13.txt',
 'B_11.txt',
 'B_10.txt',
 'A_8.txt',
 'B_14.txt',
 'A_9.txt']

Upvotes: 1
