user3803714

Reputation: 5389

Accessing another rdd within a map

Here is my sample data:

| rdd1  |
| ....  |
| 10    |
| 200   |
| 350   |
| 400   |
| 1000  |
| 1500  |
| ..... |



| rdd2  |
| label | features                 | 
| ....  | .......................  |
|   0   | 1 10 30 100  200 450 600 |
|   0   | 200 300 400              |   
|   1   | 200 350 450              |
|   1   | 400 600 700              |
|  .... | ........................ |

I want to compute the following: for each element of rdd1, find out how many times it appears in the features of rdd2 for each label value. I need a tuple like this: (# of times it appears with label 0, # of times it appears with label 1). So in the above example, 10 appears once with label 0 and 0 times with label 1, so for 10 it will be (1,0). 200 appears 2 times with label 0 and once with label 1, so for 200 it will be (2,1).

In addition, for each element of rdd1 I also want to find out how many times it does not appear in the features of rdd2 for each label value, as a tuple: (# of times it does not appear with label 0, # of times it does not appear with label 1). So in the above example, 10 does not appear once with label 0 and twice with label 1, so for 10 I should get back (1,2).
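To make the expected output concrete, here is a brute-force sketch over plain Scala collections holding the sample data. It is not Spark code; it assumes each rdd2 row is a (label, features) pair and that "appears" means "appears at least once in a row". It loops over rdd2 for every element of rdd1, which is fine for local collections but is not something Spark allows when both are RDDs.

```scala
object ExpectedCounts {
  // Sample data from the question; rdd2 rows are assumed to be (label, features)
  val rdd1: Seq[Int] = Seq(10, 200, 350, 400, 1000, 1500)
  val rdd2: Seq[(Int, Seq[Int])] = Seq(
    (0, Seq(1, 10, 30, 100, 200, 450, 600)),
    (0, Seq(200, 300, 400)),
    (1, Seq(200, 350, 450)),
    (1, Seq(400, 600, 700))
  )

  // (rows with label 0 containing x, rows with label 1 containing x)
  def appears(x: Int): (Int, Int) =
    (rdd2.count { case (l, fs) => l == 0 && fs.contains(x) },
     rdd2.count { case (l, fs) => l == 1 && fs.contains(x) })

  // (rows with label 0 not containing x, rows with label 1 not containing x)
  def doesNotAppear(x: Int): (Int, Int) =
    (rdd2.count { case (l, fs) => l == 0 && !fs.contains(x) },
     rdd2.count { case (l, fs) => l == 1 && !fs.contains(x) })
}
```

For example, ExpectedCounts.appears(200) gives (2,1) and ExpectedCounts.doesNotAppear(10) gives (1,2), matching the tuples described above.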

I was planning to use aggregateByKey:

    import scala.collection.mutable.ListBuffer
    // Slot 0 counts label 0, slot 1 counts label 1
    val initialCount: ListBuffer[Int] = ListBuffer(0, 0)
    // seqOp must return the accumulator, not Unit
    val addToCounts = (s: ListBuffer[Int], label: Int) => { if (label == 0) s(0) += 1 else s(1) += 1; s }
    val sumPartitionCounts = (p1: ListBuffer[Int], p2: ListBuffer[Int]) => ListBuffer(p1(0) + p2(0), p1(1) + p2(1))

However, I read that accessing an RDD within the map function of another RDD is not allowed. Any thoughts on how I can resolve this would be great.

Upvotes: 3

Views: 1556

Answers (1)

gneets

Reputation: 19

  1. Broadcast variable: if your rdd2 is small enough, broadcast it to every node and use it as a lookup within rdd1.map, OR
  2. Join: join the key-value RDDs.

You would have to restructure your rdd2 to get the desired key for the broadcast variable lookup or the join. If rdd2 is RDD[(label, Array[feature])], I would try to get an RDD[(feature, label)] like so:

    // One (feature, label) pair per feature occurrence
    val rdd2Mapped: RDD[(String, String)] = rdd2.flatMap(x => x._2.map(y => (y, x._1)))
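To see what this reshaping produces, here is the same flatMap applied to a plain Scala Seq standing in for rdd2 (an assumption so the snippet runs without Spark; labels and features are strings, as in the types above). Only two of the sample rows are used.

```scala
object ReshapeDemo {
  // Stand-in for rdd2: (label, features) rows
  val rdd2: Seq[(String, Seq[String])] = Seq(
    ("0", Seq("1", "10", "30", "100", "200", "450", "600")),
    ("1", Seq("200", "350", "450"))
  )

  // Swap to (feature, label): one pair per feature occurrence
  val rdd2Mapped: Seq[(String, String)] =
    rdd2.flatMap(x => x._2.map(y => (y, x._1)))
}
```

The result starts with ("1","0"), ("10","0"), ... and contains ("200","0") and ("200","1"), so each feature is now a key that can be aggregated or joined on.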

Then create an RDD[(feature, Map[label, frequency])] using aggregateByKey:

    import org.apache.spark.rdd.RDD
    import scala.collection.mutable

    // Zero value: an empty label -> frequency map (aggregateByKey gives each key its own copy)
    val initialMap = mutable.Map.empty[String, Int]
    // seqOp: count one more occurrence of label y
    val addToMap = (x: mutable.Map[String, Int], y: String) => {
        x(y) = x.getOrElse(y, 0) + 1
        x
    }
    // combOp: add up the per-label counts of two partial maps
    val mergeMaps = (x: mutable.Map[String, Int], y: mutable.Map[String, Int]) => {
        y.foreach { case (label, n) => x(label) = x.getOrElse(label, 0) + n }
        x
    }
    val rdd2Aggregated: RDD[(String, mutable.Map[String, Int])] =
        rdd2Mapped.aggregateByKey(initialMap)(addToMap, mergeMaps)
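The aggregation can be checked without a cluster by replaying what aggregateByKey does for a single key: fold each partition's labels into a fresh map with the seqOp, then combine the per-partition maps with the combOp. The two "partitions" below are made up for illustration. The combOp has to add the counts for matching labels; a plain x ++= y would keep only one partition's count for a label present in both.

```scala
import scala.collection.mutable

object AggregateDemo {
  // seqOp: count one more occurrence of `label`
  val addToMap = (x: mutable.Map[String, Int], label: String) => {
    x(label) = x.getOrElse(label, 0) + 1
    x
  }

  // combOp: add per-label counts from two partial maps
  val mergeMaps = (x: mutable.Map[String, Int], y: mutable.Map[String, Int]) => {
    y.foreach { case (label, n) => x(label) = x.getOrElse(label, 0) + n }
    x
  }

  // Labels seen for feature "200", split across two hypothetical partitions
  val partitions: Seq[Seq[String]] = Seq(Seq("0", "1"), Seq("0"))

  // Each partition starts from its own empty map, like aggregateByKey's zero value
  val perPartition: Seq[mutable.Map[String, Int]] =
    partitions.map(_.foldLeft(mutable.Map.empty[String, Int])(addToMap))

  // Combine partial results: label "0" -> 2, label "1" -> 1
  val merged: mutable.Map[String, Int] = perPartition.reduce(mergeMaps)
}
```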

Now, either broadcast rdd2Aggregated (collected to the driver as a map first) or join rdd1 with rdd2Aggregated, and use the Map[label -> frequency] to get your desired result.
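With the broadcast option, the lookup inside rdd1.map is just a read from an ordinary Scala Map. The sketch below runs that logic on local collections rather than RDDs; `aggregated` is an assumed excerpt of what collecting rdd2Aggregated would give for the sample data (shown as immutable maps). In Spark the map would come from something like sc.broadcast(rdd2Aggregated.collectAsMap()) and be read through .value inside rdd1.map.

```scala
object LookupDemo {
  // Assumed excerpt of the collected rdd2Aggregated: feature -> (label -> frequency)
  val aggregated: Map[String, Map[String, Int]] = Map(
    "10"  -> Map("0" -> 1),
    "200" -> Map("0" -> 2, "1" -> 1),
    "350" -> Map("1" -> 1),
    "400" -> Map("0" -> 1, "1" -> 1)
  )

  val rdd1: Seq[String] = Seq("10", "200", "350", "400", "1000", "1500")

  // (times with label 0, times with label 1); missing features or labels count as 0
  val result: Seq[(String, (Int, Int))] = rdd1.map { f =>
    val freq = aggregated.getOrElse(f, Map.empty[String, Int])
    (f, (freq.getOrElse("0", 0), freq.getOrElse("1", 0)))
  }
}
```

Here "200" maps to (2,1) and "1000", which occurs in no row, maps to (0,0) thanks to the getOrElse defaults.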

For the second part of the question, transform rdd2 in almost the same way, but take only the distinct features in every row:

    val rdd2Mapped: RDD[(String, String)] = rdd2.flatMap(x => x._2.distinct.map(y => (y, x._1)))
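The only change from the first part is distinct, so a feature repeated within one row is counted once for that row. A local illustration, using a made-up row in which "200" occurs twice (the sample data has no such row):

```scala
object DistinctDemo {
  // Hypothetical row in which feature "200" occurs twice
  val rdd2: Seq[(String, Seq[String])] = Seq(("0", Seq("200", "300", "200")))

  // Without distinct: one pair per occurrence
  val withDuplicates: Seq[(String, String)] = rdd2.flatMap(x => x._2.map(y => (y, x._1)))
  // With distinct: at most one pair per feature per row
  val perRow: Seq[(String, String)] = rdd2.flatMap(x => x._2.distinct.map(y => (y, x._1)))
}
```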

Get the RDD[(feature, Map[label, frequency])] as in the first part. Because of the distinct, the frequency is now the number of rows with that label in which the feature appears. Then:

  1. Get the number of rows for each label in rdd2 (a simple word count on the labels of rdd2).
  2. Join rdd1 with this new rdd2Aggregated as before, and combine the result with the label counts (join with them, or broadcast them as a lookup map if they are small enough).
  3. For every feature you now have a map of labels to frequencies. Subtract the frequency of each label from that label's row count to get the desired answer.

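If a label doesn't exist in the Map[label, frequency] for a given feature, treat its frequency as 0; make sure to handle this edge case. The same applies to elements of rdd1 that appear in no row at all: a plain join drops them, so use leftOuterJoin (or a lookup with a default) if you need a result for them.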
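The subtraction step, sketched with local collections: count rows per label (the word count on labels), then subtract each feature's per-row frequency, treating a missing label or feature as 0. `perRowFreq` is an assumed excerpt of the distinct-based aggregation for the sample data, not real Spark output.

```scala
object NotAppearDemo {
  // Labels of the sample rdd2 rows, one entry per row
  val rowLabels: Seq[String] = Seq("0", "0", "1", "1")

  // Word count on labels: label -> number of rows with that label
  val labelCounts: Map[String, Int] =
    rowLabels.groupBy(identity).map { case (label, rows) => (label, rows.size) }

  // Assumed distinct-based aggregation: feature -> (label -> rows containing it)
  val perRowFreq: Map[String, Map[String, Int]] = Map(
    "10"  -> Map("0" -> 1),
    "200" -> Map("0" -> 2, "1" -> 1)
  )

  // (rows with label 0 lacking f, rows with label 1 lacking f)
  def notAppear(f: String): (Int, Int) = {
    val freq = perRowFreq.getOrElse(f, Map.empty[String, Int])
    def missing(label: String): Int = labelCounts.getOrElse(label, 0) - freq.getOrElse(label, 0)
    (missing("0"), missing("1"))
  }
}
```

notAppear("10") gives (1,2) as in the question, and a feature found in no row, such as "1500", gets the full row counts (2,2).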

Upvotes: 0
