Reputation: 55
I'm trying to port over some "parallel" Python code to Azure Databricks. The code runs perfectly fine locally, but somehow doesn't on Azure Databricks. The code leverages the multiprocessing library, and more specifically its starmap function.
The code goes like this:
from sklearn import metrics
import lightgbm as lgb
import numpy as np

def init_pool():
    from threading import current_thread
    ident = current_thread().ident
    np.random.seed(ident)

def train_model(params, Xt, yt, Xv, yv):
    model = lgb.LGBMClassifier(objective='binary', subsample=0.8, random_state=123, **params)
    model.fit(Xt, yt)
    proba = model.predict_proba(Xv)[:, 1]
    return metrics.roc_auc_score(yv, proba)

if __name__ == "__main__":
    from sklearn.model_selection import train_test_split
    from itertools import product, repeat
    import multiprocessing as mp
    from time import time
    import pandas as pd

    def generate_data(n):
        '''Generates random data'''
        df = pd.DataFrame({
            'x1': np.random.random(n) * 100,
            'x2': np.random.choice(['a', 'b', 'c', 'd'], n, replace=True),
            'x3': np.random.choice(['cow', 'platypus', 'koala', 'panda', 'camel'], n, replace=True),
            'x4': np.random.poisson(15, n),
            'y': np.random.choice([0, 1], n, replace=True, p=[0.8, 0.2])
        })
        # Necessary steps for lightgbm
        for col in df.columns:
            if df[col].dtypes == 'object':
                df[col] = df[col].astype('category')
        X, y = df.drop(['y'], axis=1), df['y']
        return train_test_split(X, y, test_size=0.3, stratify=y)

    def grid_to_list(grid):
        '''Parameter grid is converted to a list of all combinations'''
        keys, values = zip(*grid.items())
        return [dict(zip(keys, v)) for v in product(*values)]

    param_list = grid_to_list({
        'num_leaves': [20, 30, 40],
        'learning_rate': [0.1, 0.3],
        'n_estimators': [50, 100, 250]
    })

    n = 100_000
    Xt, Xv, yt, yv = generate_data(n=n)
    pool_size = min(mp.cpu_count(), len(param_list))

    start = time()
    p = mp.Pool(pool_size, initializer=init_pool)
    ROC = p.starmap(train_model, zip(param_list, repeat(Xt), repeat(yt), repeat(Xv), repeat(yv)))
    p.close()
    p.join()
    end = time()

    print(f"Total running time for {len(param_list)} combinations: {round(end - start, 0)} seconds.")
    print(f"Highest ROC AUC score: {np.max(ROC)}")
    print(f"Matching parameters: {param_list[np.argmax(ROC)]}")
Running this on my personal laptop outputs the following:
Total running time for 18 combinations: 24.0 seconds.
Highest ROC AUC score: 0.5079410814800223
Matching parameters: {'num_leaves': 30, 'learning_rate': 0.3, 'n_estimators': 50}
So my first question is: why doesn't this multiprocessing-based code work on Azure Databricks, when it runs fine locally?
Now, poking around a bit looking for alternatives, I was told about "resilient distributed datasets" or "rdd" and, after some effort, managed to have the following work:
from sklearn.model_selection import train_test_split
from itertools import product, repeat
import multiprocessing as mp
from sklearn import metrics
import lightgbm as lgb
from time import time
import pandas as pd
import numpy as np

def generate_data(n):
    df = pd.DataFrame({
        'x1': np.random.random(n) * 100,
        'x2': np.random.choice(['a', 'b', 'c', 'd'], n, replace=True),
        'x3': np.random.choice(['cow', 'platypus', 'koala', 'panda', 'camel'], n, replace=True),
        'x4': np.random.poisson(15, n),
        'y': np.random.choice([0, 1], n, replace=True, p=[0.8, 0.2])
    })
    # Necessary steps for lightgbm
    for col in df.columns:
        if df[col].dtypes == 'object':
            df[col] = df[col].astype('category')
    X, y = df.drop(['y'], axis=1), df['y']
    return train_test_split(X, y, test_size=0.3, stratify=y)

n = 100_000
Xt, Xv, yt, yv = generate_data(n=n)

def grid_to_list(grid):
    '''Parameter grid is converted to a list of all combinations'''
    keys, values = zip(*grid.items())
    return [dict(zip(keys, v)) for v in product(*values)]

param_list = grid_to_list({
    'num_leaves': [20, 30, 40],
    'learning_rate': [0.1, 0.3],
    'n_estimators': [50, 100, 250]
})

class HyperparameterOptimiser:
    def __init__(self, params, Xt, yt, Xv, yv, train_fct):
        self.param_list = params
        self.Xt = Xt
        self.yt = yt
        self.Xv = Xv
        self.yv = yv
        self.train_fct = train_fct

    def optimise(self, n_jobs=None):
        if n_jobs is None:
            n_jobs = min(len(self.param_list), 4 * 16)  # Why 4 * 16?
        start = time()
        # <BEGIN ANNOYING SECTION>
        train_fct = self.train_fct
        Xt = self.Xt
        yt = self.yt
        Xv = self.Xv
        yv = self.yv
        rdd = sc.parallelize(self.param_list, n_jobs)
        self.ROC = rdd.map(lambda p: train_fct(p, Xt, yt, Xv, yv)).collect()
        # <END ANNOYING SECTION>
        self.running_time = round(time() - start, 0)
        self.output_results()

    def output_results(self):
        print(f"Total running time for {len(self.param_list)} combinations: {self.running_time} seconds.")
        print(f"Highest ROC AUC score: {max(self.ROC)}")
        print(f"Matching parameters: {self.param_list[np.argmax(self.ROC)]}")

def train_model(params, Xt, yt, Xv, yv):
    model = lgb.LGBMClassifier(objective='binary', subsample=0.8, random_state=123, **params)
    model.fit(Xt, yt)
    predictions = model.predict_proba(Xv)[:, 1]
    return metrics.roc_auc_score(yv, predictions)

# Note: very useful to be able to pass whatever "train function" is warranted with regard to context
ho = HyperparameterOptimiser(param_list, Xt, yt, Xv, yv, train_model)
ho.optimise()
In this case, the running time is the following:
Total running time for 18 combinations: 356.0 seconds.
Highest ROC AUC score: 0.5065868367986968
Matching parameters: {'num_leaves': 20, 'learning_rate': 0.3, 'n_estimators': 100}
This, however, raises more questions than answers:

Why is the running time so much longer on the cluster (356 seconds) than on my laptop (24 seconds)?
Why do I have to copy the attributes into local variables inside the "annoying section", rather than refer to the self object directly, as I would have with the starmap function in the first case?

I am guessing part of the answer to question no. 2 has to do with my choice of cluster, relative to the specs of my personal computer. While I agree with that, the code is far from intensive, and I find it somewhat puzzling that the difference would amount to that big a number.
Hopefully, this will generate discussions that'll be helpful to others as well. Cheers.
Upvotes: 3
Views: 9594
Reputation: 31
I had this same issue in Azure Databricks and was only able to perform parallel processing based on threads instead of processes. Take a look at the following post I made on the subject: "How to do parallel programming in Python?". The code is very simple and easy to customize. Just copy and use!
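For context, a thread-based version of the question's loop can be sketched roughly as follows; this assumes train_model, param_list and the data splits are defined exactly as in the question, and the worker count is an arbitrary choice:

from concurrent.futures import ThreadPoolExecutor
import numpy as np

# Assumes train_model, param_list, Xt, yt, Xv, yv exist as defined in the question.
# Threads sidestep the process-forking and pickling issues on the Databricks driver,
# and LightGBM spends most of its training time in native code that releases the GIL,
# so several models can still train concurrently.
with ThreadPoolExecutor(max_workers=min(8, len(param_list))) as executor:
    ROC = list(executor.map(lambda p: train_model(p, Xt, yt, Xv, yv), param_list))

print(f"Highest ROC AUC score: {max(ROC)}")
print(f"Matching parameters: {param_list[int(np.argmax(ROC))]}")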
Upvotes: 2
Reputation: 87214
You should stop trying to reinvent the wheel, and instead start to leverage the built-in capabilities of Azure Databricks. Because Apache Spark (and Databricks) is a distributed system, machine learning on it should also be distributed. There are two approaches to that:
The training algorithm itself is implemented in a distributed fashion - there are a number of such algorithms packaged into Apache Spark and included in the Databricks Runtimes.
Use machine learning implementations designed to run on a single node, but train multiple models in parallel - that is what typically happens during hyperparameter optimization, and it is what you're trying to do.
The Databricks Runtime for Machine Learning includes the Hyperopt library, which is designed to find the best hyperparameters efficiently, without trying every combination of parameters, so it finds them faster. It also includes the SparkTrials API, which is designed to parallelize computations for single-machine ML models such as scikit-learn. The documentation includes a number of examples of using that library with single-node ML algorithms that you can use as a base for your work - for example, here is an example for scikit-learn.
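For illustration, here is a minimal sketch of how the question's LightGBM search might be expressed with Hyperopt and SparkTrials; the search space, parallelism and max_evals values below are assumptions chosen to mirror the original grid, not recommendations:

from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
from sklearn import metrics
import lightgbm as lgb

# Assumes Xt, yt, Xv, yv are the train/validation splits from the question.
search_space = {
    'num_leaves': hp.choice('num_leaves', [20, 30, 40]),
    'learning_rate': hp.choice('learning_rate', [0.1, 0.3]),
    'n_estimators': hp.choice('n_estimators', [50, 100, 250]),
}

def objective(params):
    model = lgb.LGBMClassifier(objective='binary', subsample=0.8, random_state=123, **params)
    model.fit(Xt, yt)
    proba = model.predict_proba(Xv)[:, 1]
    # Hyperopt minimizes the loss, so negate the AUC
    return {'loss': -metrics.roc_auc_score(yv, proba), 'status': STATUS_OK}

# SparkTrials runs the trials on the cluster's worker nodes instead of the driver
best = fmin(fn=objective,
            space=search_space,
            algo=tpe.suggest,
            max_evals=18,
            trials=SparkTrials(parallelism=4))
print(best)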
P.S. When you run the code with multiprocessing, it is executed only on the driver node, and the rest of the cluster isn't used at all.
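One rough way to see this, assuming a Databricks notebook where sc is already defined, is to compare the hostname of the node running the cell (the driver) with the hostnames reported from inside Spark tasks (the workers):

import socket

# Hostname of the node executing this notebook cell, i.e. the driver
print("Driver node:", socket.gethostname())

# Hostnames of the nodes that actually execute the Spark tasks, i.e. the workers
hosts = sc.parallelize(range(16), 16).map(lambda _: socket.gethostname()).collect()
print("Worker nodes:", set(hosts))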
Upvotes: 2