Reputation: 2438
I need to implement a custom join strategy that matches keys that are not strictly equal. To illustrate, think of a distance: the join should occur when the keys are close enough (although in my case it's a bit more complicated than a simple distance metric).
So I can't implement this by overriding equals, since there is no equality involved (and I need to keep a true equality test for other purposes). I suppose I also need to implement a proper partitioner.
How could I do that?
Upvotes: 6
Views: 4884
Reputation: 13927
Convert the RDDs to DataFrames, then you can do a join like this:
val newDF = leftDF.join(rightDF, $"col1" < ceilingVal and $"col1" > floorVal)
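You can also define UDFs and use them in the join condition. So if you had a "distanceUDF" like this (taking the absolute difference, so that the order of the arguments does not matter):
val distanceUDF = udf[Int, Int, Int]((val1, val2) => math.abs(val2 - val1))
You could then do the following (the "left" and "right" prefixes only resolve if each DataFrame is aliased first):
val newDF = leftDF.as("left").join(rightDF.as("right"), distanceUDF($"left.colX", $"right.colY") < 10)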
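For reference, here is a minimal self-contained sketch of the UDF-based join, assuming Spark SQL running in local mode. The sample rows, the column names `id` and `name`, and the object name `UdfJoinSketch` are made up for illustration, and the distance is taken as an absolute difference, which is an assumption about what "close enough" means.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf

object UdfJoinSketch {
  // Assumed distance: absolute difference between two Int keys.
  val distanceUDF = udf((v1: Int, v2: Int) => math.abs(v2 - v1))

  // Joins two small, hypothetical DataFrames on "keys closer than 10".
  def closePairs(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Aliases are what make $"left.colX" and $"right.colY" resolvable.
    val leftDF  = Seq((1, 10), (2, 50)).toDF("id", "colX").as("left")
    val rightDF = Seq(("a", 12), ("b", 100)).toDF("name", "colY").as("right")

    leftDF.join(rightDF, distanceUDF($"left.colX", $"right.colY") < 10)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("udf-join-sketch")
      .getOrCreate()

    // Only the (1, 10) / (a, 12) pair is within distance 10 in this sample data.
    closePairs(spark).show()

    spark.stop()
  }
}
```

Keep in mind that a condition like this is not an equality, so Spark generally cannot plan it as a hash or sort-merge join and has to compare row pairs. On large inputs the cost is likely to be similar to a cartesian product followed by a filter.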
Upvotes: 6
Reputation: 37435
One way would be to take the cartesian product of the two RDDs and filter out the elements that do not meet the condition. Using the distance example:
rdd1.cartesian(rdd2).filter{case (elem1, elem2) => distance(elem1,elem2) < threshold}
This is an expensive operation, since every pair of elements has to be compared. If one of the datasets is small enough, it can instead be broadcast to the executors to perform a map-side join, which avoids shuffling the larger dataset.
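The broadcast variant could look roughly like the sketch below. It assumes the smaller RDD fits comfortably in memory on the driver and on every executor. The helper name `broadcastJoin`, the `matches` predicate, and the sample values are hypothetical and not part of the Spark API.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object BroadcastJoinSketch {
  // Map-side join: ship the small dataset to every executor once, then
  // scan it locally for each element of the large RDD (no shuffle).
  def broadcastJoin[A, B](large: RDD[A], small: RDD[B])(
      matches: (A, B) => Boolean): RDD[(A, B)] = {
    // Assumption: `small` is small enough to collect on the driver.
    val smallLocal = large.sparkContext.broadcast(small.collect().toVector)
    large.flatMap { a =>
      for (b <- smallLocal.value if matches(a, b)) yield (a, b)
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("broadcast-join-sketch")
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(Seq(1.0, 5.0, 20.0)) // the "large" side
    val rdd2 = sc.parallelize(Seq(1.5, 19.0))      // the "small" side
    val threshold = 2.0

    // Same condition as the cartesian + filter version above.
    val joined = broadcastJoin(rdd1, rdd2)((x, y) => math.abs(x - y) < threshold)
    joined.collect().foreach(println)

    sc.stop()
  }
}
```

The number of comparisons is the same as with `cartesian`, but each partition of the large RDD is processed in place, so the gain comes from avoiding the data movement rather than from doing fewer distance checks.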
Upvotes: 0