Reputation: 6869
I have a large set of sklearn pipelines that I'd like to build in parallel with Dask. Here's a simple but naive sequential approach:
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X_train, X_test, Y_train, Y_test = train_test_split(iris.data, iris.target, test_size=0.2)
pipe_nb = Pipeline([('clf', MultinomialNB())])
pipe_lr = Pipeline([('clf', LogisticRegression())])
pipe_rf = Pipeline([('clf', RandomForestClassifier())])
pipelines = [pipe_nb, pipe_lr, pipe_rf] # In reality, this would include many more different types of models with varying but specific parameters
for pl in pipelines:
    pl.fit(X_train, Y_train)
Note that this is not a GridSearchCV or RandomizedSearchCV problem.
In the case of RandomizedSearchCV, I know how to parallelize it with Dask:
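import joblib
import scipy.stats
from dask.distributed import Client
from sklearn.model_selection import RandomizedSearchCV
dask_client = Client('tcp://some.host.com:8786')  # connect to the Dask scheduler
clf_rf = RandomForestClassifier()
param_dist = {'n_estimators': scipy.stats.randint(100, 500)}
search_rf = RandomizedSearchCV(
    clf_rf,
    param_distributions=param_dist,
    n_iter=100,
    scoring='f1',  # 'f1' assumes a binary target; multiclass data such as iris needs e.g. 'f1_macro'
    cv=10,
    error_score=0,
    verbose=3,
)
# Route scikit-learn's internal joblib parallelism through the Dask cluster
with joblib.parallel_backend('dask'):
    search_rf.fit(X_train, Y_train)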
However, I'm not interested in hyperparameter tuning and it isn't clear how to modify this code in order to fit a set of multiple different models with their own specific parameters in parallel with Dask.
Upvotes: 6
Views: 2798
Reputation: 28946
dask.delayed is probably the easiest solution here.
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X_train, X_test, Y_train, Y_test = train_test_split(iris.data, iris.target, test_size=0.2)
pipe_nb = Pipeline([('clf', MultinomialNB())])
pipe_lr = Pipeline([('clf', LogisticRegression())])
pipe_rf = Pipeline([('clf', RandomForestClassifier())])
pipelines = [pipe_nb, pipe_lr, pipe_rf] # In reality, this would include many more different types of models with varying but specific parameters
# Use dask.delayed instead of a for loop.
import dask.delayed
pipelines_ = [dask.delayed(pl).fit(X_train, Y_train) for pl in pipelines]
fit_pipelines = dask.compute(*pipelines_)
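Since fit returns the estimator itself, each element of the computed tuple should be a fitted pipeline that can be scored or used for prediction as usual. Here is a minimal sketch of that follow-up step, assuming the default local scheduler; the dictionary keys, max_iter=1000, and random_state=0 are illustrative choices of mine, not part of the original code. If a dask.distributed Client has been created beforehand, dask.compute runs on that cluster instead.

```python
import dask
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline

iris = load_iris()
X_train, X_test, Y_train, Y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=0
)

# Hypothetical names; any estimators with their own fixed parameters would do.
pipelines = {
    "nb": Pipeline([("clf", MultinomialNB())]),
    "lr": Pipeline([("clf", LogisticRegression(max_iter=1000))]),
}

# Build lazy fit tasks; nothing runs until dask.compute is called.
tasks = [dask.delayed(pl).fit(X_train, Y_train) for pl in pipelines.values()]
fitted = dask.compute(*tasks)

# Pair each fitted pipeline with its name and evaluate it on the held-out split.
scores = {name: pl.score(X_test, Y_test) for name, pl in zip(pipelines, fitted)}
print(scores)
```

Keeping the pipelines in a dict is only a convenience for matching results back to names; dask.compute returns results in the same order as the tasks it was given.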
Upvotes: 7