Moona B

Reputation: 1

PySpark join two RDD results in an empty RDD

I'm a Spark newbie trying to edit and apply this movie recommendation tutorial (https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html) to my own dataset, but it keeps throwing this error:

ValueError: Can not reduce() empty RDD

This is the function that computes the Root Mean Squared Error of the model:

from math import sqrt
from operator import add

def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))

    print predictions.count()
    print predictions.first()
    print "predictions above"

    print data.count()
    print data.first()
    print "validation data above"


    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
        .join(data.map(lambda line: line.split(',')).map(lambda x: ((x[0], x[1]), x[2]))) \
        .values()   # the join above is line 56 of MyappALS.py

    print predictionsAndRatings.count()
    print "predictions And Ratings above"

    # line 63 of MyappALS.py
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))

model = ALS.train(training, rank, numIter, lambda). data is the validation data set. The training and validation sets both come from a ratings.txt file with lines in the format: userID,productID,rating,ratingopID
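For context, here is a minimal parsing sketch (not from the original post; the SparkContext sc and the file name ratings.txt are assumptions) showing why a plain split() leaves every field as a unicode string, which is what the validation output printed below contains:

# Hypothetical sketch: sc and "ratings.txt" are assumptions, not the asker's actual code.
lines = sc.textFile("ratings.txt")

# split() alone keeps every field as a unicode string,
# e.g. (u'640085', u'1590', u'5') like the validation output below
ratings_str = lines.map(lambda l: tuple(l.split(',')[:3]))

# casting while parsing gives int keys and a float rating,
# the same types ALS.predictAll returns in its Rating objects
ratings = lines.map(lambda l: l.split(',')) \
               .map(lambda f: (int(f[0]), int(f[1]), float(f[2])))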

These are parts of the output :

879
...
Rating(user=0, product=656, rating=4.122132631144641)
predictions above
...
1164
...
(u'640085', u'1590', u'5')
validation data above    
...
16/08/26 12:47:18 INFO DAGScheduler: Registering RDD 259 (join at /path/myapp/MyappALS.py:56)
16/08/26 12:47:18 INFO DAGScheduler: Got job 20 (count at /path/myapp/MyappALS.py:59) with 12 output partitions
16/08/26 12:47:18 INFO DAGScheduler: Final stage: ResultStage 238 (count at /path/myapp/MyappALS.py:59)
16/08/26 12:47:18 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 237)
16/08/26 12:47:18 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 237)
16/08/26 12:47:18 INFO DAGScheduler: Submitting ShuffleMapStage 237 (PairwiseRDD[259] at join at /path/myapp/MyappALS.py:56), which has no missing parents
....

0
predictions And Ratings above

...
Traceback (most recent call last):
File "/path/myapp/MyappALS.py", line 130, in <module>
validationRmse = computeRmse(model, validation, numValidation)
File "/path/myapp/MyappALS.py", line 63, in computeRmse
return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))
File "/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 805, in reduce
ValueError: Can not reduce() empty RDD

So from the count() I'm sure the initial RDDs are not empty. Then the INFO log "Registering RDD 259 (join at /path/myapp/MyappALS.py:56)": does that mean the join job is launched?

Is there something I'm missing? Thank you.

Upvotes: 0

Views: 1347

Answers (1)

Moona B

Reputation: 1

That error disappeared when I added int() to:

predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
    .join(data.map(lambda x: ((int(x[0]), int(x[1])), int(x[2])))) \
    .values()

We think it's because the predictions are output by the predictAll method, which gives tuples with int user and product fields, while the other data was parsed manually by the algorithm and still had string keys, so the join found no matching keys.
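To make the type issue concrete, here is a tiny standalone sketch (illustrative only, assuming a running SparkContext sc): a join only matches keys that are equal in both value and type, so unicode-string keys never pair with the int keys coming out of predictAll.

# Illustrative sketch, not from the original post; sc is an assumed SparkContext.
preds = sc.parallelize([((0, 656), 4.12)])           # int keys, like Rating objects from predictAll
ratings = sc.parallelize([((u'0', u'656'), u'5')])   # unicode keys, like the manually parsed file

print preds.join(ratings).count()    # 0 -> the joined RDD is empty, so reduce() fails

ratings_int = ratings.map(lambda x: ((int(x[0][0]), int(x[0][1])), float(x[1])))
print preds.join(ratings_int).count()    # 1 -> keys now match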

Upvotes: 0
