shellcat_zero

Reputation: 1147

How to perform positional indexing in Python Dask dataframes

I've been working through the Dask concurrent.futures documentation and am having trouble with the (outdated) Random Forest example, specifically its use of positional indexing to slice the Dask dataframe into train/test splits:

train = dfs[:-1]
test = dfs[-1]

I've also tried the following, to no avail:

train = dfs.loc[:-1]
test = dfs.loc[-1]

which gives me the error:

KeyError                                  Traceback (most recent call last)
/opt/anaconda/lib/python3.5/site-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance)
   2524             try:
-> 2525                 return self._engine.get_loc(key)
   2526             except KeyError:

pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()

pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()

pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()

pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()

KeyError: 0

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
<ipython-input-21-fff88783d91d> in <module>()
      7 test = dfs.loc[-1]
      8 
----> 9 estimators = c.map(fit, train)
     10 progress(estimators, complete=False)

/opt/anaconda/lib/python3.5/site-packages/distributed/client.py in map(self, func, *iterables, **kwargs)
   1243             raise ValueError("Only use allow_other_workers= if using workers=")
   1244 
-> 1245         iterables = list(zip(*zip(*iterables)))
   1246         if isinstance(key, list):
   1247             keys = key

/opt/anaconda/lib/python3.5/site-packages/dask/dataframe/core.py in __getitem__(self, key)
   2284 
   2285             # error is raised from pandas
-> 2286             meta = self._meta[_extract_meta(key)]
   2287             dsk = dict(((name, i), (operator.getitem, (self._name, i), key))
   2288                        for i in range(self.npartitions))

/opt/anaconda/lib/python3.5/site-packages/pandas/core/frame.py in __getitem__(self, key)
   2137             return self._getitem_multilevel(key)
   2138         else:
-> 2139             return self._getitem_column(key)
   2140 
   2141     def _getitem_column(self, key):

/opt/anaconda/lib/python3.5/site-packages/pandas/core/frame.py in _getitem_column(self, key)
   2144         # get column
   2145         if self.columns.is_unique:
-> 2146             return self._get_item_cache(key)
   2147 
   2148         # duplicate columns & possible reduce dimensionality

/opt/anaconda/lib/python3.5/site-packages/pandas/core/generic.py in _get_item_cache(self, item)
   1840         res = cache.get(item)
   1841         if res is None:
-> 1842             values = self._data.get(item)
   1843             res = self._box_item_values(item, values)
   1844             cache[item] = res

/opt/anaconda/lib/python3.5/site-packages/pandas/core/internals.py in get(self, item, fastpath)
   3841 
   3842             if not isna(item):
-> 3843                 loc = self.items.get_loc(item)
   3844             else:
   3845                 indexer = np.arange(len(self.items))[isna(self.items)]

/opt/anaconda/lib/python3.5/site-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance)
   2525                 return self._engine.get_loc(key)
   2526             except KeyError:
-> 2527                 return self._engine.get_loc(self._maybe_cast_indexer(key))
   2528 
   2529         indexer = self.get_indexer([key], method=method, tolerance=tolerance)

pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()

pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()

pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()

pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()

KeyError: 0

What is the proper way to use positional indexing in Dask, and what would be the correct way to slice the dataframe into test/train splits in the Random Forest example?

Similar unanswered question: What is the equivalent to iloc for dask dataframe?

EDIT: Creating the original list of futures pointing to Pandas dataframes failed:

from distributed import Executor, s3
e = Executor('52.91.1.177:8786')

dfs = s3.read_csv('dask-data/nyc-taxi/2015',
                  parse_dates=['tpep_pickup_datetime',
                               'tpep_dropoff_datetime'],
                  collection=False)
dfs = e.compute(dfs)
progress(dfs)

throwing the error:

ImportError                               Traceback (most recent call last)
<ipython-input-3-25aea53688ef> in <module>()
----> 1 from distributed import s3
      2 
      3 dfs = s3.read_csv('dask-data/nyc-taxi/2015', 
      4                   parse_dates=['tpep_pickup_datetime', 
      5                                'tpep_dropoff_datetime'],

ImportError: cannot import name 's3'

Mistakenly assuming that dfs was a Dask dataframe rather than a list of futures pointing to Pandas dataframes, I tried:

import dask.dataframe as dd

dfs = dd.read_csv('s3://dask-data/nyc-taxi/2015/*.csv', 
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
                 storage_options={'anon': True})
dfs = c.persist(dfs)
progress(dfs)

which is what is now causing the indexing problem I mentioned previously. How should I modify this to read from the S3 bucket to a list of futures pointing to Pandas dataframes as described in the blog post?

Upvotes: 4

Views: 1737

Answers (1)

MRocklin

Reputation: 57271

I recommend reading the documentation rather than blog posts. Old blog posts go stale quickly, while the documentation is kept up to date.

In the blog post's code, dfs is a list of futures pointing to dataframes, not a Dask dataframe, so the following is ordinary Python list slicing:

train = dfs[:-1]
test = dfs[-1]

If you're looking to do a train-test-split then I recommend the random_split method.

Positional indexing is not supported on Dask dataframes, nor is it likely to be in the near-to-moderate future.

Value-based indexing (.loc) is supported if you have a well-formed index and known divisions; see the divisions docs.

Upvotes: 2
