mathieu

Reputation: 2438

Custom join with non-equal keys

I need to implement a custom join strategy that matches keys which are not strictly equal. To illustrate, think of a distance: the join should occur when the keys are close enough (although in my case it's a bit more complicated than just a distance metric).

I can't implement this by overriding equals, since there is no equality involved (and I need to keep a true equality test for other purposes). I suppose I also need to implement a proper partitioner.

How could I do that?

Upvotes: 6

Views: 4884

Answers (2)

David Griffin

Reputation: 13927

Convert the RDDs to DataFrames. You can then join on an arbitrary condition, for example:

val newDF = leftDF.join(rightDF, $"col1" < ceilingVal and $"col1" > floorVal)

You can also define UDFs and use them in the join condition. So if you had a "distanceUDF" like this:

val distanceUDF = udf[Int, Int, Int]((val1, val2) => math.abs(val2 - val1))

You could then do:

val newDF = leftDF.as("left").join(rightDF.as("right"), distanceUDF($"left.colX", $"right.colY") < 10)
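
To make the UDF join concrete, here is a minimal self-contained sketch. The object name `UdfJoinSketch`, the method `closeJoin`, the sample rows, and the column names `id` and `name` are all made up for illustration; only `colX` and `colY` come from the snippet above. It assumes a local Spark session and that "distance" means the absolute difference between two integers.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf

object UdfJoinSketch {
  // Joins two tiny hypothetical tables on |colY - colX| < maxDistance.
  def closeJoin(spark: SparkSession, maxDistance: Int): DataFrame = {
    import spark.implicits._

    // Aliases are needed so that "left.colX" and "right.colY" resolve.
    val leftDF  = Seq((1, 10), (2, 50)).toDF("id", "colX").as("left")
    val rightDF = Seq(("a", 14), ("b", 90)).toDF("name", "colY").as("right")

    // Assumed distance: absolute difference of the two keys.
    val distanceUDF = udf((v1: Int, v2: Int) => math.abs(v2 - v1))

    leftDF.join(rightDF, distanceUDF($"left.colX", $"right.colY") < maxDistance)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("udf-join-sketch")
      .getOrCreate()
    closeJoin(spark, 10).show()
    spark.stop()
  }
}
```

Keep in mind that a condition like this is not an equi-join, so Spark cannot use a hash or sort-merge join for it and has to compare row pairs with a nested-loop or cartesian strategy. It is convenient to write, but not necessarily cheaper than an explicit cartesian product.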

Upvotes: 6

maasg

Reputation: 37435

One way would be to take the cartesian product of the two RDDs and filter out the elements that do not meet the condition. Using the distance example:

rdd1.cartesian(rdd2).filter{case (elem1, elem2) => distance(elem1,elem2) < threshold}

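This is an expensive operation, since every element of one RDD is compared with every element of the other. If one of the datasets is small enough to fit in memory, it can be broadcast to the executors and the join performed map-side, which avoids the shuffle.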
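
A minimal sketch of that map-side variant, assuming the smaller dataset fits in memory on every executor. The object `BroadcastFuzzyJoin`, the method `broadcastJoin`, the `Double` keys, and the absolute-difference `distance` are placeholders chosen for illustration; substitute your own element types and matching rule.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object BroadcastFuzzyJoin {
  // Hypothetical closeness measure; replace with the real matching rule.
  def distance(a: Double, b: Double): Double = math.abs(a - b)

  // Pairs each element of `large` with every element of `small`
  // that lies within `threshold`, without shuffling `large`.
  def broadcastJoin(sc: SparkContext,
                    large: RDD[Double],
                    small: Seq[Double],
                    threshold: Double): RDD[(Double, Double)] = {
    // Ship the small side to every executor once.
    val smallB = sc.broadcast(small)
    large.flatMap { l =>
      smallB.value.collect { case r if distance(l, r) < threshold => (l, r) }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("broadcast-fuzzy-join").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val large = sc.parallelize(Seq(1.0, 5.0, 9.5))
    broadcastJoin(sc, large, Seq(1.4, 9.0, 20.0), 1.0).collect().foreach(println)
    sc.stop()
  }
}
```

The number of comparisons is the same as with the cartesian product, but the large RDD stays where it is and only the small side travels over the network.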

Upvotes: 0
