Reputation: 634
I am trying to score multiple datasets at the same time using multiprocessing. The following code hangs when run, yet when I call score on the base_model outside of the pool it executes instantly.
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
titanic = pd.read_csv('titanic.csv')
titanic['Age'].fillna(titanic['Age'].mean(), inplace=True)  # fill all missing values with the average
# create indicator for Cabin because there is a reason it was missing
titanic['Cabin_ind'] = np.where(titanic['Cabin'].isnull(), 0, 1)
#convert sex to numeric
gender_num = {'male': 0, 'female': 1}
titanic['Sex'] = titanic['Sex'].map(gender_num)
# drop unnecessary variables
titanic.drop(columns=['Cabin', 'Embarked', 'Name', 'Ticket'], inplace=True)
features = titanic.drop(columns=['Survived'])
labels = titanic['Survived']
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=.2, random_state=42)
from xgboost import XGBClassifier
base_model = XGBClassifier(objective='binary:logistic', nthread=4, seed=27, scoring='auc')
base_model.fit(X_train, y_train)
import multiprocessing as mp
def my_func(X_test, y_test, base_model):
    val = base_model.score(X_test, y_test)
    print(val)

def main():
    to_pass = [(X_test, y_test, base_model)]
    pool = mp.Pool(1)
    pool.starmap(my_func, to_pass)

if __name__ == "__main__":
    main()
Upvotes: 1
Views: 2069
Reputation: 897
This issue arises from the interaction between XGBoost's multithreading and Python's multiprocessing module. XGBoost uses OpenMP for parallel computation, which conflicts with multiprocessing when processes are started with the fork start method, the default on Unix-like systems: a forked child inherits the parent's OpenMP runtime state but not its worker threads, so the next parallel operation can wait forever. This is why XGBoost code deadlocks or hangs when run inside worker processes.
To solve this issue, you can restrict XGBoost to use only one thread. This can be achieved by setting the OMP_NUM_THREADS environment variable to 1 before importing XGBoost:
import os
os.environ['OMP_NUM_THREADS'] = '1'
By limiting XGBoost to run in single-threaded mode, you avoid the conflict with OpenMP, and your code can be executed within a multiprocessing pool.
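For concreteness, here is a minimal, self-contained sketch of where the assignment has to go (the toy data and model are placeholders, not the question's Titanic setup): it must happen before the first xgboost import, because OpenMP reads the variable when it is loaded.

import os
os.environ['OMP_NUM_THREADS'] = '1'  # must be set before xgboost (and OpenMP) is imported

import multiprocessing as mp
import numpy as np
from xgboost import XGBClassifier

def my_func(X_test, y_test, model):
    # runs inside the worker process; XGBoost is now single-threaded
    print(model.score(X_test, y_test))

if __name__ == '__main__':
    rng = np.random.default_rng(27)
    X = rng.random((200, 4))         # toy stand-in for the Titanic features
    y = (X[:, 0] > 0.5).astype(int)  # labels already encoded as 0/1 integers
    model = XGBClassifier(objective='binary:logistic', random_state=27)
    model.fit(X, y)
    with mp.Pool(1) as pool:
        pool.starmap(my_func, [(X, y, model)])

Since the conflict is specific to fork, calling mp.set_start_method('spawn') at the start of the program is another way around it, at the cost of slower worker start-up.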
Upvotes: 0
Reputation: 908
The problem with XGBoost and multiprocessing is that you don't know what XGBoost does under the hood when you pass it to multiprocessing. This is a common thing, not only with XGBoost but with other libraries too, and I have found that many of them hang when you use different processes to speed up your work.
What I tend to do is save the model and then use multiprocessing to create a new XGBClassifier and load the model from there. It is much safer, and I am more certain nothing weird is going to happen.
Here is an example
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from concurrent.futures import ProcessPoolExecutor, as_completed
from sklearn.metrics import accuracy_score

def get_data():
    titanic = pd.read_csv('titanic.csv', usecols=['Age', 'Cabin', 'Sex', 'Survived'])
    # fill all missing values with the average
    titanic['Age'] = titanic['Age'].fillna(titanic['Age'].mean())
    # create indicator for Cabin because there is a reason it was missing
    titanic['Cabin_ind'] = np.where(titanic['Cabin'].isna(), 0, 1)
    # convert sex to numeric
    titanic['Sex'] = np.where(titanic['Sex'] == 'male', 0, 1)
    # convert Survived to numeric
    titanic['Survived'] = np.where(titanic['Survived'] == 'yes', 1, 0)
    features = titanic[['Age', 'Sex', 'Cabin_ind']]
    labels = titanic['Survived']
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test

def train_model(X_train, y_train):
    print('Training model')
    base_model = XGBClassifier(objective='binary:logistic', nthread=4, seed=27, scoring='auc', use_label_encoder=False)
    base_model.fit(X_train, y_train)
    base_model.save_model('model.json')

def score_model(i, X_test, y_test):
    print('{}: Scoring model'.format(i))
    base_model = XGBClassifier(objective='binary:logistic', nthread=4, seed=27, scoring='auc', use_label_encoder=False)
    base_model.load_model('model.json')
    score = base_model.score(X_test, y_test)
    y_pred = base_model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return i, score, accuracy

def main():
    X_train, X_test, y_train, y_test = get_data()
    with ProcessPoolExecutor(max_workers=1) as executor:
        task = executor.submit(train_model, X_train, y_train)
        task.result()
    tasks = []
    with ProcessPoolExecutor(max_workers=1) as executor:
        # You can split your test set here; I am passing it as a whole
        for i in range(10):
            # I also pass i to the function to identify results later in case of out-of-order completion
            task = executor.submit(score_model, i, X_test, y_test)
            tasks.append(task)
        for task in as_completed(tasks):
            i, score, accuracy = task.result()
            print('Task {} completed: score={}, accuracy={}'.format(i, score, accuracy))

if __name__ == '__main__':
    main()
EDIT: a few additional notes:
- When creating the XGBClassifier in another process to avoid locking, it is good to force use_label_encoder=False. The reason is that XGBoost, at least on my computer, complains about: UserWarning: The use of label encoder in XGBClassifier is deprecated and will be removed in a future release. To remove this warning, do the following: 1) Pass option use_label_encoder=False when constructing XGBClassifier object; and 2) Encode your labels (y) as integers starting with 0, i.e. 0, 1, 2, ..., [num_class - 1].
- Avoid inplace in pandas, as discussed here thoroughly.
- Change max_workers=some_cpu_number_that_makes_sense to use many CPUs (see the sketch after this list).
- XGBoost is multithreaded, and in my experience you don't really need multiprocessing with it; it is pretty fast even with a 2 GB dataset. Unless you have a really, really big dataset...
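To illustrate the max_workers point, here is a hedged sketch of scoring the test set in parallel chunks. It reuses score_model and model.json from the example above; parallel_score and the chunking are my own illustration, not part of the original answer.

import os
import numpy as np
from concurrent.futures import ProcessPoolExecutor, as_completed

def parallel_score(X_test, y_test, n_workers=None):
    # score_model is the function defined in the example above (assumes model.json exists)
    n_workers = n_workers or os.cpu_count()  # one worker per CPU as a sensible default
    # np.array_split keeps features and labels aligned row-for-row
    X_chunks = np.array_split(X_test, n_workers)
    y_chunks = np.array_split(y_test, n_workers)
    tasks = []
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        for i, (X_chunk, y_chunk) in enumerate(zip(X_chunks, y_chunks)):
            tasks.append(executor.submit(score_model, i, X_chunk, y_chunk))
        for task in as_completed(tasks):
            i, score, accuracy = task.result()
            print('chunk {}: score={}, accuracy={}'.format(i, score, accuracy))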
The csv I used:
Age,Cabin,Sex,Survived,Embarked,Name,Ticket
,,male,yes,,,
19,,male,no,,,
20,1,female,yes,,,
25,2,male,no,,,
,3,female,yes,,,
40,3,male,yes,,,
,3,female,yes,,,
,,female,yes,,,
,3,male,no,,,
15,3,female,yes,,,
Upvotes: 3