Sam

Reputation: 61

For loop over two RDDs for matching keys

I have two RDDs of type RDD[(String, Array[(String, Array[String])])]. The data in them looks like this:

    rdd1 = (4, [(0, [1,4,5,6]), (2, [4,5,6])])
           (5, [(0, [1,4,5,6]), (2, [4,5,6])])
           ......

    rdd2 = (4, [(0, [1,4,6])])
           (5, [(1, [2,5,6]), (2, [3,5])])
           ......

First I want to check whether a key of rdd1 is also present in rdd2. If it is, I want to loop over the tuples in that key's array, pairing each tuple from rdd1 with every tuple for the same key in rdd2. For example, both rdd1 and rdd2 contain key 4, so for key 4 the pairs should be (0, [1,4,5,6]) with (0, [1,4,6]), and (2, [4,5,6]) with (0, [1,4,6]). I then need to perform some operations on each of these pairs.
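To make the desired pairing concrete, here is a small sketch on plain Scala collections, assuming local Maps can stand in for the two RDDs. The names rdd1Local, rdd2Local, pairsByKey and the Entry alias are hypothetical, not Spark API. Keys missing from either side are skipped, and each rdd1 tuple is paired with every rdd2 tuple under the same key.

```scala
// Element type of the arrays in the question: (id, items)
type Entry = (String, Array[String])

// Hypothetical local stand-ins for rdd1 and rdd2, using the sample data above
val rdd1Local: Map[String, Array[Entry]] = Map(
  "4" -> Array(("0", Array("1", "4", "5", "6")), ("2", Array("4", "5", "6"))),
  "5" -> Array(("0", Array("1", "4", "5", "6")), ("2", Array("4", "5", "6")))
)
val rdd2Local: Map[String, Array[Entry]] = Map(
  "4" -> Array(("0", Array("1", "4", "6"))),
  "5" -> Array(("1", Array("2", "5", "6")), ("2", Array("3", "5")))
)

// For every key present in both maps, pair each left tuple with each right tuple
def pairsByKey(
    left: Map[String, Array[Entry]],
    right: Map[String, Array[Entry]]
): Map[String, Seq[(Entry, Entry)]] =
  left.collect {
    case (key, lv) if right.contains(key) =>
      key -> (for (a <- lv.toSeq; b <- right(key).toSeq) yield (a, b))
  }

// Print the ids of each pair for key 4: prints "0 -> 0" then "2 -> 0"
pairsByKey(rdd1Local, rdd2Local)("4").foreach { case (a, b) => println(s"${a._1} -> ${b._1}") }
```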

What I tried was merging the two RDDs by key and then applying a nested for loop to the merged values:

    val rdd3 = merged_both_rdd1_rdd2_by_key.flatMap(x =>
      for (i <- 0 until x._2.size; j <- i until x._2.size) yield (x._2(i), x._2(j)))

But this also pairs tuples that come from the same RDD, while I only want to pair each rdd1 tuple with each rdd2 tuple.

I also tried nesting a flatMap over one RDD inside a flatMap over the other:

    val sortedLines2 = sortedLines1.flatMap(y => {
      var myMap: Map[(String, String), Double] = Map()
      val second = sortedLines12.flatMap(x => {
        var myMap1: Map[(String, String), Double] = Map()
        for (i <- 0 until x._2.size) {
          for (j <- 0 until y._2.size) {
            if (i != j) {
              val inter = (x._2(i)._2.toSet & y._2(j)._2.toSet).size.toDouble
              val union = (x._2(i)._2.toSet.size + y._2(j)._2.toSet.size).toDouble - inter
              val div = inter / union
              if (div >= threshold) {
                if (!myMap.contains((x._2(i)._1, y._2(j)._1))) {
                  myMap += ((x._2(i)._1, y._2(j)._1) -> div)
                  myMap1 += ((x._2(i)._1, y._2(j)._1) -> div)
                }
              }
            }
          }
        }
        myMap1
      })
      myMap
    })

Running this gives me the following error:

    This RDD lacks a SparkContext. It could happen in the following cases: 
    (1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
    (2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
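For reference, the inter / union arithmetic in the code above computes the Jaccard similarity of the two item sets, with the union size obtained by inclusion-exclusion. Applied to the key 4 example:

$$
J(A, B) = \frac{|A \cap B|}{|A \cup B|} = \frac{|A \cap B|}{|A| + |B| - |A \cap B|}
$$

$$
J(\{1,4,5,6\}, \{1,4,6\}) = \frac{3}{4 + 3 - 3} = 0.75, \qquad J(\{4,5,6\}, \{1,4,6\}) = \frac{2}{3 + 3 - 2} = 0.5
$$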

Upvotes: 0

Views: 1278

Answers (1)

LevG

Reputation: 177

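You can first join the RDDs by key. Since join is an inner join, only keys present in both RDDs are kept, so this also takes care of checking that a key of rdd1 exists in rdd2: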

    val rddsJoin = rdd1.join(rdd2)

and then loop over the values of the joined RDD:

    rddsJoin.foreach { case (key, (v1, v2)) =>
      // Pair every rdd1 tuple (v1) with every rdd2 tuple (v2) for the same key
      for (vE1 <- v1; vE2 <- v2) doSomething(vE1, vE2)
    }

If you want a transformation rather than an action, replace foreach with map or flatMap, depending on what your application needs.
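As a sketch of the flatMap route, assuming the goal is the question's Jaccard-style similarity with a threshold, the function below takes one record of the joined RDD, i.e. (key, (v1, v2)), and returns the id pairs whose similarity reaches the threshold. It uses only plain Scala, so it can be tried on a local Seq first. The names Entry, jaccard, similarPairs, joined and the threshold 0.6 are hypothetical. Assuming similarPairs is defined somewhere Spark can serialize (for example a top-level object), the same function could be passed as rddsJoin.flatMap(similarPairs(threshold)), which pairs the tuples per key on the executors instead of nesting one RDD inside another.

```scala
// Element type of the arrays in the question: (id, items)
type Entry = (String, Array[String])

// Jaccard similarity, computed the same way as inter / union in the question
def jaccard(a: Array[String], b: Array[String]): Double = {
  val sa = a.toSet
  val sb = b.toSet
  val inter = (sa & sb).size.toDouble
  inter / (sa.size + sb.size - inter)
}

// Body for flatMap over a joined record (key, (v1, v2)):
// pairs every v1 tuple with every v2 tuple and keeps those at or above the threshold
def similarPairs(
    threshold: Double
): ((String, (Array[Entry], Array[Entry]))) => Seq[((String, String), Double)] = {
  case (_, (v1, v2)) =>
    for {
      (id1, items1) <- v1.toSeq
      (id2, items2) <- v2.toSeq
      sim = jaccard(items1, items2)
      if sim >= threshold
    } yield ((id1, id2), sim)
}

// Local stand-in for one record of rddsJoin (key "4" from the question)
val joined: Seq[(String, (Array[Entry], Array[Entry]))] = Seq(
  ("4", (
    Array(("0", Array("1", "4", "5", "6")), ("2", Array("4", "5", "6"))),
    Array(("0", Array("1", "4", "6")))
  ))
)

joined.flatMap(similarPairs(0.6)).foreach(println) // prints ((0,0),0.75)
```

With a threshold of 0.6, the (2, 0) pair is dropped because its similarity is only 0.5.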

Upvotes: 1
