sachinruk

Reputation: 9869

Appending new column to dask dataframe

This is a follow-up question to Shuffling data in dask.

I have an existing dask dataframe df where I wish to do the following:

df['rand_index'] = np.random.permutation(len(df))

However, this gives the error: Column assignment doesn't support type ndarray. I also tried df.assign(rand_index=np.random.permutation(len(df))), which gives the same error.

Here is a minimal (not) working sample:

import pandas as pd
import dask.dataframe as dd
import numpy as np

df = dd.from_pandas(pd.DataFrame({'A':[1,2,3]*10, 'B':[3,2,1]*10}), npartitions=10)
df['rand_index'] = np.random.permutation(len(df))

Note:

The previous question mentioned using df = df.map_partitions(add_random_column_to_pandas_dataframe, ...) but I'm not sure if that is relevant to this particular case.

Edit 1

I attempted df['rand_index'] = dd.from_array(np.random.permutation(len(df))), which executed without an issue. When I inspected df.head(), the new column appeared to have been created just fine. However, when I look at df.tail(), rand_index is all NaNs.

To confirm, I checked df.rand_index.max().compute(), which turned out to be smaller than len(df)-1. So this is probably where df.map_partitions comes into play, as I suspect the issue is related to how dask partitions the data. In my actual case I have 80 partitions (unlike the sample above).

Upvotes: 8

Views: 12717

Answers (3)

Boyi

Reputation: 94

I ran into the same problem as in Edit 1.

My workaround is to take a unique column from the existing dataframe, put it alongside the permutation in a second dataframe, and merge the two on that column.

import dask.dataframe as dd
import dask.array as da
import numpy as np
import pandas as pd

df = dd.from_pandas(pd.DataFrame({'A':[1,2,3]*2, 'B':[3,2,1]*2, 'idx':[0,1,2,3,4,5]}), npartitions=10)
# Row count of each partition, used as the chunk sizes of the dask arrays
chunks = tuple(df.map_partitions(len).compute())
size = sum(chunks)
permutations = da.from_array(np.random.permutation(size), chunks=(chunks,))
# 'idx' is the unique column used as the merge key
idx = da.from_array(df['idx'].compute(), chunks=(chunks,))
# Pair each idx value with a random position, then merge back on 'idx'
ddf = dd.concat([dd.from_dask_array(c) for c in [idx,permutations]], axis = 1)
ddf.columns = ['idx','rand_idx']
df = df.merge(ddf, on='idx')
df = df.set_index('rand_idx')
df.compute().head()

Upvotes: 0

Primer

Reputation: 10302

You would need to turn np.random.permutation(len(df)) into a type that dask understands:

permutations = dd.from_array(np.random.permutation(len(df)))
df['rand_index'] = permutations
df

This would yield:

Dask DataFrame Structure:
                    A      B rand_index
npartitions=10                         
0               int64  int64      int32
3                 ...    ...        ...
...               ...    ...        ...
27                ...    ...        ...
29                ...    ...        ...
Dask Name: assign, 61 tasks

It is now up to you whether to call .compute() to calculate the actual results.

Upvotes: 9

rpanai

Reputation: 13437

To assign a column, you should use df.assign.

Upvotes: 0
