Reputation: 2317
I have one large table, JavaPairRDD<String, MySchema> RDD1, and a smaller one, JavaPairRDD<String, Double> RDD2. I want to join these two RDDs. I know the best way is to make RDD2 a broadcast variable and then join, to reduce shuffling. How do I handle the broadcasting part? After broadcasting, I will get a variable (a List or a Set) that is no longer an RDD. How do I join a broadcast variable with an RDD?
// I omitted the parsing step and simplified it to loading from files.
JavaPairRDD<String, MySchema> RDD1 = sc.textFile("path_to_large_dataset");
JavaPairRDD<String, Double> RDD2 = sc.textFile("path_to_small_dataset");
// Broadcast RDD2
Set<Tuple2<String, Double>> set2 = new HashSet<>();
set2.addAll(RDD2.collect());
// now I have set2 and RDD1, how can I join them?
Upvotes: 1
Views: 2811
Reputation: 1590
Let's say you have two RDDs that you want to join: the first one is small enough to fit into the memory of each worker (smallRDD), and the second one is large and, with this approach, does not need to be shuffled at all (largeRDD).
Before joining, make sure to transform the large RDD[T] into an RDD[(key, T)], where the key represents the columns used in the join. The small RDD must likewise be an RDD of (key, value) pairs so that it can be collected into a Map.
This code should do the trick in Scala (the basic principle is the same in Java):
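// Collect the small RDD on the driver and broadcast it as a Map to every executor.
val smallLookup = sc.broadcast(smallRDD.collect.toMap)
// Look up each key locally; keys missing from the map are dropped (inner join).
largeRDD.flatMap { case (key, value) =>
  smallLookup.value.get(key).map { otherValue =>
    (key, (value, otherValue))
  }
}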
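Since the question is in Java, here is a rough sketch of the same per-record lookup in plain Java. The class name BroadcastJoinSketch and the method joinRecord are hypothetical names made up for illustration, and the runnable part uses only java.util so it can be tried without a cluster. The Spark wiring in the trailing comment is an untested assumption based on the Spark 2.x+ Java API (JavaSparkContext.broadcast, JavaPairRDD.collectAsMap, JavaPairRDD.flatMapToPair) and reuses sc, RDD1, RDD2 and MySchema from the question.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BroadcastJoinSketch {

    // Per-record lookup against the (broadcast) map. Returns zero or one
    // joined pair, mirroring the Option produced inside the Scala flatMap,
    // so keys absent from the small side are dropped (inner-join semantics).
    public static <V> List<Map.Entry<String, Map.Entry<V, Double>>> joinRecord(
            String key, V value, Map<String, Double> lookup) {
        List<Map.Entry<String, Map.Entry<V, Double>>> out = new ArrayList<>();
        Double other = lookup.get(key);
        if (other != null) {
            Map.Entry<V, Double> pair = new SimpleEntry<>(value, other);
            out.add(new SimpleEntry<>(key, pair));
        }
        return out;
    }

    // Assumed Spark wiring (not compiled here; sc is a JavaSparkContext):
    //
    //   Broadcast<Map<String, Double>> small =
    //       sc.broadcast(new HashMap<>(RDD2.collectAsMap()));
    //   JavaPairRDD<String, Tuple2<MySchema, Double>> joined =
    //       RDD1.flatMapToPair(t -> {
    //           List<Tuple2<String, Tuple2<MySchema, Double>>> out = new ArrayList<>();
    //           Double other = small.value().get(t._1());
    //           if (other != null) {
    //               out.add(new Tuple2<>(t._1(), new Tuple2<>(t._2(), other)));
    //           }
    //           return out.iterator();
    //       });
}
```

Collecting RDD2 into a Map rather than a Set of tuples is what makes the join cheap: each record of RDD1 does a constant-time key lookup instead of scanning the collection. Note that a Map keeps only one value per key, so this only matches a regular join if the keys in RDD2 are unique.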
I hope it helps
Upvotes: 3