JoshBaryia
JoshBaryia

Reputation: 23

Checking if an RDD element is in another using the map function

I'm new to Spark and was wondering about closures.
I have two RDDs, one containing a list of IDs and a values, and the other containing a list of selected IDs.
Using a map, I want to increase the value of the element, if the other RDD contains its ID, like so.

val ids = sc.parallelize(List(1,2,10,5))
val vals = sc.parallelize(List((1, 0), (2, 0), (3,0), (4,0)))
vals.map( v => {
  if(ids.collect().contains(v._1)){
    (v._1, 1)
  } 
 })

However the job hangs and never completes. What is the proper way to do this, Thanks for your help!

Upvotes: 0

Views: 4186

Answers (1)

Tzach Zohar
Tzach Zohar

Reputation: 37842

Your implementation tries to use one RDD (ids) inside a closure used to map another - this isn't allowed in Spark applications: anything to be used in a closure must be serializable (and preferably small), since it will be serialized and sent to each worker.

a leftOuterJoin between these RDDs should get you what you want:

val ids = sc.parallelize(List(1,2,10,5))
val vals = sc.parallelize(List((1, 0), (2, 0), (3,0), (4,0)))
val result = vals
        .leftOuterJoin(ids.keyBy(i => i))
        .mapValues({ 
            case (v, Some(matchingId)) => v + 1  // increase value if match found
            case (v, None) => v                  // leave value as-is otherwise
        }) 

The leftOuterJoin expects two key-value RDDs, hence we artificially extract a key from the ids RDD using the identity function. Then we map the values of each resulting (id: Int, (value: Int, matchingId: Option[Int])) record into either v or v+1.

Generally, you should always aim to minimize the use of actions like collect when using Spark, as such actions move data back from the distributed cluster into your driver application.

Upvotes: 5

Related Questions