monster

Reputation: 1782

RDD accessing values in another RDD

I have an RDD that needs to access data from another RDD, but I always get a "Task not serializable" error. I have extended the Serializable class; however, that has not worked. The code is:

val oldError = rddOfRatings.aggregate(0.0)((accum, rating) =>
  accum + calcError(rating.rating,
                    us.lookup(rating.user)(0),
                    it.lookup(rating.product)(0)).abs,
  _ + _) / rddSize

Where us, it, and rddOfRatings are the other RDDs. What I don't understand is: if an RDD is immutable, why won't it let me access one RDD from within another? The problem seems to lie with us and it, because when I replace them with a local collection it works fine.
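For reference, this is roughly what I mean by using a local collection instead (a sketch only; it assumes us and it are pair RDDs small enough to collect, and that sc is the SparkContext):

// Collect the small RDDs to the driver and broadcast plain maps,
// so the aggregate closure no longer captures any RDD.
val usMap = sc.broadcast(us.collectAsMap())   // user -> value
val itMap = sc.broadcast(it.collectAsMap())   // product -> value

val oldError = rddOfRatings.aggregate(0.0)(
  (accum, rating) =>
    accum + calcError(rating.rating,
                      usMap.value(rating.user),
                      itMap.value(rating.product)).abs,
  _ + _) / rddSize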

Thank you.

Upvotes: 2

Views: 1607

Answers (2)

maasg

Reputation: 37435

rdd.lookup¹ is an expensive operation that you probably don't want to do even if you could.

Also, "serializing" an RDD does not make sense, as an RDD is just a reference to data and not data in itself.

The approach to take here will probably depend on the size of those datasets. If the us and it RDDs are about the same size as rddOfRatings (which is what it looks like, given the intended lookups), the best way would be to join them beforehand.

// please note I'm not aware of the actual structure of your collections, so take this as an illustrative example

val ratingErrorByUser = us.map(u => (u.id, u.error))
val ratingErrorByProduct = it.map(i => (i.id, i.error))
val ratingsByKey = rddOfRatings.map(r => (r.user, (r.product, r.rating)))
val ratingsWithUserError = ratingsByKey.join(ratingErrorByUser)
val ratingsWithProductError = ratingsWithUserError.map { case (userId, ((prodId, rating), userErr)) => (prodId, (rating, userErr)) }
val allErrors = ratingsWithProductError.join(ratingErrorByProduct)
val totalErr = allErrors.map { case (prodId, ((rating, userErr), prodErr)) => calcError(userErr, math.abs(prodErr), rating) }.reduce(_ + _)
val total = totalErr / rddOfRatings.count

This is probably a lot easier with the Spark DataFrame API.
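For instance, something roughly like this (again just a sketch: the column names are made up, sqlContext is a SQLContext, and it assumes calcError takes plain Doubles so it can be wrapped in a UDF):

import sqlContext.implicits._
import org.apache.spark.sql.functions.{abs, sum, udf}

// Wrap calcError as a UDF (assumes a (Double, Double, Double) => Double signature)
val calcErrorUdf = udf(calcError _)

val usDF = us.map(u => (u.id, u.error)).toDF("userId", "userErr")
val itDF = it.map(i => (i.id, i.error)).toDF("prodId", "prodErr")
val ratingsDF = rddOfRatings.map(r => (r.user, r.product, r.rating)).toDF("userId", "prodId", "rating")

// Join everything onto the ratings and aggregate in one pass
val total = ratingsDF
  .join(usDF, "userId")
  .join(itDF, "prodId")
  .select(sum(abs(calcErrorUdf($"userErr", $"prodErr", $"rating"))))
  .first.getDouble(0) / ratingsDF.count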

¹ If lookup is a must (which doesn't look like the case here!), have a look at Spark IndexedRDD.

Upvotes: 2

Dan Osipov

Reputation: 1431

RDDs are indeed not serializable, due to the variables they have to capture (the SparkContext, for example). To get around this problem, join the three RDDs together, and you will have all the necessary values available in your aggregation closure.
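A minimal sketch of that idea, assuming us and it are pair RDDs keyed by user and product respectively (I don't know the exact record structure, so treat the field names as placeholders):

// Join the user and product values onto each rating, then aggregate;
// the closure only ever sees plain values, never another RDD.
val joined = rddOfRatings
  .map(r => (r.user, (r.product, r.rating)))
  .join(us)                                        // (user, ((product, rating), usVal))
  .map { case (_, ((product, rating), usVal)) => (product, (rating, usVal)) }
  .join(it)                                        // (product, ((rating, usVal), itVal))

val oldError = joined
  .map { case (_, ((rating, usVal), itVal)) => calcError(rating, usVal, itVal).abs }
  .sum() / rddSize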

Upvotes: 3
