Phil Baines

Reputation: 467

Comparing Spark 2 DataFrames

I am new to Spark/Scala. I have a master DataFrame which consists of over 100 million records:

+--------+
|  ttm_id|
+--------+
|39622109|
|39622178|
|39578322|
+--------+

And a changelist DataFrame which has around 40 million records:

+----------+--------+
|__change__|  ttm_id|
+----------+--------+
|    DELETE|18001570|
|    DELETE|   50520|
|    DELETE|  144440|
|    DELETE|   93130|
|    DELETE|   93140|
+----------+--------+

How would I go about comparing these two data frames so that:

If __change__ = DELETE and masterlist.ttm_id = changeset.ttm_id, then remove the matching ttm_id record from the master list.

Thanks!

Upvotes: 1

Views: 353

Answers (3)

Ramesh Maharjan

Reputation: 41957

Broadcasting the smaller DataFrame should help reduce the shuffle needed for the join, provided it fits in memory on each executor.

You can use join, filter and drop after marking the changeset DataFrame with the broadcast hint to get your desired result:

import org.apache.spark.sql.functions.broadcast

// Hint Spark to ship changeset to every executor instead of shuffling masterlist
masterlist.join(broadcast(changeset), Seq("ttm_id"), "left")
  // keep rows with no match, or whose change is not a DELETE
  .filter($"__change__".isNull || $"__change__" =!= "DELETE")
  .drop("__change__")
  .show(false)

I hope the answer is helpful.

Upvotes: 0

Leo C

Reputation: 22439

I like @MaxU's solution using except. Here's another approach using left_anti join:

master.join( changelist.where($"__change__" === "DELETE"),
  Seq("ttm_id"), "left_anti"
)

Note that for large DataFrames, this approach can be expensive, since a join of this size typically shuffles both sides by ttm_id.
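For anyone who wants to try the anti-join on a small scale first, here is a minimal, self-contained sketch. The local SparkSession, the tiny sample data and the "UPDATE" row are assumptions made up for illustration; only the join itself mirrors the snippet above.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("left-anti-sketch")
  .getOrCreate()
import spark.implicits._

// Tiny stand-ins for the real master and changelist DataFrames.
val master = Seq(39622109, 39622178, 39578322).toDF("ttm_id")
val changelist = Seq(
  ("DELETE", 39622109),
  ("DELETE", 50520),
  ("UPDATE", 39622178) // made-up non-DELETE change; this row must survive
).toDF("__change__", "ttm_id")

// Keep only the DELETE keys, then drop every master row whose ttm_id matches one.
val deletes = changelist.where($"__change__" === "DELETE").select("ttm_id")
val remaining = master.join(deletes, Seq("ttm_id"), "left_anti")

remaining.show()
```

With this sample data, remaining holds 39622178 and 39578322; row order is not guaranteed.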

Upvotes: 1

MaxU - stand with Ukraine

Reputation: 210832

IIUC you can do it using the following query:

select * from masterlist
where not exists (select 1 from changeset
                  where masterlist.ttm_id = changeset.ttm_id
                    and changeset.__change__='DELETE');

Demo:

scala> m.show
+--------+
|  ttm_id|
+--------+
|39622109|
|39622178|
|39578322|
+--------+


scala> c.show
+----------+--------+
|__change__|  ttm_id|
+----------+--------+
|    DELETE|39622109|
|    DELETE|   50520|
+----------+--------+


scala> val q="""
     | select * from masterlist
     | where not exists (select ttm_id from changeset
     |                   where masterlist.ttm_id = changeset.ttm_id
     |                     and changeset.__change__='DELETE')
     | """
q: String =
"
select * from masterlist
where not exists (select ttm_id from changeset
                  where masterlist.ttm_id = changeset.ttm_id
                    and changeset.__change__='DELETE')
"

scala> val res = spark.sql(q)
res: org.apache.spark.sql.DataFrame = [ttm_id: int]

scala> res.show
+--------+
|  ttm_id|
+--------+
|39622178|
|39578322|
+--------+
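The demo queries tables named masterlist and changeset, while the DataFrames it shows are m and c, so they were presumably registered as temporary views first. A minimal sketch of that step, assuming a local SparkSession and the same sample rows as in the demo (the session setup is not from the original answer):

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("not-exists-sketch")
  .getOrCreate()
import spark.implicits._

// Same sample rows as in the demo.
val m = Seq(39622109, 39622178, 39578322).toDF("ttm_id")
val c = Seq(("DELETE", 39622109), ("DELETE", 50520)).toDF("__change__", "ttm_id")

// Expose the DataFrames to Spark SQL under the names used in the query.
m.createOrReplaceTempView("masterlist")
c.createOrReplaceTempView("changeset")

val res = spark.sql("""
  select * from masterlist
  where not exists (select 1 from changeset
                    where masterlist.ttm_id = changeset.ttm_id
                      and changeset.__change__ = 'DELETE')
""")

res.show()
```

Temporary views only live for the current session, so they need to be registered again after restarting the shell.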

Another solution:

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> m.withColumn("__change__", lit("DELETE")).except(c.select("ttm_id","__change__")).select("ttm_id").show
+--------+
|  ttm_id|
+--------+
|39578322|
|39622178|
+--------+
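One thing to keep in mind with except: it has set semantics (EXCEPT DISTINCT), so duplicate rows in the master list are collapsed, whereas a left_anti join keeps them. A small sketch of the difference, using made-up data in which ttm_id 1 appears twice (the ids and the local session are assumptions for illustration):

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("except-vs-anti-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical data: ttm_id 1 appears twice in the master list.
val m = Seq(1, 1, 2).toDF("ttm_id")
val c = Seq(("DELETE", 2)).toDF("__change__", "ttm_id")

val deletes = c.where($"__change__" === "DELETE").select("ttm_id")

// except returns distinct rows only, so the two rows with ttm_id 1 become one.
val viaExcept = m.except(deletes)

// left_anti keeps every non-matching master row, duplicates included.
val viaAntiJoin = m.join(deletes, Seq("ttm_id"), "left_anti")

println(viaExcept.count())
println(viaAntiJoin.count())
```

Here viaExcept has one row and viaAntiJoin has two. If ttm_id is unique in the master list, as it appears to be in the question, both approaches give the same rows.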

Upvotes: 0
