Reputation: 23
I'm new to Spark and was wondering about closures.
I have two RDDs: one containing (ID, value) pairs, and the other containing a list of selected IDs.
Using a map, I want to increase the value of an element if the other RDD contains its ID, like so:
val ids = sc.parallelize(List(1, 2, 10, 5))
val vals = sc.parallelize(List((1, 0), (2, 0), (3, 0), (4, 0)))

vals.map(v => {
  if (ids.collect().contains(v._1)) {
    (v._1, 1)
  }
})
However, the job hangs and never completes. What is the proper way to do this? Thanks for your help!
Upvotes: 0
Views: 4186
Reputation: 37842
Your implementation tries to use one RDD (ids) inside the closure used to map another - this isn't allowed in Spark: RDDs cannot be referenced from within transformations on other RDDs. More generally, anything used in a closure must be serializable (and preferably small), since it will be serialized and sent to each worker.
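As an aside, if the set of IDs is known to be small, one common workaround (a sketch, assuming ids fits comfortably in driver memory) is to collect it once on the driver and broadcast it, so the closure only captures a plain Set:

val idSet = sc.broadcast(ids.collect().toSet) // single collect on the driver; shipped once to each executor

val result = vals.map { case (id, v) =>
  // idSet.value is an ordinary Set[Int], safe to reference inside a closure
  if (idSet.value.contains(id)) (id, v + 1) else (id, v)
}

For two genuinely large RDDs, though, a join is the way to go.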
A leftOuterJoin between these RDDs should get you what you want:
val ids = sc.parallelize(List(1, 2, 10, 5))
val vals = sc.parallelize(List((1, 0), (2, 0), (3, 0), (4, 0)))

val result = vals
  .leftOuterJoin(ids.keyBy(i => i))
  .mapValues {
    case (v, Some(matchingId)) => v + 1 // increase value if a matching ID was found
    case (v, None)             => v     // leave value as-is otherwise
  }
leftOuterJoin expects two key-value RDDs, hence we artificially extract a key from the ids RDD using the identity function (keyBy(i => i)). We then map the values of each resulting (id: Int, (value: Int, matchingId: Option[Int])) record to either v or v + 1, depending on whether the join found a match.
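For the sample data above, this produces (ordering across partitions is not guaranteed):

result.collect().sorted.foreach(println)
// (1,1) - 1 appears in ids, so its value was increased
// (2,1) - 2 appears in ids, so its value was increased
// (3,0) - 3 is not in ids, unchanged
// (4,0) - 4 is not in ids, unchanged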
Generally, you should aim to minimize the use of actions like collect when using Spark, as such actions move data from the distributed cluster back into your driver application.
Upvotes: 5