Reputation: 695
In Apache Spark one can union multiple RDDs efficiently by using the SparkContext.union() method. Is there something similar if someone wants to intersect multiple RDDs? I have searched the SparkContext methods, and elsewhere, and could not find anything. One solution could be to union the RDDs and then retrieve the duplicates, but I do not think that would be very efficient. Assume I have the following example with key/value pair collections:
val rdd1 = sc.parallelize(Seq((1,1.0),(2,1.0)))
val rdd2 = sc.parallelize(Seq((1,2.0),(3,4.0),(3,1.0)))
I want to retrieve a new collection which has the following elements:
(1,2.0) (1,1.0)
But of course for multiple RDDs, not just two.
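A rough sketch of the union-and-duplicates idea mentioned above, assuming intersect-by-key semantics as in the example (the names rdds, keysInAll and result are only illustrative):
val rdds = Seq(rdd1, rdd2) // works for any number of pair RDDs
// For each RDD take its distinct keys, union them all, and count in how many RDDs each key occurs.
val keysInAll = rdds
  .map(_.keys.distinct.map(k => (k, 1)))
  .reduce(_ union _)
  .reduceByKey(_ + _)
  .filter { case (_, count) => count == rdds.size }
  .mapValues(_ => ())
// Keep only the pairs whose key occurs in every RDD.
val result = sc.union(rdds).join(keysInAll).mapValues { case (v, _) => v }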
Upvotes: 1
Views: 1968
Reputation: 3725
There is an intersection method on RDD, but it only takes one other RDD:
def intersection(other: RDD[T]): RDD[T]
Let's implement the method you want in terms of this one.
def intersectRDDs[T](rdds: Seq[RDD[T]]): RDD[T] = {
  rdds.reduce { case (left, right) => left.intersection(right) }
}
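For example, in the spark-shell (the sample values here are only illustrative):
val a = sc.parallelize(Seq(1, 2, 3, 4))
val b = sc.parallelize(Seq(2, 3, 4, 5))
val c = sc.parallelize(Seq(3, 4, 5, 6))
// Only the elements present in all three RDDs remain.
intersectRDDs(Seq(a, b, c)).collect() // should give Array(3, 4), possibly in another order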
If you've looked at how Spark implements joins, you'll know you can optimize the execution by putting the largest RDD first:
def intersectRDDs[T](rdds: Seq[RDD[T]]): RDD[T] = {
  // Sorting by descending partition count puts the largest RDD first in the reduce.
  rdds.sortBy(rdd => -1 * rdd.partitions.length)
    .reduce { case (left, right) => left.intersection(right) }
}
EDIT: It looks like I misread your example: the text sounded like you were looking for the inverse of rdd.union, but your example implies you want to intersect by key. My answer does not address that case.
Upvotes: 2
Reputation:
Try:
val rdds = Seq(
  sc.parallelize(Seq(1, 3, 5)),
  sc.parallelize(Seq(3, 5)),
  sc.parallelize(Seq(1, 3))
)

// Pair each element with None, repeatedly join on the keys so that only
// elements present in every RDD survive, then drop the dummy values.
rdds
  .map(rdd => rdd.map(x => (x, None)))
  .reduce((x, y) => x.join(y).keys.map(x => (x, None)))
  .keys
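On the sample data only 3 occurs in all three RDDs, so collecting the result should return Array(3).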
Upvotes: 2