yanachen

Reputation: 3753

Using a Keras model in a PySpark lambda map function

I want to use a Keras model to predict scores inside a map lambda function in PySpark.

import numpy as np
from keras.models import load_model
from pyspark.sql import Row

def inference(user_embed, item_embed):
    feats = user_embed + item_embed  # concatenate the user and item embeddings
    dnn_model = load_model("best_model.h5")  # the model is reloaded for every record
    infer = dnn_model.predict(np.array([feats]), verbose=0, steps=1)
    return infer

iu_score = iu.map(lambda x: Row(userid=x.userid, entryid=x.entryid, score=inference(x.user_embed, x.item_embed)))

The job runs extremely slowly and gets stuck at the final stage shortly after it starts.

[Stage 119:==================================================>(4048 + 2) / 4050]

In htop, only 2 of the 80 cores are under full load; the other cores do not seem to be doing any work. What should I do to make the model predict in parallel? The iu RDD has 300 million rows, so efficiency is important to me. Thanks.


I have set verbose=1 and the prediction log appears, but it seems the predictions are made one by one instead of in parallel.

Upvotes: 2

Views: 708

Answers (1)

mr_mo

Reputation: 1528

While writing this answer I researched a bit and found the question interesting. First, if efficiency is really important, invest a little time in recoding the whole thing without Keras. You can still use TensorFlow's high-level API (Models); with a little effort you can extract the parameters from the Keras model and assign them to the new one. Beyond the fact that it is hard to see through all the wrapper layers in the framework (is TensorFlow not a rich enough framework on its own?), you will most likely hit backward-compatibility problems when upgrading. Really not recommended for production.
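As a rough illustration of the parameter hand-off described above, the sketch below pulls the trained weights out of the Keras model and re-implements the forward pass without Keras (in plain NumPy for brevity). It assumes the model is a simple stack of Dense+ReLU layers with a linear output, which is an assumption and not something stated in the question:

import numpy as np
from keras.models import load_model

keras_model = load_model("best_model.h5")
weights = keras_model.get_weights()  # e.g. [W0, b0, W1, b1, W2, b2] for a Dense stack

def numpy_inference(feats):
    # Assumed architecture: Dense+ReLU hidden layers, linear output layer.
    x = np.asarray(feats, dtype=np.float32)
    for W, b in zip(weights[0:-2:2], weights[1:-2:2]):
        x = np.maximum(x.dot(W) + b, 0.0)    # Dense + ReLU
    return x.dot(weights[-2]) + weights[-1]  # final linear layer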

Having said that, can you inspect what exactly the problem is? For instance: are you using GPUs? Maybe they are overloaded? Can you wrap the whole thing so it does not exceed some capacity, and use a prioritization scheme? A simple queue will do if there are no priorities. You can also check whether you really terminate TensorFlow's sessions, or whether the same machine runs many models that interfere with each other. Many issues could cause this phenomenon; it would be great to have more details.

Regarding the parallel computation: you didn't implement anything that actually opens a thread or a process for these models, so I suspect that PySpark just can't handle the whole thing on its own. Maybe the implementation (honestly, I didn't read the whole PySpark documentation) assumes that the dispatched function runs fast enough and does not distribute the work as it should. PySpark is essentially a sophisticated implementation of the map-reduce principle: the dispatched function plays the role of the mapping function in a single step, which can be problematic in your case. Even though it is passed as a lambda expression, you should inspect more carefully which instances are slow and on which machines they are running.
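One concrete source of per-record overhead in the question's code is that load_model("best_model.h5") runs once for every row. A common workaround, which the answer above does not spell out (so treat it as a hedged suggestion rather than the answer's method), is to load the model once per partition with mapPartitions. A minimal sketch, assuming the model file is available on every worker:

import numpy as np
from pyspark.sql import Row

def score_partition(rows):
    from keras.models import load_model
    dnn_model = load_model("best_model.h5")  # loaded once per partition, not per record
    for x in rows:
        feats = np.array([x.user_embed + x.item_embed])
        score = float(dnn_model.predict(feats, verbose=0).ravel()[0])
        yield Row(userid=x.userid, entryid=x.entryid, score=score)

iu_score = iu.mapPartitions(score_partition)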

I strongly recommend that you do the following:
Go to the official TensorFlow Serving (deployment) docs and read how to really deploy a model. The served models can be reached over gRPC and also over a RESTful API. Then, from PySpark, you can wrap the calls and connect to the served model. You can create a pool of as many model servers as you want, manage it from PySpark, distribute the computation over the network, and from there the sky and the CPUs/GPUs/TPUs are the limit (I'm still skeptical about the sky).
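To make the "wrap the calls from PySpark" part concrete, here is a minimal sketch against TensorFlow Serving's REST API. The host, port and model name (localhost:8501, dnn) are placeholders, not values from the question:

import requests
from pyspark.sql import Row

SERVING_URL = "http://localhost:8501/v1/models/dnn:predict"  # placeholder host/port/model name

def score_partition(rows):
    rows = list(rows)
    instances = [x.user_embed + x.item_embed for x in rows]
    # One batched REST call per partition instead of one model load per record.
    resp = requests.post(SERVING_URL, json={"instances": instances})
    resp.raise_for_status()
    for x, pred in zip(rows, resp.json()["predictions"]):
        yield Row(userid=x.userid, entryid=x.entryid, score=pred)

iu_score = iu.mapPartitions(score_partition)

In practice you would send the instances in bounded chunks rather than one request per partition, and point SERVING_URL at the pool of serving instances described above.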

It would be great to get an update from you about the results :) You made me curious.

I wish you the best with this issue; great question.

Upvotes: 2
