Pierre

Reputation: 988

Apply sklearn trained model on a dataframe with PySpark

I trained a random forest model with Python (scikit-learn) and would like to apply it to a big dataset with PySpark.

I first loaded the trained sklearn RF model (with joblib), loaded my data containing the features into a Spark dataframe, and then added a column with the predictions, using a user-defined function like this:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def predictClass(features):
    return rf.predict(features)

udfFunction = udf(predictClass, StringType())
new_dataframe = dataframe.withColumn('prediction', udfFunction('features'))

It takes so much time to run, though. Is there a more efficient way to do the same thing? (Without using Spark ML.)

Upvotes: 8

Views: 7970

Answers (3)

Jacek Placek

Reputation: 80

I had to do the same thing in a recent project. The bad thing about applying a udf for each row is that PySpark has to read the sklearn model each time, which is why it takes ages to finish. The best solution I have found was to use the .mapPartitions or foreachPartition method on the RDD; a really good explanation is here:

https://github.com/mahmoudparsian/pyspark-tutorial/blob/master/tutorial/map-partitions/README.md

It works fast because it ensures that there is no shuffling, and for each partition PySpark has to read the model and predict only once. So the flow would be (a rough sketch follows the list):

  • convert the DF to an RDD
  • broadcast the model to the nodes so it is accessible to the workers
  • write a function which takes an iterator (containing all rows within a partition) as an argument
  • iterate through the rows and build a proper matrix with your features (order matters)
  • call .predict only once
  • return the predictions
  • transform the RDD back to a DF if needed
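Something along these lines (a sketch only, assuming the sklearn model is already loaded as rf, the session is available as spark, and the feature column names f1/f2/f3 are placeholders for your own columns):

from pyspark.sql import Row

feature_cols = ['f1', 'f2', 'f3']                  # placeholder feature columns, order matters
rf_broadcast = spark.sparkContext.broadcast(rf)    # ship the model to the workers once

def predict_partition(rows):
    rows = list(rows)
    if not rows:
        return
    # build one feature matrix for the whole partition
    matrix = [[row[c] for c in feature_cols] for row in rows]
    # call .predict only once per partition
    predictions = rf_broadcast.value.predict(matrix)
    for row, pred in zip(rows, predictions):
        yield Row(**row.asDict(), prediction=str(pred))

predicted_rdd = dataframe.rdd.mapPartitions(predict_partition)
new_dataframe = spark.createDataFrame(predicted_rdd)   # back to a DataFrame if needed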

Upvotes: 2

Garvit

Reputation: 400

Now you can also use pandas_udf, introduced in Spark 2.3, to achieve high processing speed and distributed computation. It is based on PyArrow, a Python implementation of Apache Arrow used for in-memory computation.
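For example, something roughly like this (a sketch only; rf, spark, dataframe and the feature column names are assumptions carried over from the question and the answer above):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType

feature_cols = ['f1', 'f2', 'f3']                 # placeholder feature columns
rf_broadcast = spark.sparkContext.broadcast(rf)   # avoid re-pickling the model per task

@pandas_udf(StringType(), PandasUDFType.SCALAR)
def predict_pandas(*cols):
    # each element of cols is a pandas Series holding one feature column for a batch of rows
    features = pd.concat(cols, axis=1)
    return pd.Series(rf_broadcast.value.predict(features)).astype(str)

new_dataframe = dataframe.withColumn('prediction', predict_pandas(*feature_cols))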

Upvotes: 0

peter

Reputation: 1054

A sklearn RF model can be quite large when pickled. It is possible that frequent pickling/unpickling of the model during task dispatch causes the problem. You could consider using broadcast variables.

From the official document:

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
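With the question's udf, that could look roughly like this (a sketch; rf and the 'features' column come from the question, and wrapping the input as [features] to get a single-row 2D matrix is an assumption about how the features are stored):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

rf_broadcast = spark.sparkContext.broadcast(rf)   # cached once per executor instead of shipped with every task

def predictClass(features):
    # use the broadcast copy of the model; predict expects a 2D input, hence [features]
    return str(rf_broadcast.value.predict([features])[0])

udfFunction = udf(predictClass, StringType())
new_dataframe = dataframe.withColumn('prediction', udfFunction('features'))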

Upvotes: 1
