Reputation: 1782
I have an RDD that needs to access data from another RDD. However, I always get a "Task not serializable" error. I have extended the Serializable class, but it has not worked. The code is:
val oldError = rddOfRatings.aggregate(0.0)(
  (accum, rating) =>
    accum + calcError(rating.rating,
                      us.lookup(rating.user)(0),
                      it.lookup(rating.product)(0)).abs,
  _ + _
) / rddSize
where us, it and rddOfRatings are the other RDDs. What I don't understand is: if an RDD is immutable, why won't it let me access one RDD from within another? The problem seems to lie with us and it, because when I replace them with local collections it works fine.
Thank you.
Upvotes: 2
Views: 1607
Reputation: 37435
rdd.lookup [1] is an expensive operation that you probably don't want to perform even if you could. Also, "serializing" an RDD does not make sense: an RDD is just a reference to data, not the data itself.
The approach to take here will probably depend on the size of those datasets. If the us and it RDDs are about the same size as rddOfRatings (which is what it looks like, given the intended lookups), the best way is to join them beforehand:
// Please note I'm not aware of the actual structure of your collections,
// so take this as an illustrative example.
val ratingErrorByUser    = us.map(u => (u.id, u.error))
val ratingErrorByProduct = it.map(i => (i.id, i.error))

// Key the ratings by user, join in the user errors, then re-key by product
// and join in the product errors.
val ratingsByKey = rddOfRatings.map(r => (r.user, (r.product, r.rating)))
val ratingsWithUserError = ratingsByKey.join(ratingErrorByUser)
val ratingsWithProductError = ratingsWithUserError
  .map { case (userId, ((prodId, rating), userErr)) => (prodId, (rating, userErr)) }
val allErrors = ratingsWithProductError.join(ratingErrorByProduct)

// Apply calcError to each fully joined record and sum, as in the question.
val totalErr = allErrors
  .map { case (prodId, ((rating, userErr), prodErr)) => calcError(rating, userErr, prodErr).abs }
  .reduce(_ + _)
val total = totalErr / rddOfRatings.count
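If, on the other hand, us and it are small enough to fit on the driver (the question notes that local collections already work), collecting them into maps and broadcasting those is a common alternative. A minimal sketch, assuming us and it are pair RDDs keyed by user and product (as the original lookup calls imply) and that sc is the SparkContext:

// A minimal sketch, under the assumptions stated above: collect the small
// RDDs to the driver and broadcast them, so the closure captures plain maps
// rather than RDDs.
val userValues    = sc.broadcast(us.collectAsMap())
val productValues = sc.broadcast(it.collectAsMap())

val oldError = rddOfRatings.aggregate(0.0)(
  (accum, rating) =>
    accum + calcError(rating.rating,
                      userValues.value(rating.user),
                      productValues.value(rating.product)).abs,
  _ + _
) / rddSize

Each executor then reads the broadcast maps locally, so nothing in the closure references another RDD.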
The join version above would probably be a lot easier with the Spark DataFrame API.
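For illustration only, a rough sketch of the same pipeline with DataFrames; the SparkSession named spark, the column names, and the (rating, userErr, prodErr) signature of calcError are all assumptions here:

// A rough sketch under the assumptions stated above.
import org.apache.spark.sql.functions.{abs, sum, udf}
import spark.implicits._

// Wrap the question's calcError as a UDF (signature assumed).
val calcErrorUdf = udf(calcError _)

val ratingsDF = rddOfRatings.map(r => (r.user, r.product, r.rating))
                            .toDF("user", "product", "rating")
val userErrDF = ratingErrorByUser.toDF("user", "userErr")
val prodErrDF = ratingErrorByProduct.toDF("product", "prodErr")

// Join on the key columns, compute the per-record error, and sum it up.
val total = ratingsDF
  .join(userErrDF, "user")
  .join(prodErrDF, "product")
  .select(abs(calcErrorUdf($"rating", $"userErr", $"prodErr")).as("err"))
  .agg(sum($"err")).first.getDouble(0) / rddOfRatings.count

This keeps the whole computation declarative and lets Spark plan the joins.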
[1] If lookup is a must (which doesn't look like the case here!), have a look at Spark IndexedRDD.
Upvotes: 2
Reputation: 1431
RDDs are indeed not serializable, due to the variables they have to capture (the SparkContext, for example). To get around this, join the three RDDs together; then all the necessary values are plain data, available inside your accumulator closure.
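A minimal sketch of that advice, reusing the question's names and assuming us and it are pair RDDs keyed by user and product respectively:

// Join the three RDDs so every record carries its user and product values.
val joined = rddOfRatings
  .map(r => (r.user, (r.product, r.rating)))
  .join(us)                                          // attach the user value
  .map { case (_, ((product, rating), uVal)) => (product, (rating, uVal)) }
  .join(it)                                          // attach the product value

// The closure now only touches plain values, so the original aggregate works.
val oldError = joined.aggregate(0.0)(
  { case (accum, (_, ((rating, uVal), iVal))) =>
      accum + calcError(rating, uVal, iVal).abs },
  _ + _
) / rddSize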
Upvotes: 3