Patssay

Reputation: 33

Is there an alternative to doing iterative joins in Spark (Scala)?

The use case is to find the n max rows for a given column (this can be any number of columns), and once you have the n keys, join them back to the original dataset to get all the rows you need.
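For example, the keys could be produced with something like this sketch, where score and n stand in for whatever ranking column and row count are being used:

import org.apache.spark.sql.functions.col

// sketch only: take the n rows with the highest "score" and keep the key columns
val topKeys = actors
  .orderBy(col("score").desc)
  .limit(n)
  .select("id", "firstname", "lastname")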

val df = Seq(
  ("12", "Tom", "Hanks"),
  ("13", "Meryl", "Streep"),
  ("12", "Tom", "Hardy"),
  ("12", "John", "Streep")
).toDF("id", "firstname", "lastname")

Let's say I want to join each column individually against a larger actors dataset, which has all three of the columns above.

val v1 = actors.join(df, Seq("id"), "inner")
val v2 = actors.join(df, Seq("firstname"), "inner")
val v3 = actors.join(df, Seq("lastname"), "inner")
val output = v1.union(v2).union(v3)

Is there any way to avoid doing this iteratively? Also, the columns to be joined against can be dynamic; for example, sometimes it can be only id, or only id and firstname.

Upvotes: 1

Views: 1919

Answers (3)

abiratsis

Reputation: 7316

@chlebek's solution should work perfectly fine; here is one more approach in case you want to reproduce your initial logic:

val cols = Seq("id", "firstname", "lastname")

val final_df = cols
  .map(c => df.join(actors, Seq(c), "inner"))
  .reduce(_ union _)

First we generate one inner join per column, then we union them.

Upvotes: 0

chlebek

Reputation: 2451

You can try a different approach; you can achieve it this way:

actors.join(df).where(
  actors("id") === df("id") ||
  actors("firstname") === df("firstname") ||
  actors("lastname") === df("lastname")
)

and for n columns you can try this:

val joinCols = Seq("id", "firstname", "lastname") // or actors.columns
val condition = joinCols
  .map(s => actors(s) === df(s))
  .reduce((a, b) => a || b)

you will get the condition below:

condition.explain(true)
(((a#7 = a#7) || (b#8 = b#8)) || (c#9 = c#9))

and finally use it:

actors.join(df).where(condition)

Upvotes: 2

Patssay

Reputation: 33

I think doing a broadcast of the smaller dataset and using a UDF to check against the bigger dataset would solve the issue. I kept thinking in terms of joins!
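A rough sketch of that idea (not tested; spark is the active SparkSession, and actors/df are the DataFrames from the question):

import org.apache.spark.sql.functions.udf

// collect the keys of the small dataset on the driver and broadcast them
val keys = df.select("id", "firstname", "lastname")
  .collect()
  .map(r => (r.getString(0), r.getString(1), r.getString(2)))
  .toSet
val keysBc = spark.sparkContext.broadcast(keys)

// keep a row from the large dataset if any of its columns matches a broadcast key
val matchesAny = udf { (id: String, first: String, last: String) =>
  keysBc.value.exists { case (i, f, l) => i == id || f == first || l == last }
}

val output = actors.filter(matchesAny(actors("id"), actors("firstname"), actors("lastname")))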

Upvotes: 0
