zbinsd

Reputation: 4214

How to load a Spark model for efficient predictions

When I build a Spark model and call it, the predictions take tens of ms to return. However, when I save that same model, then load it, the predictions take much longer. Is there some sort of cache I should be using?

model.cache() after loading does not work, as the model is not an RDD.
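For reference, a minimal sketch of what I tried (assuming model is the loaded MatrixFactorizationModel):

model.cache()  # raises AttributeError: the model wraps a JVM object, not an RDD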

This works great:

from pyspark.mllib.recommendation import ALS
from pyspark import SparkContext
import time

sc = SparkContext()

# Some example data
r = [(1, 1, 1.0),
     (1, 2, 2.0),
     (2, 1, 2.0)]
ratings = sc.parallelize(r)
model = ALS.trainImplicit(ratings, 1, seed=10)

# Call model and time it
now = time.time()
for t in range(10):
    model.predict(2, 2)

elapsed = (time.time() - now)*1000/(t+1)

print "Average time for model call: {:.2f}ms".format(elapsed)

model.save(sc, 'my_spark_model')

Output: Average time for model call: 71.18ms

If I run the following, the predictions take much more time:

from pyspark.mllib.recommendation import MatrixFactorizationModel
from pyspark import SparkContext
import time

sc = SparkContext()

model_path = "my_spark_model"
model = MatrixFactorizationModel.load(sc, model_path)

# Call model and time it
now = time.time()
for t in range(10):
    model.predict(2, 2)

elapsed = (time.time() - now)*1000/(t+1)

print "Average time for loaded model call: {:.2f}ms".format(elapsed)

Output: Average time for loaded model call: 180.34ms

For BIG models, I'm seeing prediction times over 10 seconds for a single call after loading a saved model.

Upvotes: 4

Views: 2251

Answers (1)

gsamaras

Reputation: 73376

In short: no, there does not seem to be a way to cache the whole model, since it is not an RDD.


You can use cache(), but not on the model itself, because it is not an RDD; cache its factor RDDs instead:

model.productFeatures().cache()
model.userFeatures().cache()

It is recommended to unpersist() them once you no longer need them, especially if you handle really big data, to protect your job from out-of-memory errors.
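A minimal sketch of that lifecycle (variable names are mine; keeping references ensures cache() and unpersist() act on the same RDD objects):

# Mark the factor RDDs for caching while the model is being served:
user_features = model.userFeatures().cache()
product_features = model.productFeatures().cache()

# ... serve predictions ...

# Release the executor memory once the model is no longer needed:
user_features.unpersist()
product_features.unpersist()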

Of course, you could use persist() instead of cache(); you might want to read: What is the difference between cache and persist?
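For example, a sketch with an explicit storage level (cache() is just persist() with StorageLevel.MEMORY_ONLY; MEMORY_AND_DISK spills partitions to disk instead of recomputing them when memory runs out):

from pyspark import StorageLevel

model.userFeatures().persist(StorageLevel.MEMORY_AND_DISK)
model.productFeatures().persist(StorageLevel.MEMORY_AND_DISK)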


Remember that Spark performs transformations lazily, so loading the model does not really do anything by itself. An action is needed to trigger the actual work: the first time you actually use the model, Spark will load it, and that is when you experience the latency, versus having it already in memory.

Also note that cache() is itself lazy, so you can call RDD.count() explicitly to force the data into memory.
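Putting the two notes together, a sketch of forcing the materialization up front (keeping one reference per RDD, so the count() action hits exactly the RDD that was marked as cached):

# cache() only marks the RDDs; count() is an action, so it forces Spark
# to compute them and keep them in memory before the first predict():
user_features = model.userFeatures().cache()
user_features.count()
product_features = model.productFeatures().cache()
product_features.count()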


Experiments' output:

Average time for model call: 1518.83ms
Average time for loaded model call: 2352.70ms
Average time for loaded model call with my suggestions: 8886.61ms

By the way, after loading the model you should see warnings like these:

16/08/24 00:14:05 WARN MatrixFactorizationModel: User factor does not have a partitioner. Prediction on individual records could be slow.
16/08/24 00:14:05 WARN MatrixFactorizationModel: User factor is not cached. Prediction could be slow.

But what if I do the count() trick? I get no gain at all; in fact, it is slower:

...
model.productFeatures().cache()
model.productFeatures().count()
model.userFeatures().cache()
model.userFeatures().count()
...

Output:

Average time for loaded model call: 13571.14ms

Without the cache(), keeping the count()s, I got:

Average time for loaded model call: 9312.01ms

Important note: the timings were taken on a real-world cluster whose nodes are allocated to important jobs, so my toy example may have been preempted during the experiments. Moreover, communication costs may dominate.

So, if I were you, I would run the experiments on your own setup too.


In conclusion, Spark does not seem to offer any mechanism for caching your model beyond caching its factor RDDs, as described above.

Upvotes: 3
