Reputation: 29
I am using Spark + Scala. My rdd1 holds customer info, i.e. (id, (name, address)), and rdd2 holds only the names of high-profile customers. I now want to find out whether each customer in rdd1 is high profile or not. How can I search one RDD using another? Joining the RDDs does not look like a good solution to me.
My code:
val result = rdd1.map { case (id, customer) =>
  customer.foreach { c =>
    rdd2.filter(_ == c._1).count() != 0
  }
}
Error:
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations
Upvotes: 1
Views: 2610
Reputation: 523
You can use a left outer join to avoid an expensive operation such as collect (if your RDDs are big).
Also, as Daniel pointed out, a broadcast is not necessary.
Here is a snippet that produces rdd1 with a flag denoting whether each customer is a high profile or a low profile customer.
val highProfileFlag = 1
val lowProfileFlag = 0
// Key rdd1 by the name
val rdd1Keyed = rdd1.map { case (id, (name, address)) => (name, (id, address)) }

// Key rdd2 by the name and add a high profile flag
val rdd2Keyed = rdd2.map(name => (name, highProfileFlag))

// The join you are looking for is the left outer join;
// it yields (name, ((id, address), Option[flag])), so the Option must be
// matched as part of the nested pair
val rdd1HighProfileFlag = rdd1Keyed
  .leftOuterJoin(rdd2Keyed)
  .map { case (name, ((id, address), highProfileOpt)) =>
    val profileFlag = highProfileOpt.getOrElse(lowProfileFlag)
    (id, (name, address, profileFlag))
  }
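As a sanity check, the left outer join's semantics can be emulated with plain Scala collections (no Spark needed); the sample names and addresses below are made up for illustration:

```scala
val highProfileFlag = 1
val lowProfileFlag = 0

// Stand-ins for the two keyed RDDs
val rdd1Keyed = Seq(("alice", (1, "12 Oak St")), ("bob", (2, "9 Elm St")))
val rdd2Keyed = Map("alice" -> highProfileFlag) // only high-profile names

// A left outer join keeps every left record and attaches an Option from the right
val joined = rdd1Keyed.map { case (name, (id, address)) =>
  (name, ((id, address), rdd2Keyed.get(name)))
}

// Unwrap the Option exactly as in the Spark snippet above
val flagged = joined.map { case (name, ((id, address), flagOpt)) =>
  (id, (name, address, flagOpt.getOrElse(lowProfileFlag)))
}
// flagged: Seq((1,("alice","12 Oak St",1)), (2,("bob","9 Elm St",0)))
```

This shows why the pattern match must treat the right side as an `Option`: "bob" has no match in rdd2, so he falls back to the low profile flag.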
Upvotes: 0
Reputation: 1595
You can broadcast one RDD by collecting it on the driver. Broadcast the smaller RDD to keep the cost low.
val bcastRdd = sc.broadcast(rdd2.collect)

val result = rdd1.map { case (id, (name, address)) =>
  (id, (name, address, bcastRdd.value.contains(name)))
}
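When the broadcast collection is large, a linear scan per record adds up; collecting the names into a `Set` instead of an `Array` gives constant-time membership checks. A collections-only sketch of that lookup (the names here are illustrative):

```scala
// Stand-in for sc.broadcast(rdd2.collect.toSet).value
val highProfileNames = Set("alice", "carol")

// Stand-in for rdd1's (id, (name, address)) records
val customers = Seq((1, ("alice", "12 Oak St")), (2, ("bob", "9 Elm St")))

// Per-record lookup is O(1) against the Set
val result = customers.map { case (id, (name, address)) =>
  (id, (name, address, highProfileNames.contains(name)))
}
// result: Seq((1,("alice","12 Oak St",true)), (2,("bob","9 Elm St",false)))
```

In the Spark version the only change is broadcasting `rdd2.collect.toSet` rather than the raw array.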
Upvotes: 3