Al Jenssen

Reputation: 695

Apache Spark - Intersection of Multiple RDDs

In Apache Spark one can union multiple RDDs efficiently by using the sparkContext.union() method. Is there something similar for intersecting multiple RDDs? I have searched the sparkContext methods and elsewhere, and I could not find anything. One solution could be to union the RDDs and then retrieve the duplicates, but I do not think that would be very efficient. Assume I have the following example with key/value pair collections:

val rdd1 = sc.parallelize(Seq((1,1.0),(2,1.0)))
val rdd2 = sc.parallelize(Seq((1,2.0),(3,4.0),(3,1.0)))

I want to retrieve a new collection which has the following elements:

(1,2.0) (1,1.0)

But of course for multiple RDDs, not just two.
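To make the union idea concrete, a rough sketch of it for the key/value case might look like this (rdds, commonKeys, and result are made-up names; a key counts as common when it occurs in all n RDDs):

val rdds = Seq(rdd1, rdd2)
val n = rdds.length
// Count in how many RDDs each distinct key occurs; keep keys seen in all of them.
val commonKeys = sc.union(rdds.map(_.keys.distinct()))
  .map(k => (k, 1))
  .reduceByKey(_ + _)
  .filter { case (_, count) => count == n }
  .keys
  .map(k => (k, ()))
// Recover the original (key, value) pairs for those keys.
val result = rdds.map(_.join(commonKeys).mapValues { case (v, _) => v }).reduce(_ union _)

This is exactly the shuffle-heavy kind of chain I suspect is not that efficient.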

Upvotes: 1

Views: 1968

Answers (2)

Tim

Reputation: 3725

There is an intersection method on RDD, but it only takes one other RDD:

def intersection(other: RDD[T]): RDD[T]
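For example, a quick illustrative check with made-up values:

val a = sc.parallelize(Seq(1, 2, 3))
val b = sc.parallelize(Seq(2, 3, 4))
a.intersection(b).collect()  // Array(2, 3), in some order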

Let's implement the method you want in terms of this one.

def intersectRDDs[T](rdds: Seq[RDD[T]]): RDD[T] = {
  rdds.reduce { case (left, right) => left.intersection(right) }
}

If you've looked at the implementation of Spark joins, you can optimize the execution by putting the largest RDD first:

def intersectRDDs[T](rdds: Seq[RDD[T]]): RDD[T] = {
  rdds.sortBy(rdd => -1 * rdd.partitions.length)
    .reduce { case (left, right) => left.intersection(right) }
}
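As a quick sanity check of the helper, with made-up values:

val a = sc.parallelize(Seq(1, 2, 3, 4))
val b = sc.parallelize(Seq(2, 3, 4))
val c = sc.parallelize(Seq(3, 4, 5))
intersectRDDs(Seq(a, b, c)).collect()  // Array(3, 4), in some order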

EDIT: It looks like I misread your example: your text suggested you were searching for the intersection counterpart of rdd.union, but your example implies you want to intersect by key. My answer does not address that case; a sketch of that variant follows below.
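For completeness, one possible sketch of that by-key variant (intersectByKey is a made-up helper, not a Spark API; it assumes "intersect by key" means keeping every pair whose key occurs in all of the RDDs):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def intersectByKey[K: ClassTag, V: ClassTag](rdds: Seq[RDD[(K, V)]]): RDD[(K, V)] = {
  // Keys present in every RDD, tagged with Unit so they can be joined against.
  val commonKeys = rdds
    .map(_.keys.distinct())
    .reduce(_ intersection _)
    .map(k => (k, ()))
  // Keep the original pairs whose key is one of the common keys.
  rdds
    .map(_.join(commonKeys).mapValues { case (v, _) => v })
    .reduce(_ union _)
}

With the question's rdd1 and rdd2, intersectByKey(Seq(rdd1, rdd2)).collect() should yield Array((1,1.0), (1,2.0)) in some order.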

Upvotes: 2

user6022341

Reputation:

Try:

val rdds = Seq(
  sc.parallelize(Seq(1, 3, 5)),
  sc.parallelize(Seq(3, 5)),
  sc.parallelize(Seq(1, 3))
)
rdds
  .map(rdd => rdd.map(x => (x, None)))                  // tag elements so each RDD becomes a pair RDD
  .reduce((x, y) => x.join(y).keys.map(k => (k, None))) // a join keeps only keys present on both sides
  .keys                                                 // drop the dummy None tags
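The same trick recovers the keys common to all of the question's pair RDDs if each one is first keyed on its first element (a sketch; pairRdds is a made-up name):

val pairRdds = Seq(rdd1, rdd2)
pairRdds
  .map(_.map { case (k, _) => (k, None) })
  .reduce((x, y) => x.join(y).keys.map(k => (k, None)))
  .keys  // the common keys: here just 1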

Upvotes: 2
