Reputation: 2036
Joining two RDDs is simple with a RDD1.join(RDD2)
. However, if I keep an arbitrary number of RDDs in a List<JavaRDD>
, how can I efficiently join them ?
Upvotes: 4
Views: 788
Reputation: 3990
First, please note that you cannot join JavaRDD
. You would need to obtain a JavaPairRDD
by using:
groupBy()
(or keyBy()
)cartesian()
[flat]mapToPair()
zipWithIndex()
(useful because it adds index where there is none)Then, once you have your list, you can join them all like this:
JavaPairRDD<Integer, String> linesA = sc.parallelizePairs(Arrays.asList(
new Tuple2<>(1, "a1"),
new Tuple2<>(2, "a2"),
new Tuple2<>(3, "a3"),
new Tuple2<>(4, "a4")));
JavaPairRDD<Integer, String> linesB = sc.parallelizePairs(Arrays.asList(
new Tuple2<>(1, "b1"),
new Tuple2<>(5, "b5"),
new Tuple2<>(3, "b3")));
JavaPairRDD<Integer, String> linesC = sc.parallelizePairs(Arrays.asList(
new Tuple2<>(1, "c1"),
new Tuple2<>(5, "c6"),
new Tuple2<>(6, "c3")));
// the list of RDDs
List<JavaPairRDD<Integer, String>> allLines = Arrays.asList(linesA, linesB, linesC);
// since we probably don't want to modify any of the datasets in the list, we will
// copy the first one in a separate variable to keep the result
JavaPairRDD<Integer, String> res = allLines.get(0);
for (int i = 1; i < allLines.size(); ++i) { // note we skip position 0 !
res = res.join(allLines.get(i))
/*[1]*/ .mapValues(tuple -> tuple._1 + ':' + tuple._2);
}
The line with [1]
is the important one, because it maps a
JavaPairRDD<Integer, Tuple2<String,String>>
back into a
JavaPairRdd<Integer,String>
which makes it compatible with further joins.
Based on chrisw answer, this could be put into "one line" like this:
JavaPairRDD<Integer, String> res;
res = allLines.stream()
.reduce((rdd1, rdd2) -> rdd1.join(rdd2).mapValues(tup -> tup._1 + ':' + tup._2))
.get(); // get value from Optional<JavaPairRDD>
Finally, some thoughts on performance. In the above example, I used string concatenation to reduce the result of the join back to an RDD of the same type. If you have a lot of RDDs, you could probably speed this up a bit by using the for loop
version with JavaPairRDD<Integer, StringBuilder> res
, where you do the first join by hand. I will post more details if required.
Upvotes: 4
Reputation: 3508
I'm not familiar with the JavaRDD class/interface but perhaps you could solve this problem using the higher-order function reduce
in Java 8, see https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html
final List<JavaRDD> list = getList(); // where getList is your list implementation containing JavaRDD instances
// The JavaRDD class provides rdd() to get the RDD
final JavaRDD rdd = list.stream().map(JavaRDD::rdd).reduce(RDD::join);
An example with the String class would be something like: -
Stream.of("foo", "bar", "baz").reduce(String::concat);
Which produces
foobarbaz
Upvotes: 1