pedro valiente verde

Reputation: 23

pyspark: CrossValidator not working

I'm trying to tune the parameters of an ALS, but it always chooses the first parameter as the best option.

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from math import sqrt

from operator import add

conf = (SparkConf()
         .setMaster("local[4]")
         .setAppName("Myapp")
         .set("spark.executor.memory", "2g"))
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)
def computeRmse(data):
    return (sqrt(data.map(lambda x: (x[2] - x[3]) ** 2).reduce(add) / float(data.count())))

dfRatings = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
                                 ["user", "item", "rating"])

lr1 = ALS()
grid1 = ParamGridBuilder().addGrid(lr1.regParam, [1.0, 0.005, 2.0]).build()
evaluator1 = RegressionEvaluator(predictionCol=lr1.getPredictionCol(), labelCol=lr1.getRatingCol(), metricName='rmse')
cv1 = CrossValidator(estimator=lr1, estimatorParamMaps=grid1, evaluator=evaluator1, numFolds=2)
cvModel1 = cv1.fit(dfRatings)
a = cvModel1.transform(dfRatings)
print('rmse with cross validation: {}'.format(computeRmse(a)))

for reg_param in (1.0, 0.005, 2.0):
    lr = ALS(regParam=reg_param)
    model = lr.fit(dfRatings)
    print('reg_param: {}, rmse: {}'.format(reg_param, computeRmse(model.transform(dfRatings))))

Output:
rmse with cross validation: 1.1820489116858794
reg_param: 1.0, rmse: 1.1820489116858794
reg_param: 0.005, rmse: 0.001573816765686575
reg_param: 2.0, rmse: 2.1056964491942787

Any help?

Thanks in advance,

Upvotes: 2

Views: 2433

Answers (3)

Scratch'N'Purr

Reputation: 10399

I implemented a Pipeline solution, where I added a custom transformer as the final stage of my pipeline so that NaN predictions are dropped. Note that this implementation is for Spark < 2.2.0, where the coldStartStrategy keyword had not yet been introduced. Therefore, if you're using Spark 2.2.0 or later, you don't need the additional stage.
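
For reference, on Spark 2.2.0+ a minimal sketch of the built-in alternative would be to set coldStartStrategy on the ALS estimator itself, so NaN predictions are dropped before evaluation (the rest of the pipeline below stays the same, just without the extra stage):

from pyspark.ml.recommendation import ALS

# Spark 2.2.0+ only: "drop" removes rows with NaN predictions before they reach the evaluator
als = ALS(maxIter=10, userCol="player", itemCol="item", ratingCol="rating",
          implicitPrefs=False, coldStartStrategy="drop")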

First, I introduce the custom transformer that drops the NaN predictions.

from pyspark.ml import Pipeline, Transformer

class DropNAPredictions(Transformer):
    def _transform(self, predictedDF):
        # drop the rows where ALS produced a NaN prediction, then make sure the
        # prediction column is a double so the RegressionEvaluator accepts it
        nonNullDF = predictedDF.dropna(subset=['prediction'])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

Now I can build my pipeline and train using cross validation:

dropna = DropNAPredictions()

als = ALS(maxIter=10, userCol="player", itemCol="item", ratingCol="rating", implicitPrefs=False)

pipeline = Pipeline(stages=[als, dropna])
paramGrid = ParamGridBuilder().addGrid(als.regParam, [0.1, 0.05]) \
    .addGrid(als.rank, [1, 3]) \
    .build()

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=RegressionEvaluator(labelCol="rating"),
                    numFolds=3)

cvModel = cv.fit(training)  # 'training' is the ratings DataFrame (columns: player, item, rating)

A note on persistence: the pipeline can't be saved because of the custom transformer. There is a post that discusses options for serializing custom transformers, but I haven't gone down that rabbit hole to hack out a solution. As a temporary workaround, you can serialize just the ALS model itself and later rebuild the pipeline by adding the custom transformer back to it.

bestPipeline = cvModel.bestModel
bestModel = bestPipeline.stages[0]  # extracts the ALS model
bestModel.save("s2s_als_stage")

from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.recommendation import ALSModel

mymodel = ALSModel.load('s2s_als_stage')
pipeline = PipelineModel(stages=[mymodel, dropna])  # dropna is the custom transformer
pred_test = pipeline.transform(test)  # score test data

Upvotes: 0

zero323

Reputation: 330063

Putting aside other issues, you simply don't use enough data to perform meaningful cross validation and evaluation. As I explained and illustrated in Spark ALS predictAll returns empty, ALS cannot provide predictions when either the user or the item is missing from the training set.

This means that each split during cross validation will have undefined predictions, and the overall evaluation will be undefined as well. Because of that, CrossValidator returns the first possible model, since all the models you train are equally bad from its perspective.
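
As a minimal sketch of the mechanism (reusing the imports and sqlContext from your question; the tiny train/test split here is only for illustration):

# user 2 appears only in the held-out data, never in training
train = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0)],
                                   ["user", "item", "rating"])
test = sqlContext.createDataFrame([(2, 1, 1.0)], ["user", "item", "rating"])

model = ALS().fit(train)              # default userCol/itemCol/ratingCol match the columns above
predictions = model.transform(test)   # prediction is NaN for the unseen user 2

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
print(evaluator.evaluate(predictions))  # nan -- so every candidate model looks equally bad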

Upvotes: 2

Christian Hirsch

Reputation: 2056

In your CrossValidator you fix the number of folds to 1. However, the parameter numFolds must be >= 2; using only one fold defeats the idea of separating the data into training and test sets.
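
For example, reusing the names from your question, a valid configuration would use at least two folds:

# numFolds must be >= 2 so that every fold has separate training and evaluation data
cv1 = CrossValidator(estimator=lr1, estimatorParamMaps=grid1, evaluator=evaluator1, numFolds=3)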

Upvotes: 1
