Reputation: 1147
I've been working through the Dask concurrent.futures documentation, and I'm having trouble with the (outdated) Random Forest example, specifically its use of positional indexing to slice the dask dataframe into test/train splits:
train = dfs[:-1]
test = dfs[-1]
To no avail, I've also tried:
train = dfs.loc[:-1]
test = dfs.loc[-1]
which gives me the error:
KeyError Traceback (most recent call last)
/opt/anaconda/lib/python3.5/site-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance)
2524 try:
-> 2525 return self._engine.get_loc(key)
2526 except KeyError:
pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()
pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()
pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()
pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()
KeyError: 0
During handling of the above exception, another exception occurred:
KeyError Traceback (most recent call last)
<ipython-input-21-fff88783d91d> in <module>()
7 test = dfs.loc[-1]
8
----> 9 estimators = c.map(fit, train)
10 progress(estimators, complete=False)
/opt/anaconda/lib/python3.5/site-packages/distributed/client.py in map(self, func, *iterables, **kwargs)
1243 raise ValueError("Only use allow_other_workers= if using workers=")
1244
-> 1245 iterables = list(zip(*zip(*iterables)))
1246 if isinstance(key, list):
1247 keys = key
/opt/anaconda/lib/python3.5/site-packages/dask/dataframe/core.py in __getitem__(self, key)
2284
2285 # error is raised from pandas
-> 2286 meta = self._meta[_extract_meta(key)]
2287 dsk = dict(((name, i), (operator.getitem, (self._name, i), key))
2288 for i in range(self.npartitions))
/opt/anaconda/lib/python3.5/site-packages/pandas/core/frame.py in __getitem__(self, key)
2137 return self._getitem_multilevel(key)
2138 else:
-> 2139 return self._getitem_column(key)
2140
2141 def _getitem_column(self, key):
/opt/anaconda/lib/python3.5/site-packages/pandas/core/frame.py in _getitem_column(self, key)
2144 # get column
2145 if self.columns.is_unique:
-> 2146 return self._get_item_cache(key)
2147
2148 # duplicate columns & possible reduce dimensionality
/opt/anaconda/lib/python3.5/site-packages/pandas/core/generic.py in _get_item_cache(self, item)
1840 res = cache.get(item)
1841 if res is None:
-> 1842 values = self._data.get(item)
1843 res = self._box_item_values(item, values)
1844 cache[item] = res
/opt/anaconda/lib/python3.5/site-packages/pandas/core/internals.py in get(self, item, fastpath)
3841
3842 if not isna(item):
-> 3843 loc = self.items.get_loc(item)
3844 else:
3845 indexer = np.arange(len(self.items))[isna(self.items)]
/opt/anaconda/lib/python3.5/site-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance)
2525 return self._engine.get_loc(key)
2526 except KeyError:
-> 2527 return self._engine.get_loc(self._maybe_cast_indexer(key))
2528
2529 indexer = self.get_indexer([key], method=method, tolerance=tolerance)
pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()
pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()
pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()
pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()
KeyError: 0
What is the proper way to use positional indexing in Dask, and what would be the correct way to slice the dataframe into test/train splits in the Random Forest example?
Similar unanswered question: What is the equivalent to iloc for dask dataframe?
EDIT: Creating the original list of futures pointing to Pandas dataframes failed:
from distributed import Executor, s3
e = Executor('52.91.1.177:8786')
dfs = s3.read_csv('dask-data/nyc-taxi/2015',
parse_dates=['tpep_pickup_datetime',
'tpep_dropoff_datetime'],
collection=False)
dfs = e.compute(dfs)
progress(dfs)
throwing the error:
ImportError Traceback (most recent call last)
<ipython-input-3-25aea53688ef> in <module>()
----> 1 from distributed import s3
2
3 dfs = s3.read_csv('dask-data/nyc-taxi/2015',
4 parse_dates=['tpep_pickup_datetime',
5 'tpep_dropoff_datetime'],
ImportError: cannot import name 's3'
Mistakenly treating this as a dask dataframe rather than a list of futures pointing to Pandas dataframes, I tried:
import dask.dataframe as dd
dfs = dd.read_csv('s3://dask-data/nyc-taxi/2015/*.csv',
parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
storage_options={'anon': True})
dfs = c.persist(dfs)
progress(dfs)
which is what causes the indexing problem mentioned above. How should I modify this to read from the S3 bucket into a list of futures pointing to Pandas dataframes, as described in the blog post?
Upvotes: 4
Views: 1737
Reputation: 57271
I recommend reading the documentation, not blog posts. Old blog posts are likely to go stale quickly; the documentation is kept up to date.
In that blog post's code, dfs is a list of futures pointing to Pandas dataframes, not a dask dataframe:
train = dfs[:-1]
test = dfs[-1]
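With the current API, one way to rebuild that list of futures (a minimal sketch, assuming a connected distributed client named c as in your question) is to split the dask dataframe into per-partition delayed objects and compute those:
import dask.dataframe as dd

# assumes c = Client('scheduler-address:8786') is already connected
ddf = dd.read_csv('s3://dask-data/nyc-taxi/2015/*.csv',
                  parse_dates=['tpep_pickup_datetime',
                               'tpep_dropoff_datetime'],
                  storage_options={'anon': True})

# to_delayed() yields one delayed object per partition; c.compute
# turns each into a future pointing to a Pandas dataframe
dfs = c.compute(ddf.to_delayed())

train = dfs[:-1]  # dfs is now a plain Python list, so slicing works
test = dfs[-1]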
If you're looking to do a train/test split, then I recommend the random_split method.
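For example (a sketch, assuming ddf is a dask dataframe like the one built above):
# split rows 80/20 into two dask dataframes; random_state fixes the
# split for reproducibility
train, test = ddf.random_split([0.8, 0.2], random_state=42)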
Positional indexing is not supported, nor is it likely to be in the near-to-moderate future.
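If positional access within each partition is enough, one workaround (a sketch, not a true global iloc) is to apply Pandas iloc inside map_partitions:
# takes the first 10 rows of every partition, not of the whole frame
first_rows = ddf.map_partitions(lambda part: part.iloc[:10])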
Value-based indexing (.loc) is supported if you have a well-formed index and known divisions; see the divisions docs.
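For example (a sketch using the taxi data above): setting a datetime index sorts the data and records the partition boundaries as divisions, which is what makes label-based slicing cheap:
# index by pickup time; dask records partition boundaries as divisions
ddf2 = ddf.set_index('tpep_pickup_datetime')
january = ddf2.loc['2015-01-01':'2015-01-31']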
Upvotes: 2