Reputation: 123
I was trying to concat Dask DataFrames loaded via read_parquet, then apply a query filter, then sample the result to cap the final DataFrame size at 10000 rows or fewer. Here's the pseudocode:
import dask.dataframe as dd

df = dd.concat([dd.read_parquet(path, index='date')
                  .query("(col0 < 4) & (date < '20170201')")
                for path in files],
               interleave_partitions=True)
df = df.sample(float(10000) / max(10000, len(df)))
df = df.compute()
However, it failed with:
ValueError: a must be greater than 0
Traceback
---------
File "/opt/anaconda2/lib/python2.7/site-packages/dask/async.py", line 266, in execute_task
result = _execute_task(task, data)
File "/opt/anaconda2/lib/python2.7/site-packages/dask/async.py", line 247, in _execute_task
return func(*args2)
File "/opt/anaconda2/lib/python2.7/site-packages/dask/dataframe/methods.py", line 143, in sample
return df.sample(random_state=rs, frac=frac, replace=replace)
File "/opt/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 2644, in sample
locs = rs.choice(axis_length, size=n, replace=replace, p=weights)
File "mtrand.pyx", line 1391, in mtrand.RandomState.choice (numpy/random/mtrand/mtrand.c:16430)
If I drop the .query(...) part, it works fine. If I apply the query after the sample, it also works, but then I cannot control the final DataFrame size. Is there anything wrong with what I'm trying to do here?
I'm running OS X 10.10.5, fastparquet 0.0.5, dask 0.14.1, python 2.7.12.
Upvotes: 2
Views: 10804
Reputation: 123
The "ValueError: a must be greater than 0" error is raised by the pandas.DataFrame.sample method because some of the partition DataFrames are empty. Since the sample runs after the Dask query, and not every partition of the query result yields a non-empty pandas.DataFrame, this ValueError is almost guaranteed to happen.
The proper fix belongs in the dask.dataframe code: return df itself if it is empty, otherwise call df.sample:
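A small pandas sketch (with made-up data, not the question's files) of how a partition-level filter can leave some partitions empty:

```python
import pandas as pd

# Two hypothetical partitions; the filter matches rows only in the first one.
parts = [pd.DataFrame({"col0": [1, 2]}),
         pd.DataFrame({"col0": [7, 8]})]
filtered = [p.query("col0 < 4") for p in parts]

assert len(filtered[0]) == 2
assert len(filtered[1]) == 0  # second partition is empty after the query
```

Any per-partition operation that assumes at least one row, as sample did on the versions in the question, will then fail on the second partition.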
> /opt/anaconda2/lib/python2.7/site-packages/dask/dataframe/methods.py(166)sample()
164 def sample(df, state, frac, replace):
165 rs = np.random.RandomState(state)
--> 166 return df.sample(random_state=rs, frac=frac, replace=replace)
i.e.

    return df.sample(random_state=rs, frac=frac, replace=replace) if len(df) > 0 else df
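Until such a fix lands in dask, the same guard can be applied on the user side. A minimal sketch (safe_sample is a hypothetical helper name, not a dask API) that returns empty frames unchanged instead of sampling them:

```python
import pandas as pd

def safe_sample(df, frac, random_state=None):
    # On the dask/pandas/numpy versions in the question, sampling an empty
    # frame raised "ValueError: a must be greater than 0", so pass empty
    # frames through untouched.
    if len(df) == 0:
        return df
    return df.sample(frac=frac, random_state=random_state)

df = pd.DataFrame({"col0": [1, 2, 3]})
empty = df[df["col0"] > 10]           # empty after filtering

assert len(safe_sample(empty, 0.5)) == 0
assert len(safe_sample(df, 1.0)) == 3
```

With dask, this could be applied per partition via map_partitions, keeping empty partitions from ever reaching DataFrame.sample.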
Upvotes: 4