learner

Reputation: 877

Running df.apply, dask and pd.get_dummies together

I have multiple categorical columns, each with millions of distinct values. I am using dask and pd.get_dummies to convert these categorical columns into bit vectors, like this:

import pandas as pd
import numpy as np
import scipy.sparse
import dask.dataframe as dd
import multiprocessing

train_set = pd.read_csv('train_set.csv')

def convert_into_one_hot(col1, col2):
    return pd.get_dummies(train_set, columns=[col1, col2], sparse=True)

ddata = (
    dd.from_pandas(train_set, npartitions=2 * multiprocessing.cpu_count())
    .map_partitions(lambda df: df.apply(lambda row: convert_into_one_hot(row.col1, row.col2), axis=1))
    .compute(scheduler='processes')
)

But, I get this error:

ValueError: Metadata inference failed in `lambda`.

You have supplied a custom function and Dask is unable to determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
KeyError("None of [Index(['foo'], dtype='object')] are in the [columns]")

What am I doing wrong here? Thanks.

EDIT:

Here is a small example that reproduces the error. I hope it helps to clarify the problem.

import pandas as pd
import dask.dataframe as dd
import multiprocessing

def convert_into_one_hot(x, y):
    return pd.get_dummies(df, columns=[x, y], sparse=True)

d = {'col1': ['a', 'b'], 'col2': ['c', 'd']}
df = pd.DataFrame(data=d)
dd.from_pandas(df, npartitions=2 * multiprocessing.cpu_count()).map_partitions(lambda df: df.apply(lambda row: convert_into_one_hot(row.col1, row.col2), axis=1)).compute(scheduler='processes')

Upvotes: 3

Views: 266

Answers (1)

rpanai

Reputation: 13447

I think you will run into problems if you try to use pd.get_dummies within partitions. Dask has its own version, dd.get_dummies, which should work as follows.

import pandas as pd
import dask.dataframe as dd
import multiprocessing as mp

d = {'col1': ['a', 'b'], 'col2': ['c', 'd']}
df = pd.DataFrame(data=d)

Pandas

pd.get_dummies(df, columns=["col1", "col2"], sparse=True)
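
On this toy frame, the call above produces one indicator column per category value, roughly like this (the exact dtype and display of the dummy columns depends on your pandas version):

   col1_a  col1_b  col2_c  col2_d
0       1       0       1       0
1       0       1       0       1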

Dask

ddf = dd.from_pandas(df, npartitions=2 * mp.cpu_count())

# dd.get_dummies requires categorical dtypes with known categories,
# since Dask cannot lazily infer the distinct values of a column
dummies_cols = ["col1", "col2"]
ddf[dummies_cols] = ddf[dummies_cols].categorize()

dd.get_dummies(ddf, columns=["col1", "col2"], sparse=True)
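
Note that dd.get_dummies returns a lazy Dask DataFrame, so nothing is computed until you materialize it. A minimal sketch (the variable name dummies is just for illustration):

dummies = dd.get_dummies(ddf, columns=["col1", "col2"], sparse=True)
print(dummies.compute())  # runs the graph and returns a pandas DataFrame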

Upvotes: 4
