Reputation: 61
I have two RDDs of type RDD[(String, Array[(String, Array[String])])]. They contain data like:
rdd1 = (4, [(0, [1,4,5,6]), (2, [4,5,6])])
       (5, [(0, [1,4,5,6]), (2, [4,5,6])]) ......
rdd2 = (4, [(0, [1,4,6])])
       (5, [(1, [2,5,6]), (2, [3,5])]) ......
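(For reference, a minimal sketch of how such sample data could be built, assuming a SparkContext named sc; all values are Strings to match the stated type.)
import org.apache.spark.rdd.RDD

val rdd1: RDD[(String, Array[(String, Array[String])])] = sc.parallelize(Seq(
  ("4", Array(("0", Array("1", "4", "5", "6")), ("2", Array("4", "5", "6")))),
  ("5", Array(("0", Array("1", "4", "5", "6")), ("2", Array("4", "5", "6"))))
))
val rdd2: RDD[(String, Array[(String, Array[String])])] = sc.parallelize(Seq(
  ("4", Array(("0", Array("1", "4", "6")))),
  ("5", Array(("1", Array("2", "5", "6")), ("2", Array("3", "5"))))
))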
First I want to check whether a key of rdd1 is also present in rdd2. If it is, then for the tuples inside their arrays I want to pair every tuple of rdd1 with every tuple of rdd2 for that key. For example, both rdd1 and rdd2 contain the key 4, so for that key the pairs should look like (0, [1,4,5,6]) (0, [1,4,6])
and (2, [4,5,6]) (0, [1,4,6])
. By iterating over these pairs I have to do some operations on the data.
What I tried is combining the two RDDs by key and then applying the for loop, but this also iterates over pairs of tuples that came from the same RDD.
val rdd3 = merged_both_rdd1_rdd2_by_key.flatMap(x => {
  for (i <- 0 until x._2.size) {
    for (j <- i until x._2.size) {
      // do some operations on x._2(i) and x._2(j)
    }
  }
})
But this iterates over pairs of tuples from the same RDD as well. I only want to pair each tuple of rdd1 with each tuple of rdd2.
I also tried a nested for loop over the two RDDs, but it gives me an error.
val sortedLines2 = sortedLines1.flatMap(y => {
  var myMap: Map[(String, String), Double] = Map()
  val second = sortedLines12.flatMap(x => {
    var myMap1: Map[(String, String), Double] = Map()
    for (i <- 0 until x._2.size) {
      for (j <- 0 until y._2.size) {
        if (i != j) {
          val inter = (x._2(i)._2.toSet & y._2(j)._2.toSet).size.toDouble
          val union = (x._2(i)._2.toSet.size + y._2(j)._2.toSet.size).toDouble - inter
          val div = inter / union
          if (div >= threshold) {
            if (!myMap.contains((x._2(i)._1, y._2(j)._1))) {
              myMap += ((x._2(i)._1, y._2(j)._1) -> div)
              myMap1 += ((x._2(i)._1, x._2(j)._1) -> div)
            }
          }
        }
      }
    }
    myMap1
  })
  myMap
})
With this I get the error below:
This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
Upvotes: 0
Views: 1278
Reputation: 177
You can first join the RDDs by key:
val rddsJoin = rdd1.join(rdd2)
and then loop over the joined RDD's values:
rddsJoin.foreach { case (key, (v1, v2)) =>
  for (vE1 <- v1; vE2 <- v2) { doSomething(vE1, vE2) }
}
If you want a transformation (rather than an action), replace foreach with map or flatMap, according to your application's needs.
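For example, a minimal sketch of how the joined RDD could compute the question's Jaccard-style similarity (the threshold value, the variable names, and the output shape are assumptions, not part of the original answer):
import org.apache.spark.rdd.RDD

val threshold = 0.5 // assumed value

// For keys present in both RDDs, pair every inner tuple of rdd1 with every
// inner tuple of rdd2 and keep the pairs whose Jaccard similarity meets the
// threshold. No RDD is referenced inside another RDD's closure, so this
// avoids the SPARK-5063 error.
val similar: RDD[((String, String), Double)] =
  rdd1.join(rdd2).flatMap { case (_, (arr1, arr2)) =>
    for {
      (id1, items1) <- arr1
      (id2, items2) <- arr2
      inter = (items1.toSet & items2.toSet).size.toDouble
      union = items1.toSet.size + items2.toSet.size - inter
      if union > 0 && inter / union >= threshold
    } yield ((id1, id2), inter / union)
  }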
Upvotes: 1