Reputation: 33
The use case is to find the top n rows by a given column (there can be any number of such columns), and once you have those n keys, join them back to the original dataset to get all the rows you need.
val df = Seq(("12", "Tom", "Hanks"), ("13", "Meryl", "Streep"), ("12", "Tom", "Hardy"), ("12", "John", "Streep")).toDF("id", "firstname", "lastname")
Let's say I want to join on each column individually with a larger actors dataset, which has all three of the columns above.
val v1 = actors.join(df, Seq("id"), "inner")
val v2 = actors.join(df, Seq("firstname"), "inner")
val v3 = actors.join(df, Seq("lastname"), "inner")
val output = v1.union(v2).union(v3)
Is there any way to avoid doing this one join at a time? The columns to be joined on can also be dynamic: for example, sometimes it may only be id, or only id and firstname.
Upvotes: 1
Views: 1919
Reputation: 7316
@chlebek's solution should work perfectly fine; this is one more approach, in case you want to reproduce your initial logic:
val cols = Seq("id", "firstname", "lastname")
val final_df = cols
  .map(c => df.join(actors, Seq(c), "inner"))
  .reduce(_ union _)
First we generate one inner join per column, then we union the results.
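One thing to keep in mind with this pattern: DataFrame union behaves like SQL UNION ALL, so a row that matches on more than one column appears once per match. A plain-Scala sketch (with Seqs standing in for the per-column join results, which are made-up here) shows the effect:

```scala
// Pretend each Seq is the result of one per-column inner join
val byId        = Seq("row1", "row2")
val byFirstname = Seq("row2", "row3")
val byLastname  = Seq("row3")

// reduce(_ ++ _) plays the role of reduce(_ union _) on DataFrames
val output = Seq(byId, byFirstname, byLastname).reduce(_ ++ _)

// union keeps duplicates, so a row that matched twice shows up twice
println(output)          // List(row1, row2, row2, row3, row3)
println(output.distinct) // List(row1, row2, row3)
```

On the real DataFrames, a final `.dropDuplicates()` would play the role of `.distinct` above.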
Upvotes: 0
Reputation: 2451
You can try a different approach; you can achieve it this way:
actors.join(df).where(
actors("id") === df("id") ||
actors("firstname") === df("firstname") ||
actors("lastname") === df("lastname")
)
and for n columns you can build the condition dynamically:
val joinCols = Seq("id", "firstname", "lastname") // or actors.columns
val condition = joinCols
.map(s => (actors(s) === df(s)))
.reduce((a, b) => a || b)
you will get the following condition:
condition.explain(true)
(((id#7 = id#7) || (firstname#8 = firstname#8)) || (lastname#9 = lastname#9))
and finally use it:
actors.join(df).where(condition)
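The same map-then-reduce trick can be seen in plain Scala, with functions standing in for Spark Columns (the `Person` case class and sample rows below are assumptions for illustration, not from the question):

```scala
case class Person(id: String, firstname: String, lastname: String)

// One equality check per join column, mirroring actors(s) === df(s)
val checks: Seq[(Person, Person) => Boolean] = Seq(
  (a, b) => a.id == b.id,
  (a, b) => a.firstname == b.firstname,
  (a, b) => a.lastname == b.lastname
)

// Fold the per-column checks into one OR-ed condition,
// exactly like joinCols.map(...).reduce((a, b) => a || b) does with Columns
val condition: (Person, Person) => Boolean =
  checks.reduce((f, g) => (a, b) => f(a, b) || g(a, b))

val x = Person("12", "Tom", "Hanks")
val y = Person("99", "Tom", "Cruise")
println(condition(x, y)) // true: they share a firstname
```

Because `||` is associative, the order in which `reduce` combines the checks does not change the result.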
Upvotes: 2
Reputation: 33
I think doing a broadcast with the smaller dataset and a UDF to check against the bigger dataset would solve the issue. I kept thinking in terms of joins!
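A plain-Scala sketch of that idea (the `Actor` case class and sample rows are assumptions): collect each join column of the small dataset into a lookup set, which is what you would broadcast, then filter the large dataset with a membership check, which is what the UDF would do:

```scala
case class Actor(id: String, firstname: String, lastname: String)

val small = Seq(Actor("12", "Tom", "Hanks"), Actor("13", "Meryl", "Streep"))
// In Spark these sets would be wrapped with spark.sparkContext.broadcast(...)
val ids        = small.map(_.id).toSet
val firstnames = small.map(_.firstname).toSet
val lastnames  = small.map(_.lastname).toSet

val big = Seq(
  Actor("12", "John", "Doe"),  // matches on id
  Actor("99", "Tom", "Hardy"), // matches on firstname
  Actor("77", "Jane", "Roe")   // no match
)
// The "UDF": keep a row if any of its columns hits a lookup set
val matched = big.filter(a =>
  ids(a.id) || firstnames(a.firstname) || lastnames(a.lastname))
println(matched.map(_.id)) // List(12, 99)
```

This avoids a shuffle entirely, at the cost of requiring the small dataset's keys to fit in each executor's memory.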
Upvotes: 0