user4046073

Reputation: 871

How to find intersection of dataframes based on multiple columns?

I have two dataframes, shown below. I'm trying to find the intersection of the two dataframes based on a match in either of the two columns, not only in both of them.

In this case, I want to return dataframe C, which contains row 1 of A (its Col1 equals Col1 of row 1 in B), row 2 of A (its Col2 equals Col2 of row 1 in B), row 4 of A (its Col1 equals Col1 of row 2 in B), and row 5 of A. If I do an intersect of A and B, it only returns row 5 of A, since that is the only row that matches on both columns. How do I do this? Many thanks. Let me know if I'm not explaining the question very well.

A:

     Col1    Col2 
     1         2    
     2         3
     3         7 
     5         4
     1         3   

B:

    Col1    Col2 
     1         3    
     5         1

C:

     Col1    Col2 
     1         2    
     2         3
     5         4
     1         3    

Upvotes: 0

Views: 5151

Answers (2)

Oli

Reputation: 10406

With the following data:

val df1 = sc.parallelize(Seq(1->2, 2->3, 3->7, 5->4, 1->3)).toDF("col1", "col2")
val df2 = sc.parallelize(Seq(1->3, 5->1)).toDF("col1", "col2")

You can then join the datasets with an OR condition:

val cols = df1.columns
df1.join(df2, cols.map(c => df1(c) === df2(c)).reduce(_ || _) )
   .select(cols.map(df1(_)) :_*)
   .distinct
   .show

+----+----+
|col1|col2|
+----+----+
|   2|   3|
|   1|   2|
|   1|   3|
|   5|   4|
+----+----+

The join condition is generic and works for any number of columns. The code maps each column to an equality between that column in df1 and the same one in df2: cols.map(c => df1(c) === df2(c)). The reduce then takes the logical OR of all these equalities, which is what you want. The select is there because otherwise the columns of both dataframes would be kept; here I simply keep the ones from df1. I also added a distinct in case several rows of df2 match a row of df1 or vice versa, which would otherwise return the same row several times. Indeed, the join may produce a cartesian product.

Note that this method does not collect anything to the driver, so it works regardless of the size of the datasets. However, if df2 is small enough to be collected to the driver and broadcast, you would get faster results with a method like this:
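As a possible variation on the join above, a left semi join expresses "keep the rows of df1 that have at least one match in df2" directly. This is only a sketch: the local SparkSession and the name anyColumnMatches are assumptions added for illustration (in spark-shell, spark already exists and getOrCreate reuses it).

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("either-column-match")
  .getOrCreate()
import spark.implicits._

val df1 = Seq(1 -> 2, 2 -> 3, 3 -> 7, 5 -> 4, 1 -> 3).toDF("col1", "col2")
val df2 = Seq(1 -> 3, 5 -> 1).toDF("col1", "col2")

// Same OR-of-equalities condition as in the answer, built from the column names.
val anyColumnMatches = df1.columns.map(c => df1(c) === df2(c)).reduce(_ || _)

// "left_semi" returns only df1's columns and emits each df1 row at most once,
// however many rows of df2 it matches, so no select or distinct is needed.
val matched = df1.join(df2, anyColumnMatches, "left_semi")
matched.show()
```

Unlike distinct, the semi join leaves rows that are duplicated within df1 itself untouched, which may or may not be what you want.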

// to each column name, we map the set of values in df2.
val valueMap = df2.rdd
    .flatMap(row => cols.map(name => name -> row.getAs[Any](name)))
    .distinct
    .groupByKey
    .mapValues(_.toSet)
    .collectAsMap

//we create a udf that looks up in valueMap
val filter = udf((name : String, value : Any) => 
                     valueMap(name).contains(value))

//Finally we apply the filter.
df1.where( cols.map(c => filter(lit(c), df1(c))).reduce(_||_))
   .show

This method involves no shuffle of df1 and no cartesian product. If df2 is small, it is definitely the way to go.
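The same small-df2 idea can also be sketched without a UDF, using Column.isin on the values collected from df2. This is an alternative to the code above, not a replacement for it: the local SparkSession and the names valuesByColumn and condition are assumptions for illustration, and it runs one small Spark job per column to collect the values.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("either-column-isin")
  .getOrCreate()
import spark.implicits._

val df1 = Seq(1 -> 2, 2 -> 3, 3 -> 7, 5 -> 4, 1 -> 3).toDF("col1", "col2")
val df2 = Seq(1 -> 3, 5 -> 1).toDF("col1", "col2")

// For each column, collect df2's distinct values to the driver.
// Only sensible when df2 is small.
val valuesByColumn: Map[String, Seq[Any]] = df2.columns.map { c =>
  c -> df2.select(c).distinct().collect().map(_.get(0)).toSeq
}.toMap

// Keep a df1 row if any of its columns holds a value seen in the same column of df2.
val condition = df1.columns
  .map(c => df1(c).isin(valuesByColumn(c): _*))
  .reduce(_ || _)

val filtered = df1.where(condition)
filtered.show()
```

Because the predicate is built from ordinary column expressions rather than a UDF, Spark can see what it does when planning the query.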

Upvotes: 1

suj1th

Reputation: 1801

You should perform a separate join on each of the join columns, and then take the union of the two resulting DataFrames:

val dfA = List((1,2),(2,3),(3,7),(5,4),(1,3)).toDF("Col1", "Col2")
val dfB = List((1,3),(5,1)).toDF("Col1", "Col2")
val res1 = dfA.join(dfB, dfA.col("Col1")===dfB.col("Col1"))
val res2 = dfA.join(dfB, dfA.col("Col2")===dfB.col("Col2"))
val res = res1.union(res2)
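Note that res as written keeps the columns of both dfA and dfB, and a row that matches on both columns appears twice. A minimal sketch of how the result could be reduced to the expected dataframe C, assuming a local SparkSession (the names byCol1 and byCol2 are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("either-column-union")
  .getOrCreate()
import spark.implicits._

val dfA = Seq((1, 2), (2, 3), (3, 7), (5, 4), (1, 3)).toDF("Col1", "Col2")
val dfB = Seq((1, 3), (5, 1)).toDF("Col1", "Col2")

// One join per column, keeping only dfA's columns from each result.
val byCol1 = dfA.join(dfB, dfA("Col1") === dfB("Col1")).select(dfA("Col1"), dfA("Col2"))
val byCol2 = dfA.join(dfB, dfA("Col2") === dfB("Col2")).select(dfA("Col1"), dfA("Col2"))

// union keeps duplicates, so distinct removes rows found by both joins.
val res = byCol1.union(byCol2).distinct()
res.show()
```

Here distinct also collapses any rows that were already duplicated in dfA.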

Upvotes: 0
