Ashit_Kumar

Reputation: 591

Derive a dataframe containing the uncommon rows based on a single column of another dataframe

I have encountered an issue where I have to get the rows of one dataframe that are uncommon with respect to a column of another dataframe. For example, the first dataframe, i.e. df1:

_id name
12 abc
56 def
90 jkl

The reference dataframe, i.e. df2:

_id user_name event_name
12 abc some
34 xyz left
56 def right
78 ghi middle
90 jkl front

The third dataframe is expected to contain:

_id user_name event_name
34 xyz left
78 ghi middle

To achieve this I have tried two approaches:

Approach 1: Getting the list of ids from df1 and using isin to filter the rows of df2, as below:

import spark.implicits._
import org.apache.spark.sql.functions.not

// Collect the distinct ids from df1 to the driver, then filter df2 against them
val idsList = df1.select("_id").dropDuplicates().map(_.getInt(0)).collect.toList
val dffDf = df2.filter(not($"_id".isin(idsList: _*)))

The problem with this approach is that collect is heavy: all the ids are pulled back to the driver, so the computation takes a lot of time.

Approach 2: Using a left anti join

df2.join(df1, df2("_id") === df1("_id"), "leftanti")

But this gives me the issue below:

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Max iterations (100) reached for batch Resolution, please set 'spark.sql.analyzer.maxIterations' to a larger value

The issue is reported in https://issues.apache.org/jira/browse/SPARK-37222, but without a solution.
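
The error message itself points at one possible workaround: raising the analyzer's iteration cap. A minimal sketch of that, assuming the config is applied at session creation (spark.sql.analyzer.maxIterations is an internal Spark config, so this is a stopgap rather than a root-cause fix, and the app name here is hypothetical):

import org.apache.spark.sql.SparkSession

// Raise the analyzer's iteration limit above its default of 100,
// as the error message suggests. Internal config; use with care.
val spark = SparkSession.builder()
  .appName("anti-join-workaround")
  .config("spark.sql.analyzer.maxIterations", "500")
  .getOrCreate()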

Is there any other recommended way of achieving this? Any pointer would be great. I need a solution that is effective and neither heavy nor time-consuming. I am using Spark version 3.3.0.

Upvotes: 0

Views: 63

Answers (1)

Himadri Pal

Reputation: 150

Here are two of the many ways you can solve this:

Using left_anti

import spark.implicits._

val input1 = Seq(
  (12, "abc"),
  (56, "def"),
  (90, "jkl")
).toDF("_id", "name")

val input2 = Seq(
  (12, "abc", "some"),
  (34, "xyz", "left"),
  (56, "def", "right"),
  (78, "ghi", "middle"),
  (90, "jkl", "front")
).toDF("_id", "username", "event_name")

println("spark.version : " + spark.version)

val finalDf = input2.join(input1, input2("_id") === input1("_id"), "left_anti")

println("result : ")
finalDf.show

/*
spark.version : 3.3.0
result : 
+---+--------+----------+
|_id|username|event_name|
+---+--------+----------+
| 34|     xyz|      left|
| 78|     ghi|    middle|
+---+--------+----------+
 */

Using left_outer

import spark.implicits._

val input1 = Seq(
  (12, "abc"),
  (56, "def"),
  (90, "jkl")
).toDF("_id", "name")

val input2 = Seq(
  (12, "abc", "some"),
  (34, "xyz", "left"),
  (56, "def", "right"),
  (78, "ghi", "middle"),
  (90, "jkl", "front")
).toDF("_id", "username", "event_name")

println("spark.version : " + spark.version)

val finalDf = input2.as("input2")
  .join(input1.as("input1"), input2("_id") === input1("_id"), "left_outer")
  .where(input1("_id").isNull)
println("result : ")
finalDf.select("input2.*").show


/*
spark.version : 3.3.0
result :
+---+--------+----------+
|_id|username|event_name|
+---+--------+----------+
| 34|     xyz|      left|
| 78|     ghi|    middle|
+---+--------+----------+

 */
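As a third variant (a minimal sketch, not shown above): joining on the shared column name with Seq("_id") avoids referencing columns of both dataframes in the join condition, which sidesteps ambiguous-column resolution, and broadcasting the small dataframe keeps the anti join cheap:

import org.apache.spark.sql.functions.broadcast

// Join on the shared column name instead of an explicit equality
// expression; broadcast the smaller side so the big one is not shuffled.
val finalDf = input2.join(broadcast(input1), Seq("_id"), "left_anti")
finalDf.show

Between the two approaches shown, left_anti is usually preferable: it returns input2's columns directly, while left_outer needs the extra isNull filter and a select to drop input1's columns.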

Upvotes: 0
