Riley Hun

Reputation: 2785

ScikitLearn: FeatureUnion Returning Pandas DataFrame But Also Parallelized

I am building a feature transformation pipeline for the first time. I noticed that combining my pipelines with FeatureUnion returns a NumPy array, but I would prefer a pandas DataFrame so that the feature names stay visible.

I have created a class that returns a pandas DataFrame for the features, and it works fine, but this custom FeatureUnion class is about twice as slow as the built-in sklearn FeatureUnion class. How do I parallelize it so that its run time is on par? Any help would be greatly appreciated!

To introduce multiprocessing, I tried this, but I'm getting an error.

from functools import reduce
from multiprocessing import cpu_count

import pandas as pd
from joblib import Parallel, delayed  # the traceback shows the bundled sklearn.externals.joblib
from sklearn.base import BaseEstimator, TransformerMixin


class PandasFeatureUnion(BaseEstimator, TransformerMixin):

    def __init__(self, transformer_list, n_jobs=cpu_count()):
        self.transformer_list = transformer_list
        self.n_jobs = min(n_jobs, len(transformer_list))

    def one_fit(self, transformer, X, y):
        return transformer.fit(X, y)

    def one_transform(self, transformer, X):
        return transformer.transform(X)

    def fit(self, X, y=None):
        Parallel(n_jobs=self.n_jobs)(
            delayed(self.one_fit)(trans, X, y)
            for _, trans in self.transformer_list)
        return self

    def transform(self, X):
        Xts = Parallel(n_jobs=self.n_jobs)(
            delayed(self.one_transform)(trans, X)
            for name, trans in self.transformer_list)

        # Join the per-transformer DataFrames on their shared index.
        Xunion = reduce(lambda X1, X2:
                        pd.merge(X1, X2,
                                 left_index=True, right_index=True), Xts)

        return Xunion

The error I'm getting is as follows:

sklearn.externals.joblib.externals.loky.process_executor._RemoteTraceback:
"""
Traceback (most recent call last):
  File "pandas/_libs/index.pyx", line 132, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 382, in pandas._libs.hashtable.Float64HashTable.get_item
TypeError: must be real number, not NoneType

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/pandas/core/indexes/base.py", line 2657, in get_loc
    return self._engine.get_loc(key)
  File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 134, in pandas._libs.index.IndexEngine.get_loc
KeyError: None

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "pandas/_libs/index.pyx", line 132, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 382, in pandas._libs.hashtable.Float64HashTable.get_item
TypeError: must be real number, not NoneType

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/sklearn/externals/joblib/externals/loky/process_executor.py", line 418, in _process_worker
    r = call_item()
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/sklearn/externals/joblib/externals/loky/process_executor.py", line 272, in __call__
    return self.fn(*self.args, **self.kwargs)
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/sklearn/externals/joblib/_parallel_backends.py", line 567, in __call__
    return self.func(*args, **kwargs)
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/sklearn/externals/joblib/parallel.py", line 225, in __call__
    for func, args, kwargs in self.items]
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/sklearn/externals/joblib/parallel.py", line 225, in <listcomp>
    for func, args, kwargs in self.items]
  File "/Users/rihun/PycharmProjects/cross_sell/code/cross_sell_features.py", line 386, in one_transform
    def one_transform(self, transformer, X):
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/sklearn/pipeline.py", line 451, in _transform
    Xt = transform.transform(Xt)
  File "/Users/rihun/PycharmProjects/cross_sell/code/cross_sell_features.py", line 126, in transform
    X = X.copy()
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/pandas/core/indexing.py", line 190, in __setitem__
    self._setitem_with_indexer(indexer, value)
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/pandas/core/indexing.py", line 380, in _setitem_with_indexer
    nindexer.append(labels.get_loc(key))
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/pandas/core/indexes/numeric.py", line 436, in get_loc
    tolerance=tolerance)
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/pandas/core/indexes/base.py", line 2659, in get_loc
    return self._engine.get_loc(self._maybe_cast_indexer(key))
  File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 134, in pandas._libs.index.IndexEngine.get_loc
KeyError: None
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/rihun/PycharmProjects/cross_sell/code/savm_features_pipeline.py", line 75, in <module>
    data_tr = full_pipeline.fit_transform(data)
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/sklearn/pipeline.py", line 300, in fit_transform
    return last_step.fit_transform(Xt, y, **fit_params)
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/sklearn/base.py", line 464, in fit_transform
    return self.fit(X, **fit_params).transform(X)
  File "/Users/rihun/PycharmProjects/cross_sell/code/cross_sell_features.py", line 397, in transform
    for _, trans in self.transformer_list)
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/sklearn/externals/joblib/parallel.py", line 930, in __call__
    self.retrieve()
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/sklearn/externals/joblib/parallel.py", line 833, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/site-packages/sklearn/externals/joblib/_parallel_backends.py", line 521, in wrap_future_result
    return future.result(timeout=timeout)
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/Users/rihun/anaconda3/envs/CrossSell/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
KeyError: None

Upvotes: 0

Views: 396

Answers (1)

Pierluigi

Reputation: 1118

I rebuilt your code in a Colab notebook, and it seems to work for me. You may run into issues with multiple transformers because of each transformer's internal state, and because they should be applied in sequence to guarantee data consistency. It also looks like you are, to some extent, replicating sklearn.pipeline.Pipeline.
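One possible reading of the point about internal state, offered as a sketch rather than a confirmed diagnosis: with a process-based joblib backend, each transformer is pickled and fitted inside a worker, so the objects held by the parent stay unfitted unless the fitted copies returned by the workers are stored. The `KeyError: None` in the traceback would be consistent with a fitted attribute that is still unset, but without the asker's transformers that is an assumption. The `Demean` transformer, the `transformer_list_` attribute, and the toy data below are hypothetical and exist only for illustration. `pd.concat(axis=1)` is swapped in for the `reduce`/`pd.merge` chain.

```python
import pandas as pd
from joblib import Parallel, delayed
from sklearn.base import BaseEstimator, TransformerMixin


class Demean(TransformerMixin, BaseEstimator):
    """Hypothetical stateful transformer: subtracts column means learned in fit."""

    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        self.means_ = X[self.columns].mean()
        return self

    def transform(self, X):
        return X[self.columns] - self.means_


def _fit_one(transformer, X, y):
    # May run in a worker process; the fitted object is sent back to the parent.
    return transformer.fit(X, y)


def _transform_one(transformer, X):
    return transformer.transform(X)


class PandasFeatureUnion(TransformerMixin, BaseEstimator):
    def __init__(self, transformer_list, n_jobs=None):
        self.transformer_list = transformer_list
        self.n_jobs = n_jobs

    def fit(self, X, y=None):
        fitted = Parallel(n_jobs=self.n_jobs)(
            delayed(_fit_one)(trans, X, y) for _, trans in self.transformer_list
        )
        # Store the fitted copies; the originals may never have been mutated.
        self.transformer_list_ = [
            (name, trans)
            for (name, _), trans in zip(self.transformer_list, fitted)
        ]
        return self

    def transform(self, X):
        frames = Parallel(n_jobs=self.n_jobs)(
            delayed(_transform_one)(trans, X) for _, trans in self.transformer_list_
        )
        # Column-wise concatenation keeps the index and the column names.
        return pd.concat(frames, axis=1)


data = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [10.0, 20.0, 30.0]})
union = PandasFeatureUnion([("a", Demean(["a"])), ("b", Demean(["b"]))], n_jobs=2)
result = union.fit(data).transform(data)
print(result)
```

Leaving `n_jobs` untouched in `__init__` and keeping the helpers at module level also makes the estimator easier to clone and pickle. Whether this closes the speed gap depends on how expensive each transformer is compared with the cost of shipping the data to the workers.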

It would help if you could provide a reproducible example of the bug.

You may also want to take a look at Pipesnake. I do not like to self-reference, but you may find some inspiration in its code.

Upvotes: 1
