sfactor
sfactor

Reputation: 13062

Python: How to make machine learning predictions run faster in production?

I have created a machine learning model in scikit-learn which I need to deploy in production with live data. The features look like this for example:

  date          event_id  user_id     feature1    feature2    featureX...
  2017-01-27    100       5555        1.23        2           2.99
  2017-01-27    100       4444        2.55        5           3.16
  2017-01-27    100       3333        0.45        3           1.69
  2017-01-27    105       1212        3.96        4           0.0
  2017-01-27    105       2424        1.55        2           5.56
  2017-01-27    105       3636        0.87        4           10.28

So, there are different events each day. Before the event starts I basically store this in a dataframe by pulling them from a database and calculate the predictions using the pickled scikit model as:

df_X = df.drop(['date', 'event_id', 'user_id'], axis=1)
loaded_model = joblib.load("model.joblib.dat")
prediction = loaded_model.predict_proba(df_X)

Then I match the prediction back to df and send as an output to an API or file as needed.

When the events starts the featureX is constantly updated that I get from an API. To do the updating I'm using the loop that goes through each event_id and user_id and update df with the new featureX value, recalculate and send to the output again.

For that I'm doing something like this:

# get list of unique event ids
events = set(df['event_id'].tolist())

try:
    while True:
        start = time.time()
        for event in events:
            featureX = request.get(API_URL + event)
            featureX_json = featureX.json()

            for user in featureX_json['users']:
                df.loc[df.user_id == user['user_id'],
                       'featureX'] = user['featureX']

        df_X = df.drop(['date', 'event_id', 'user_id'], axis=1)
        df['prediction'] = loaded_model.predict_proba(df_X)

        # send to API or write to file

        end = time.time()
        print('recalculation time {} secs'.format(end - start))

except KeyboardInterrupt:
    print('exiting !')

This works fine for me but the whole prediction update takes around 4 secs in the server and I need it to be under 1 sec. I'm trying to figure out what I could change in the while loop to get the speedup I need?

A sample of the json has been added upon request for event_id = 100 the URL http://myapi/api/event_users/<event_id>:

{
    "count": 3,
    "users": [
        {
            "user_id": 4444,
            "featureY": 34,
            "featureX": 4.49,
            "created": "2017-01-17T13:00:09.065498Z"
        },
        {
            "user_id": 3333,
            "featureY": 22,
            "featureX": 1.09,
            "created": "2017-01-17T13:00:09.065498Z"
        },
         {
            "user_id": 5555,
            "featureY": 58,
            "featureX": 9.54,
            "created": "2017-01-17T13:00:09.065498Z"
        }
    ]
}

Upvotes: 3

Views: 1465

Answers (3)

Nikolay Petrov
Nikolay Petrov

Reputation: 88

You can try using accelerated implementations of algorithms - such as scikit-learn-intelex - https://github.com/intel/scikit-learn-intelex. This is a free software AI accelerator that brings over 10-100X acceleration across a variety of applications.

This library will provide great performance improvements for both training and predictions.

Example of speedup that you can achieve

First install package

pip install scikit-learn-intelex

And then add in your python script

from sklearnex import patch_sklearn
patch_sklearn()

Upvotes: 0

mathieujofis
mathieujofis

Reputation: 349

It would be better to subscribe to some kind of messaging queue, like Kafka. You could then consume FeatureX whenever it is updated, instead of endlessly making batch API calls in a loop and then iterating through the entire source of data, etc.

Regarding predictions, it might make sense to leverage a more scalable approach. You could split up the dataframe into chunks and make asynchronous requests to a scalable, high-throughput prediction API. With this method, you are only limited by network latency and how many requests you can make simultaneously. If the prediction API can handle thousands/10k's/100k's of requests/sec, your prediction time could be reduced to less than a second (and possibly just a couple hundred milliseconds).

My service mlrequest is a low-latency, high-throughput, high-availability machine learning API that is well suited to this kind of problem. We can handle and scale to many, many thousands of predictions per second. Scikit Learn models and Pandas Dataframes will be supported in the next release (coming soon). Below is a simple example of training and predicting. You can get a free api key that gets you 50,000 model transactions per month.

Install the mlrequest Python client

$pip install mlrequest

Training a model and deploying it to 5 data centers around the world is as simple as:

from mlrequest import Classifier
classifier = Classifier('my-api-key')
features = {'feature1': 'val1','feature2': 100}
training_data = [{'features': features, 'label': 1}, ...]
r = classifier.learn(training_data=training_data, model_name='my-model', class_count=2) 

Predicting

features = [{'feature1': 'val1', 'feature2': 77}, ...]
r = classifier.predict(features=features, model_name='my-model', class_count=2)
r.predict_result

Upvotes: 0

MaxU - stand with Ukraine
MaxU - stand with Ukraine

Reputation: 210942

# get list of unique event ids
events = df['event_id'].unique().tolist()

try:
    while True:     # i don't understand why do you need this loop...
        start = time.time()
        for event in events:
            featureX = request.get(API_URL + event)
            tmp = pd.DataFrame(featureX.json()['users'])

            df.loc[(df.event_id == event), 'featureX'] = \
                df.loc[df.event_id == event, 'user_id'] \
                  .map(tmp.set_index('user_id').featureX)

        df_X = df.drop(['date', 'event_id', 'user_id'], axis=1)
        df['prediction'] = loaded_model.predict_proba(df_X)

        # send to API or write to file

        end = time.time()
        print('recalculation time {} secs'.format(end - start))

except KeyboardInterrupt:
    print('exiting !')

Demo: for event_id == 100

first let's create a DF from your JSON object:

tmp = pd.DataFrame(featureX_json['users'])

In [33]: tmp
Out[33]:
                       created  featureX  featureY  user_id
0  2017-01-17T13:00:09.065498Z      4.49        34     4444
1  2017-01-17T13:00:09.065498Z      1.09        22     3333
2  2017-01-17T13:00:09.065498Z      9.54        58     5555

now we can get rid of for user in featureX_json['users']: loop:

In [29]: df.loc[df.event_id == 100, 'featureX'] = \
             df.loc[df.event_id == 100, 'user_id'].map(tmp.set_index('user_id').featureX)

In [30]: df
Out[30]:
         date  event_id  user_id  feature1  feature2  featureX
0  2017-01-27       100     5555      1.23         2      9.54   # 2.99 -> 9.54
1  2017-01-27       100     4444      2.55         5      4.49   # 3.16 -> 4.49
2  2017-01-27       100     3333      0.45         3      1.09   # 1.69 -> 1.09
3  2017-01-27       105     1212      3.96         4      0.00
4  2017-01-27       105     2424      1.55         2      5.56
5  2017-01-27       105     3636      0.87         4     10.28

Upvotes: 1

Related Questions