Reputation: 451
I'm trying to set up a scikit-learn pipeline to simplify my work. The problem is that I don't know which algorithm (random forest, naive Bayes, decision tree, etc.) fits best, so I need to try each of them and compare the results. However, does a pipeline only take one algorithm at a time? For example, the pipeline below only takes SGDClassifier() as the algorithm.
pipeline = Pipeline([
    ('vect', CountVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier()),
])
What should I do if I want to compare different algorithms? Can I do something like this?
pipeline = Pipeline([
    ('vect', CountVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier()),
    ('classifier', MultinomialNB()),
])
I don't want to break it down into two pipelines because preprocessing the data is very time-consuming.
Thanks in advance!
Upvotes: 9
Views: 10093
Reputation: 813
Just starting out on my Python journey. The ideas below are not my own. All the credit goes to David S. Batista (https://www.davidsbatista.net/blog/2018/02/23/model_optimization/), who modified code by Panagiotis Katsaroumpas and shared it.
What I have done is modify David's code a bit by adding a user-defined score and a preprocessing step that includes data imputation and scaling prior to model estimation. So here goes:
# import the libraries
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
Imputer = IterativeImputer(max_iter=10, random_state=15) # I used a custom wrapper
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import IsolationForest
from inne import IsolationNNE
## create the scoring for the models and save it to file
## in current working directory as 'scorers.py'
# def scorer_decision(estimator, X):
# return np.nanmean(estimator.decision_function(X))
## import decision function score saved as .py file
## into working directory of project
from scorers import scorer_decision
class EstimatorSelectionHelper:
    def __init__(self, models, params):
        if not set(models.keys()).issubset(set(params.keys())):
            missing_params = list(set(models.keys()) - set(params.keys()))
            raise ValueError("Some estimators are missing parameters: {}".format(missing_params))
        self.models = models
        self.params = params
        self.keys = models.keys()
        self.grid_searches = {}

    def fit(self, X, y=None, cv=5, n_jobs=3, verbose=1, scoring=scorer_decision, refit=True):
        for key in self.keys:
            print("Running GridSearchCV for %s." % key)
            model = self.models[key]
            params = self.params[key]
            gs = GridSearchCV(estimator=model,
                              param_grid=params,
                              cv=cv,
                              n_jobs=n_jobs,
                              verbose=verbose,
                              scoring=scoring,
                              refit=refit,
                              return_train_score=True)
            gs.fit(X, y)  # pass y through; it stays None for unsupervised models
            self.grid_searches[key] = gs
        return self

    def score_summary(self, sort_by='mean_score'):
        def row(key, scores, params):
            d = {
                'estimator': key,
                'min_score': min(scores),
                'max_score': max(scores),
                'mean_score': np.mean(scores),
                'std_score': np.std(scores),
            }
            return pd.Series({**params, **d})

        rows = []
        for k in self.grid_searches:
            print(k)
            params = self.grid_searches[k].cv_results_['params']
            scores = []
            # collect the per-split test scores (assumes cv is an integer)
            for i in range(self.grid_searches[k].cv):
                key = "split{}_test_score".format(i)
                r = self.grid_searches[k].cv_results_[key]
                scores.append(r.reshape(len(params), 1))

            all_scores = np.hstack(scores)
            for p, s in zip(params, all_scores):
                rows.append((row(k, s, p)))

        df = pd.concat(rows, axis=1).T.sort_values([sort_by], ascending=False)
        columns = ['estimator', 'min_score', 'mean_score', 'max_score', 'std_score']
        columns = columns + [c for c in df.columns if c not in columns]
        return df[columns]
# list of numeric features to impute
numeric_columns = list(Xdata.select_dtypes(include = 'number').columns)
# pipeline for processing numerical features
numeric_transformer = Pipeline([
    ('imputer', Imputer),  # the IterativeImputer instance created above; do not call it
    ('scaler', StandardScaler())
])
# column transformer
column_transformer = ColumnTransformer([
    ('numeric_pipeline', numeric_transformer, numeric_columns)
])
# grid search parameters for models
num_estimators = np.linspace(100, 200, num = 5, endpoint = True).astype(int)
max_samples = np.linspace(0.70, 1.00, num = 5)
contamination = np.linspace(0.04, 0.10, num = 5, endpoint = True)
max_features = np.arange(start = 1, stop = Xdata.shape[1]+1, step = 1)
# estimators to use
models1 = {
    'iforest': IsolationForest(n_jobs = -1, random_state = 3),
    'iNNE': IsolationNNE(random_state = 3)
}
# parameters
params1 = {
    # isolation forest grid parameters
    'iforest': {
        'n_estimators': num_estimators,
        'max_samples': max_samples,
        'contamination': contamination,
        'max_features': max_features,
        'bootstrap': [False]
    },
    # inne grid parameters
    'iNNE': {
        'n_estimators': num_estimators,
        'max_samples': max_samples,
        'contamination': contamination
    }
}
## run the models
# create EstimatorSelectionHelper by passing models and parameters
estimators = EstimatorSelectionHelper(models1, params1)
# create pipeline
pipe = Pipeline([
    ('ct', column_transformer),
    ('models', estimators)
])
pipe.fit(Xdata)
## get summary output
output = pipe.named_steps.models.score_summary(sort_by = 'max_score')
output.head()
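For readers who want to see the user-defined score in isolation, here is a minimal sketch of the same idea on synthetic data. It assumes only scikit-learn and NumPy; the data `X_demo`, the small parameter grid, and the extra `y=None` argument on the scorer are my own choices for illustration, not part of the code above.

```python
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import GridSearchCV


def scorer_decision(estimator, X, y=None):
    """Mean decision_function value on the held-out fold.

    For IsolationForest, higher values mean the points look more 'normal'.
    """
    return np.nanmean(estimator.decision_function(X))


# Synthetic data, invented for this sketch.
rng = np.random.RandomState(0)
X_demo = rng.normal(size=(200, 3))

search = GridSearchCV(
    IsolationForest(random_state=3),
    param_grid={"n_estimators": [50, 100], "max_samples": [0.7, 1.0]},
    scoring=scorer_decision,  # a callable with signature (estimator, X, y)
    cv=3,
)
search.fit(X_demo)  # no labels: this is an unsupervised search
print(search.best_params_)
```

Whether the mean decision value is a sensible model-selection criterion depends on your data, so treat it as one possible heuristic.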
Upvotes: 0
Reputation: 4046
Improving on Bruno's answer, what most people really want to do is be able to pass in ANY classifier (not have to hard-code each one) and also any parameters for each classifier. Here is an easy way to do this:
from sklearn.base import BaseEstimator
from sklearn.linear_model import SGDClassifier  # needed for the default argument below

class ClfSwitcher(BaseEstimator):
    def __init__(
        self,
        estimator = SGDClassifier(),
    ):
        """
        A Custom BaseEstimator that can switch between classifiers.
        :param estimator: sklearn object - The classifier
        """
        self.estimator = estimator

    def fit(self, X, y=None, **kwargs):
        self.estimator.fit(X, y)
        return self

    def predict(self, X, y=None):
        return self.estimator.predict(X)

    def predict_proba(self, X):
        return self.estimator.predict_proba(X)

    def score(self, X, y):
        return self.estimator.score(X, y)
Now you can pass in anything for the estimator parameter. And you can optimize any parameter for any estimator you pass in as follows:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
pipeline = Pipeline([
    ('tfidf', TfidfVectorizer()),
    ('clf', ClfSwitcher()),
])
parameters = [
    {
        'clf__estimator': [SGDClassifier()], # SVM if hinge loss / logreg if log loss
        'tfidf__max_df': (0.25, 0.5, 0.75, 1.0),
        'tfidf__stop_words': ['english', None],
        'clf__estimator__penalty': ('l2', 'elasticnet', 'l1'),
        'clf__estimator__max_iter': [50, 80],
        'clf__estimator__tol': [1e-4],
        # 'log' was renamed to 'log_loss' in scikit-learn 1.1; use 'log' on older versions
        'clf__estimator__loss': ['hinge', 'log_loss', 'modified_huber'],
    },
    {
        'clf__estimator': [MultinomialNB()],
        'tfidf__max_df': (0.25, 0.5, 0.75, 1.0),
        'tfidf__stop_words': [None],
        'clf__estimator__alpha': (1e-2, 1e-3, 1e-1),
    },
]
gscv = GridSearchCV(pipeline, parameters, cv=5, n_jobs=12, return_train_score=False, verbose=3)
gscv.fit(train_data, train_labels)
clf__estimator__loss is interpreted as the loss parameter for whatever estimator is, where estimator = SGDClassifier() in the topmost example and is itself a parameter of clf, which is a ClfSwitcher object.
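The double-underscore naming can be checked without running a grid search, since GridSearchCV applies each candidate through set_params. Below is a small sketch; `DemoSwitcher` is a hypothetical stand-in wrapper defined only so the snippet is self-contained, and the alpha value is arbitrary.

```python
from sklearn.base import BaseEstimator
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline


class DemoSwitcher(BaseEstimator):
    """Hypothetical minimal wrapper, used only for this illustration."""

    def __init__(self, estimator=None):
        self.estimator = estimator

    def fit(self, X, y=None):
        self.estimator.fit(X, y)
        return self

    def predict(self, X):
        return self.estimator.predict(X)


demo_pipeline = Pipeline([
    ("tfidf", TfidfVectorizer()),
    ("clf", DemoSwitcher(estimator=SGDClassifier())),
])

# 'clf' selects the pipeline step, 'estimator' the wrapper's attribute,
# and 'alpha' the parameter on the wrapped classifier.
demo_pipeline.set_params(
    clf__estimator=MultinomialNB(),
    clf__estimator__alpha=0.1,
)

wrapped = demo_pipeline.named_steps["clf"].estimator
print(type(wrapped).__name__, wrapped.alpha)
```

The wrapped classifier is swapped first and the nested alpha is then set on the new classifier, which is why each dictionary in the grid can carry parameters that only make sense for its own estimator.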
Upvotes: 18
Reputation: 2131
You say that preprocessing the data is very slow, so I assume that you consider the TF-IDF Vectorization part of your preprocessing.
You could preprocess just once.
X = <your original data>
from sklearn.feature_extraction.text import TfidfVectorizer
X = TfidfVectorizer().fit_transform(X)
Once you have your new transformed data, you can continue using it and choose the best classifier.
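As a rough sketch of that workflow, the snippet below vectorizes a toy corpus once and reuses the same matrix for two classifiers. The documents, labels, and `cv=2` setting are made up for illustration only.

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import cross_val_score
from sklearn.naive_bayes import MultinomialNB

# Toy corpus and labels, invented purely for illustration.
docs = [
    "cheap pills buy now", "win money fast",
    "free prize claim now", "buy cheap watches",
    "meeting at noon tomorrow", "project report attached",
    "lunch with the team", "see you at the meeting",
]
labels = [1, 1, 1, 1, 0, 0, 0, 0]

# Vectorize once, then reuse the same sparse matrix for every classifier.
X_tfidf = TfidfVectorizer().fit_transform(docs)

candidates = {
    "SGDClassifier": SGDClassifier(random_state=0),
    "MultinomialNB": MultinomialNB(),
}
mean_scores = {
    name: cross_val_score(clf, X_tfidf, labels, cv=2).mean()
    for name, clf in candidates.items()
}
for name, score in mean_scores.items():
    print(f"{name}: mean CV accuracy = {score:.2f}")
```

Note that fitting the vectorizer on all documents before cross-validation lets vocabulary and IDF statistics from the test folds leak into training, so the scores can be somewhat optimistic.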
While you could transform your data with TfidfVectorizer just once, I would not recommend it, because the TfidfVectorizer has hyper-parameters itself, which can also be optimized. In the end, you want to optimize the whole Pipeline together, because the parameters for the TfidfVectorizer in a Pipeline [TfidfVectorizer, SGDClassifier] can be different than for a Pipeline [TfidfVectorizer, MultinomialNB].
To give an answer to what you asked exactly, you could make your own estimator that has the choice of model as a hyper-parameter.
from sklearn.base import BaseEstimator
from sklearn.linear_model import SGDClassifier
from sklearn.naive_bayes import MultinomialNB

class MyClassifier(BaseEstimator):
    def __init__(self, classifier_type: str = 'SGDClassifier'):
        """
        A custom BaseEstimator that can switch between classifiers.
        :param classifier_type: string - The switch for different classifiers
        """
        self.classifier_type = classifier_type

    def fit(self, X, y=None):
        if self.classifier_type == 'SGDClassifier':
            self.classifier_ = SGDClassifier()
        elif self.classifier_type == 'MultinomialNB':
            self.classifier_ = MultinomialNB()
        else:
            raise ValueError('Unknown classifier type.')
        self.classifier_.fit(X, y)
        return self

    def predict(self, X, y=None):
        return self.classifier_.predict(X)

    def score(self, X, y):
        # lets GridSearchCV score the model when no `scoring` argument is given
        return self.classifier_.score(X, y)
You can then use this custom classifier in your Pipeline.
pipeline = Pipeline([
    ('tfidf', TfidfVectorizer()),
    ('clf', MyClassifier())
])
You can then use GridSearchCV to choose the best model. When you create a parameter space, you can use a double underscore to specify the hyper-parameter of a step in your pipeline.
parameter_space = {
    'clf__classifier_type': ['SGDClassifier', 'MultinomialNB']
}
from sklearn.model_selection import GridSearchCV
search = GridSearchCV(pipeline , parameter_space, n_jobs=-1, cv=5)
search.fit(X, y)
print('Best model:\n', search.best_params_)
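A variation worth knowing, and not what the code above does: GridSearchCV can also replace a pipeline step directly when the step name appears as a key in the grid, so no wrapper class is needed. The sketch below uses a made-up toy corpus and arbitrary alpha values, and lists each candidate's mean score from cv_results_ so the classifiers can be compared side by side.

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline

# Toy corpus and labels, invented purely for illustration.
docs = [
    "cheap pills buy now", "win money fast",
    "free prize claim now", "buy cheap watches",
    "meeting at noon tomorrow", "project report attached",
    "lunch with the team", "see you at the meeting",
]
labels = [1, 1, 1, 1, 0, 0, 0, 0]

text_pipeline = Pipeline([
    ("tfidf", TfidfVectorizer()),
    ("clf", SGDClassifier()),  # placeholder; the grid below replaces this step
])

# One dictionary per classifier, each with its own hyper-parameters.
param_grid = [
    {"clf": [SGDClassifier(random_state=0)], "clf__alpha": [1e-4, 1e-3]},
    {"clf": [MultinomialNB()], "clf__alpha": [0.1, 1.0]},
]

grid = GridSearchCV(text_pipeline, param_grid, cv=2)
grid.fit(docs, labels)

results = grid.cv_results_
for params, score in zip(results["params"], results["mean_test_score"]):
    print(type(params["clf"]).__name__, params["clf__alpha"], round(score, 3))
print("Best classifier:", type(grid.best_estimator_.named_steps["clf"]).__name__)
```

Because the vectorizer stays inside the pipeline, it is refit on each training fold, which costs more time than vectorizing once but keeps the cross-validation scores honest.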
Upvotes: 8