Reputation: 591
I have encountered an issue where I have to get the uncommon rows of one dataframe based on a column of another dataframe. For example, the first dataframe, df1:
| _id | name |
| --- | ---- |
| 12  | abc  |
| 56  | def  |
| 90  | jkl  |
The reference dataframe, df2:
| _id | user_name | event_name |
| --- | --------- | ---------- |
| 12  | abc       | some       |
| 34  | xyz       | left       |
| 56  | def       | right      |
| 78  | ghi       | middle     |
| 90  | jkl       | front      |
The third (expected) dataframe should contain:
| _id | user_name | event_name |
| --- | --------- | ---------- |
| 34  | xyz       | left       |
| 78  | ghi       | middle     |
To achieve this I have tried two approaches:
Approach 1: collect the list of ids from df1 and use isin to filter the rows of df2, as below:
val idsList = df1.select("_id").dropDuplicates().map(_.getInt(0)).collect.toList
val dffDf = df2.filter(not($"_id".isin(idsList: _*)))
The problem with this approach is that collect is heavy and takes a lot of time, because all the ids are pulled back to the driver.
Approach 2: use a left anti join:
df2.join(df1, df2("_id") === df1("_id"),"leftanti")
But this gives me the following error:
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Max iterations (100) reached for batch Resolution, please set 'spark.sql.analyzer.maxIterations' to a larger value
The issue is mentioned in https://issues.apache.org/jira/browse/SPARK-37222, but no solution is given there.
Is there any other recommended way of achieving this? Any pointer would be great. I need a solution that is effective and not heavy or time consuming. I am using Spark 3.3.0.
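The error message itself only suggests raising the analyzer limit, roughly like below (a sketch; I am not sure this config can be changed at session level, and it only treats the symptom rather than the cause):
// Sketch: raise the analyzer iteration limit named in the error message,
// then retry the same left anti join. The root cause is not addressed.
spark.conf.set("spark.sql.analyzer.maxIterations", "500")
df2.join(df1, df2("_id") === df1("_id"), "leftanti")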
Upvotes: 0
Views: 63
Reputation: 150
Here are two of the many ways you can solve this:
Using left_anti
import spark.implicits._
val input1 = Seq(
(12, "abc"),
(56, "def"),
(90, "jkl")
).toDF("_id", "name")
val input2 = Seq(
(12, "abc", "some"),
(34, "xyz", "left"),
(56, "def", "right"),
(78, "ghi", "middle"),
(90, "jkl", "front")
).toDF("_id", "username", "event_name")
println("spark.version : " + spark.version)
val finalDf = input2.join(input1, input2("_id") === input1("_id"), "left_anti")
println("result : ")
finalDf.show
/*
spark.version : 3.3.0
result :
+---+--------+----------+
|_id|username|event_name|
+---+--------+----------+
| 34| xyz| left|
| 78| ghi| middle|
+---+--------+----------+
*/
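If the analyzer error from the question shows up with the column-reference form of the join condition, the same left_anti join can also be written against the shared column name, which avoids referring to _id from both frames (a sketch, assuming both dataframes call the key column _id):
// Join on the common column name instead of referencing _id from each side.
val finalDfUsing = input2.join(input1, Seq("_id"), "left_anti")
finalDfUsing.show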
Using left_outer
import spark.implicits._
val input1 = Seq(
(12, "abc"),
(56, "def"),
(90, "jkl")
).toDF("_id", "name")
val input2 = Seq(
(12, "abc", "some"),
(34, "xyz", "left"),
(56, "def", "right"),
(78, "ghi", "middle"),
(90, "jkl", "front")
).toDF("_id", "username", "event_name")
println("spark.version : " + spark.version)
val finalDf = input2.as("input2")
  .join(input1.as("input1"), input2("_id") === input1("_id"), "left_outer")
  .where(input1("_id").isNull)
println("result : ")
finalDf.select("input2.*").show
/*
spark.version : 3.3.0
result :
+---+--------+----------+
|_id|username|event_name|
+---+--------+----------+
| 34| xyz| left|
| 78| ghi| middle|
+---+--------+----------+
*/
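Since input1 only carries the ids to exclude, it is usually the small side; if it comfortably fits in executor memory, a broadcast hint keeps the anti join from shuffling the larger input2 (a sketch using the standard broadcast function):
import org.apache.spark.sql.functions.broadcast
// Hint Spark to broadcast the small id dataframe to all executors.
val finalDfBroadcast = input2.join(broadcast(input1), Seq("_id"), "left_anti")
finalDfBroadcast.show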
Upvotes: 0