Reputation: 1919
I want to create my own transformer for use with the sklearn Pipeline.
I am creating a class that implements both fit and transform methods. The purpose of the transformer will be to remove rows from the matrix that have more than a specified number of NaNs.
The issue I am facing is how can I change both the X and y matrices that are passed to the transformer?
I believe this has to be done in the fit method since it has access to both X and y. Since python passes arguments by assignment once I reassign X to a new matrix with fewer rows the reference to the original X is lost (and of course the same is true for y). Is it possible to maintain this reference?
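To make the rebinding problem concrete, here is a minimal sketch (the function names `rebind` and `mutate_in_place` are made up for illustration). It shows that assigning a filtered array to the parameter name leaves the caller's array untouched, while in-place writes do reach the caller but cannot drop rows via boolean masking, since that always produces a new array:

```python
import numpy as np

def rebind(X):
    # Boolean-mask indexing builds a new array; assigning it to the local
    # name X does not change the array the caller passed in.
    X = X[~np.isnan(X).any(axis=1)]
    return X

def mutate_in_place(X):
    # Element-wise writes modify the caller's array, but they keep its shape.
    X[np.isnan(X)] = 0.0

X = np.array([[1.0, 2.0], [np.nan, 4.0], [5.0, 6.0]])
filtered = rebind(X)
print(X.shape, filtered.shape)  # (3, 2) (2, 2)

mutate_in_place(X)
print(np.isnan(X).any())  # False
```

So the only way for the caller to see fewer rows is to receive the filtered arrays as return values, which is exactly what the standard fit/transform signatures do not provide for y.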
I’m using a pandas DataFrame to easily drop the rows that have too many NaNs, though this may not be the right approach for my use case. The current code looks like this:
class Dropna():
    # thresh is max number of NaNs allowed in a row
    def __init__(self, thresh=0):
        self.thresh = thresh

    def fit(self, X, y):
        total = X.shape[1]
        # +1 to account for 'y' being added to the dframe
        new_thresh = total + 1 - self.thresh
        df = pd.DataFrame(X)
        df['y'] = y
        df.dropna(thresh=new_thresh, inplace=True)
        X = df.drop('y', axis=1).values
        y = df['y'].values
        return self

    def transform(self, X):
        return X
Upvotes: 33
Views: 23415
Reputation: 22031
You have to modify the internal code of the sklearn Pipeline.
We define a transformer that, during fitting (fit_transform), removes samples where at least one feature or the target is NaN. During inference (transform), it removes samples where at least one feature is NaN. Note that our transformer returns both X and y from fit_transform, so we need to handle this behaviour in the sklearn Pipeline.
class Dropna():

    def fit(self, X, y):
        return self

    def fit_transform(self, X, y):
        mask = (np.isnan(X).any(-1) | np.isnan(y))
        if hasattr(X, 'loc'):
            X = X.loc[~mask]
        else:
            X = X[~mask]
        if hasattr(y, 'loc'):
            y = y.loc[~mask]
        else:
            y = y[~mask]
        return X, y  ###### make fit_transform return X and y

    def transform(self, X):
        mask = np.isnan(X).any(-1)
        if hasattr(X, 'loc'):
            X = X.loc[~mask]
        else:
            X = X[~mask]
        return X
We only have to modify the original sklearn Pipeline at two specific points, in the fit and _fit methods. The rest remains unchanged.
from sklearn import pipeline
from sklearn.base import clone
from sklearn.utils import _print_elapsed_time
from sklearn.utils.validation import check_memory

class Pipeline(pipeline.Pipeline):

    def _fit(self, X, y=None, **fit_params_steps):
        self.steps = list(self.steps)
        self._validate_steps()
        memory = check_memory(self.memory)
        fit_transform_one_cached = memory.cache(pipeline._fit_transform_one)

        for (step_idx, name, transformer) in self._iter(
            with_final=False, filter_passthrough=False
        ):
            if transformer is None or transformer == "passthrough":
                with _print_elapsed_time("Pipeline", self._log_message(step_idx)):
                    continue

            try:
                # joblib >= 0.12
                mem = memory.location
            except AttributeError:
                mem = memory.cachedir
            finally:
                cloned_transformer = clone(transformer) if mem else transformer

            X, fitted_transformer = fit_transform_one_cached(
                cloned_transformer,
                X,
                y,
                None,
                message_clsname="Pipeline",
                message=self._log_message(step_idx),
                **fit_params_steps[name],
            )
            if isinstance(X, tuple):  ###### unpack X if is tuple: X = (X,y)
                X, y = X
            self.steps[step_idx] = (name, fitted_transformer)
        return X, y

    def fit(self, X, y=None, **fit_params):
        fit_params_steps = self._check_fit_params(**fit_params)
        Xt = self._fit(X, y, **fit_params_steps)
        if isinstance(Xt, tuple):  ###### unpack X if is tuple: X = (X,y)
            Xt, y = Xt
        with _print_elapsed_time("Pipeline", self._log_message(len(self.steps) - 1)):
            if self._final_estimator != "passthrough":
                fit_params_last_step = fit_params_steps[self.steps[-1][0]]
                self._final_estimator.fit(Xt, y, **fit_params_last_step)
        return self
This is required in order to unpack the values generated by Dropna().fit_transform(X, y) into the new X and y.
Here is the full pipeline at work:
from sklearn.linear_model import Ridge
X = np.random.uniform(0,1, (100,3))
y = np.random.uniform(0,1, (100,))
X[np.random.uniform(0,1, (100)) < 0.1] = np.nan
y[np.random.uniform(0,1, (100)) < 0.1] = np.nan
pipe = Pipeline([('dropna', Dropna()), ('model', Ridge())])
pipe.fit(X, y)
pipe.predict(X).shape
Another trial with a further intermediate preprocessing step:
from sklearn.preprocessing import StandardScaler
pipe = Pipeline([('dropna', Dropna()), ('scaler', StandardScaler()), ('model', Ridge())])
pipe.fit(X, y)
pipe.predict(X).shape
More complex behaviors can be achieved with other simple modifications, as needed. If you are also interested in Pipeline().fit_transform or Pipeline().fit_predict, you need to make the same changes there.
Upvotes: 13
Reputation: 982
You can use FunctionTransformer:
import numpy as np
import pandas as pd
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer

df = pd.DataFrame([[1, 2, 3], [4, 5, 6], [np.nan, np.nan, 9], [7, np.nan, 9]])

def remove_na(df_, thresh=2):
    # keep rows that have at least `thresh` non-NaN values
    return df_.dropna(thresh=thresh)

pipe = make_pipeline(FunctionTransformer(func=remove_na,
                                         validate=False, kw_args={"thresh": 2}))
pipe.fit_transform(df)
Upvotes: 0
Reputation: 145
Adding to @João Matias's response:
Here's an example of using imblearn to define a pipeline step that drops rows with missing values:
from imblearn import FunctionSampler, pipeline

def drop_rows_with_any_nan(X, y):
    return X[~np.isnan(X).any(axis=1), :], y[~np.isnan(X).any(axis=1)]

drop_rows_with_any_nan_sampler = FunctionSampler(func=drop_rows_with_any_nan, validate=False)

model_clf2 = pipeline.Pipeline(
    [
        ('preprocess', column_transformer),
        ('drop_na', drop_rows_with_any_nan_sampler),
        ('smote', SMOTE()),
        ('xgb', xgboost.XGBClassifier()),
    ]
)
Note that you have to use the imblearn Pipeline here, not the sklearn one.
Upvotes: 2
Reputation: 156
The package imblearn, which is built on top of sklearn, contains an estimator FunctionSampler that allows manipulating both the features array, X, and target array, y, in a pipeline step.
Note that using it in a pipeline step requires using the Pipeline class in imblearn that inherits from the one in sklearn. Furthermore, by default, in the context of Pipeline, the method resample does nothing when it is not called immediately after fit (as in fit_resample). So, read the documentation ahead of time.
Upvotes: 8
Reputation: 984
@eickenberg's is the proper and clean answer. Nevertheless, I like to keep everything in one Pipeline, so if you are interested, I created a library (not yet deployed on PyPI) that allows applying transformations to y:
https://gitlab.com/thibaultB/transformers/
Usage is the following:
df = pd.DataFrame([[0, 1, 2], [3, 4, 5], [6, 7, 8]])
df.columns = ["a", "b", "target"]
spliter = SplitXY("target")  # Create a new step and give it the name of the target column
pipe = Pipeline([
    ("imputer", SklearnPandasWrapper(KNNImputer())),
    ("spliter", spliter),
    ("scaler", StandardScaler()),
    # EstimatorWithoutYWrapper overrides RandomForestRegressor to get y from spliter just before calling fit or transform
    ("rf", EstimatorWithoutYWrapper(RandomForestRegressor(random_state=45), spliter)),
])
pipe.fit(df)
res = pipe.predict(df)
Using this code, you can alter the number of rows, provided that every transformer that modifies the number of rows is placed before the "SplitXY" transformer. Transformers before the SplitXY transformer should keep the column names, which is why I also added a SklearnPandasWrapper that wraps an sklearn transformer (which usually returns a numpy array) to keep the column names.
Upvotes: 0
Reputation: 547
You can solve this easily by using sklearn.preprocessing.FunctionTransformer (http://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer.html).
You just need to put your alterations to X in a function:
def drop_nans(X, y=None):
    # thresh (max number of NaNs allowed in a row) is read from the enclosing scope
    total = X.shape[1]
    new_thresh = total - thresh
    df = pd.DataFrame(X)
    df.dropna(thresh=new_thresh, inplace=True)
    return df.values
then you get your transformer by calling
transformer = FunctionTransformer(drop_nans, validate=False)
which you can use in the pipeline. The threshold can be set outside the drop_nans function.
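One thing worth checking before relying on this: as far as I can tell, FunctionTransformer only passes X to the function, so y keeps its original length. The sketch below passes the threshold through `kw_args` instead of a global (my variation, not part of the answer above) and compares the resulting shapes on toy data:

```python
import numpy as np
import pandas as pd
from sklearn.preprocessing import FunctionTransformer

def drop_nans(X, thresh=0):
    # thresh is the maximum number of NaNs allowed in a row
    df = pd.DataFrame(X)
    # DataFrame.dropna(thresh=N) keeps rows with at least N non-NaN values
    return df.dropna(thresh=df.shape[1] - thresh).values

X = np.array([[1.0, 2.0, 3.0],
              [np.nan, np.nan, 9.0],
              [7.0, np.nan, 9.0]])
y = np.array([0.0, 1.0, 2.0])

transformer = FunctionTransformer(drop_nans, validate=False, kw_args={"thresh": 1})
Xt = transformer.fit_transform(X, y)
print(Xt.shape, y.shape)  # (2, 3) (3,)
```

If the shapes differ like this, a supervised estimator placed after the step in a standard sklearn Pipeline would receive an X and y of different lengths, so this approach seems best suited to cases where y is not needed downstream.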
Upvotes: 1
Reputation: 14377
Modifying the sample axis, e.g. removing samples, does not (yet?) comply with the scikit-learn transformer API. So if you need to do this, you should do it outside any calls to scikit-learn, as preprocessing.
As it is now, the transformer API is used to transform the features of a given sample into something new. This can implicitly contain information from other samples, but samples are never deleted.
Another option is to attempt to impute the missing values. But again, if you need to delete samples, treat it as preprocessing before using scikit-learn.
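A minimal sketch of what "treat it as preprocessing" could look like, assuming NumPy inputs. The helper `drop_sparse_rows` and its `max_nans` parameter are hypothetical names for illustration, not part of scikit-learn:

```python
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

def drop_sparse_rows(X, y, max_nans=0):
    """Hypothetical helper: keep rows with at most max_nans NaNs in X and a non-NaN y."""
    X = np.asarray(X, dtype=float)
    y = np.asarray(y, dtype=float)
    keep = (np.isnan(X).sum(axis=1) <= max_nans) & ~np.isnan(y)
    return X[keep], y[keep]

rng = np.random.default_rng(0)
X = rng.uniform(size=(100, 3))
y = rng.uniform(size=100)
X[::10, 0] = np.nan   # rows 0, 10, ..., 90 get a NaN feature
y[5::20] = np.nan     # rows 5, 25, 45, 65, 85 get a NaN target

# Filter X and y together first, then fit an ordinary pipeline on the result.
X_clean, y_clean = drop_sparse_rows(X, y, max_nans=0)
pipe = make_pipeline(StandardScaler(), Ridge())
pipe.fit(X_clean, y_clean)
print(X_clean.shape, y_clean.shape)  # (85, 3) (85,)
```

With `max_nans` greater than 0 the remaining rows can still contain NaNs, so the pipeline would then need an imputation step before the estimator.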
Upvotes: 17
Reputation: 1
Use deep copies further down the pipeline, so that X and y remain protected.
.fit() can first, on each call, assign deep copies to new instance attributes:
self.X_without_NaNs = X.copy()
self.y_without_NaNs = y.copy()
and then reduce / transform these so that they have no more NaNs than allowed by self.thresh.
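A rough sketch of how I read this suggestion, assuming NumPy inputs (the class name `DropnaCopies` is made up). Note that it only stores the filtered copies on the transformer; the pipeline itself would still pass the unfiltered X and y to later steps:

```python
import numpy as np

class DropnaCopies:
    """Sketch: keep filtered copies on the instance instead of rebinding X and y."""

    def __init__(self, thresh=0):
        self.thresh = thresh  # maximum number of NaNs allowed in a row

    def fit(self, X, y):
        X = np.asarray(X, dtype=float)
        y = np.asarray(y)
        keep = np.isnan(X).sum(axis=1) <= self.thresh
        # Boolean indexing already returns new arrays; .copy() makes the intent explicit.
        self.X_without_NaNs = X[keep].copy()
        self.y_without_NaNs = y[keep].copy()
        return self

    def transform(self, X):
        # The caller still gets the unfiltered X back here.
        return X

X = np.array([[1.0, np.nan], [2.0, 3.0], [np.nan, np.nan]])
y = np.array([10, 20, 30])
step = DropnaCopies(thresh=1).fit(X, y)
print(step.X_without_NaNs.shape, step.y_without_NaNs)  # (2, 2) [10 20]
print(X.shape)  # (3, 2)
```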
Upvotes: -1