user1527390

Reputation: 123

dask DataFrame query then sample error

I am trying to concatenate Dask DataFrames loaded with read_parquet, filter them with query, and then sample the result so that the final DataFrame has at most 10000 rows. Here is the pseudocode:

import dask.dataframe as dd

df = dd.concat([ dd.read_parquet(path, index='date').query("(col0 < 4) & (date < '20170201')")  
                 for path in files ], 
               interleave_partitions=True)
df = df.sample(float(10000) / max(10000, len(df)))
df = df.compute()

However, it failed with:

ValueError: a must be greater than 0

Traceback
---------
  File "/opt/anaconda2/lib/python2.7/site-packages/dask/async.py", line 266, in execute_task
    result = _execute_task(task, data)  
  File "/opt/anaconda2/lib/python2.7/site-packages/dask/async.py", line 247, in _execute_task
    return func(*args2)  
  File "/opt/anaconda2/lib/python2.7/site-packages/dask/dataframe/methods.py", line 143, in sample
    return df.sample(random_state=rs, frac=frac, replace=replace)  
  File "/opt/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 2644, in sample
    locs = rs.choice(axis_length, size=n, replace=replace, p=weights)  
  File "mtrand.pyx", line 1391, in mtrand.RandomState.choice (numpy/random/mtrand/mtrand.c:16430)  

If I drop the .query(...) part, it works fine. If I apply the query after the sample, it also works, but then I cannot control the final DataFrame size. Is there anything wrong with what I am trying to do here?

I'm running OS X 10.10.5, fastparquet 0.0.5, dask 0.14.1, python 2.7.12.

Upvotes: 2

Views: 10804

Answers (1)

user1527390

Reputation: 123

The "ValueError: a must be greater than 0" is raised by pandas.DataFrame.sample when it is called on an empty pandas DataFrame. Dask applies the sample to each partition after the query, and the query can leave some partitions with no rows, so with a selective filter this error is almost guaranteed to happen.
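To see how a filter can leave some partitions empty, here is a small pandas-only sketch. The two slices stand in for Dask partitions, and the column values are made up for illustration:

```python
import pandas as pd

# Hypothetical data: six rows, split into two "partitions" of three rows each.
frame = pd.DataFrame({"col0": [1, 2, 3, 7, 8, 9]})
partitions = [frame.iloc[0:3], frame.iloc[3:6]]

# Apply the same filter to every partition, as Dask does with .query(...).
filtered = [part.query("col0 < 4") for part in partitions]

# Row counts per partition after filtering; the second partition has no matches.
sizes = [len(part) for part in filtered]
print(sizes)
```

Any partition whose rows all fail the filter ends up with zero rows, and that is the frame the per-partition sample call then receives.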

The proper fix belongs in the dask.dataframe code. The helper in /opt/anaconda2/lib/python2.7/site-packages/dask/dataframe/methods.py currently calls df.sample unconditionally:

def sample(df, state, frac, replace):
    rs = np.random.RandomState(state)
    return df.sample(random_state=rs, frac=frac, replace=replace)

It should return df itself when it is empty, and call df.sample otherwise:

def sample(df, state, frac, replace):
    rs = np.random.RandomState(state)
    return df.sample(random_state=rs, frac=frac, replace=replace) if len(df) > 0 else df
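Until such a fix is available, the same guard can probably be applied from user code without patching dask. The sketch below is pandas-only; safe_sample and its seed parameter are names I made up for illustration, not part of dask or pandas:

```python
import numpy as np
import pandas as pd


def safe_sample(df, frac, replace=False, seed=None):
    """Sample a fraction of rows, returning empty frames unchanged.

    Hypothetical helper: it mirrors the guard proposed for dask's sample.
    """
    if len(df) == 0:
        # Nothing to sample from, so skip the pandas sampler entirely.
        return df
    rs = np.random.RandomState(seed)
    return df.sample(frac=frac, replace=replace, random_state=rs)


# Made-up frames standing in for an empty and a non-empty partition.
empty = pd.DataFrame({"col0": pd.Series([], dtype="int64")})
full = pd.DataFrame({"col0": range(10)})

empty_result = safe_sample(empty, frac=0.5, seed=0)
full_result = safe_sample(full, frac=0.5, seed=0)
```

With a Dask DataFrame, something like df.map_partitions(safe_sample, frac=frac) should run this guard on each partition in place of df.sample(frac). I have not tested that against dask 0.14.1, so treat it as an assumption.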

Upvotes: 4
