santosh aditham

Reputation: 29

search rdd for value from another rdd

I am using Spark + Scala. My rdd1 has customer info, i.e. (id, [name, address]). rdd2 has only the names of high profile customers. Now I want to find out whether each customer in rdd1 is high profile or not. How can I search one RDD using another? Joining the RDDs does not look like a good solution to me.

My code:

val result = rdd1.map { case (id, customer) =>
  customer.foreach { c =>
    rdd2.filter(_ == c._1).count() != 0
  }
}

Error: org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations;

Upvotes: 1

Views: 2610

Answers (2)

Vedant

Reputation: 523

You can use a left outer join to avoid an expensive operation such as collect (if your RDDs are big).

Also, as Daniel pointed out, a broadcast is not necessary.

Here is a snippet that produces rdd1 with a flag denoting whether each customer is a high profile or a low profile customer.

val highProfileFlag = 1
val lowProfileFlag = 0 

// Keying rdd 1 by the name    
val rdd1Keyed = rdd1.map { case (id, (name, address)) => (name, (id, address)) }

// Keying rdd 2 by the name and adding a high profile flag
val rdd2Keyed = rdd2.map { case name => (name, highProfileFlag) }

// The join you are looking for is the left outer join
val rdd1HighProfileFlag = rdd1Keyed
  .leftOuterJoin(rdd2Keyed)
  .map { case (name, ((id, address), highProfileOpt)) =>
    // Customers missing from rdd2 get the low profile flag
    val profileFlag = highProfileOpt.getOrElse(lowProfileFlag)
    (id, (name, address, profileFlag))
  }
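
As a quick sanity check, here is a hypothetical run of the snippet above; the sample data, and the assumption that rdd1 holds (id, (name, address)) pairs while rdd2 holds plain names, are mine, not the asker's:

// Hypothetical sample data for illustration only
val rdd1 = sc.parallelize(Seq(
  (1, ("Alice", "1 Main St")),
  (2, ("Bob", "2 Oak Ave"))
))
val rdd2 = sc.parallelize(Seq("Alice"))

// rdd1HighProfileFlag.collect() should give something like (order may vary):
// Array((1,(Alice,1 Main St,1)), (2,(Bob,2 Oak Ave,0)))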

Upvotes: 0

Hafiz Mujadid

Reputation: 1595

You have to collect one RDD and broadcast it. Broadcast the smaller RDD to improve performance.

// Collect the smaller RDD on the driver and broadcast it to all executors
val bcastRdd = sc.broadcast(rdd2.collect)

// Flag each customer by looking up the name in the broadcast array
val result = rdd1.map { case (id, (name, address)) =>
  (id, (name, address, bcastRdd.value.contains(name)))
}
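
For illustration, with hypothetical sample data of the same shape (rdd1 as (id, (name, address)) pairs, rdd2 as names), the broadcast lookup would behave like this; the data and the result variable are assumptions for the sketch:

// Hypothetical sample data for illustration only
val rdd2 = sc.parallelize(Seq("Alice"))
val rdd1 = sc.parallelize(Seq((1, ("Alice", "1 Main St")), (2, ("Bob", "2 Oak Ave"))))

// result.collect() should give something like (order may vary):
// Array((1,(Alice,1 Main St,true)), (2,(Bob,2 Oak Ave,false)))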

Upvotes: 3
