Reputation: 467
I am new to Spark/Scala. I have a master DataFrame with over 100 million records:
+--------+
| ttm_id|
+--------+
|39622109|
|39622178|
|39578322|
+--------+
And a changelist DataFrame with around 40 million records:
+----------+--------+
|__change__| ttm_id|
+----------+--------+
| DELETE|18001570|
| DELETE| 50520|
| DELETE| 144440|
| DELETE| 93130|
| DELETE| 93140|
+----------+--------+
How would I go about comparing these two DataFrames so that:
If __change__ = DELETE and masterlist.ttm_id = changeset.ttm_id, the matching ttm_id record is removed from the master list?
Thanks!
Upvotes: 1
Views: 353
Reputation: 41957
Broadcasting the smaller DataFrame should help reduce the shuffle needed for the join, provided it is small enough to fit in memory on the driver and on each executor.
You can use join, filter and drop after marking the changeset DataFrame for broadcast to get your desired result:
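import org.apache.spark.sql.functions.broadcast
// Use the broadcast() join hint; sc.broadcast on a DataFrame does not produce a broadcast join
masterlist.join(broadcast(changeset), Seq("ttm_id"), "left")
  // keep rows with no match in changeset, or whose change is not a DELETE
  .filter($"__change__".isNull || $"__change__" =!= "DELETE")
  .drop("__change__")
  .show(false)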
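To try this on a few rows, here is a minimal self-contained sketch that can be pasted into spark-shell or run as a script with Spark on the classpath. It uses the broadcast hint from org.apache.spark.sql.functions. The local SparkSession, the sample ids and the "UPDATE" row are assumptions added only for illustration; the question's changeset shows DELETE rows only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Assumption: a local session for experimenting; in spark-shell `spark` already exists
val spark = SparkSession.builder().master("local[*]").appName("broadcast-left-join").getOrCreate()
import spark.implicits._

// Sample rows; the "UPDATE" change type is hypothetical
val masterlist = Seq(39622109, 39622178, 39578322).toDF("ttm_id")
val changeset = Seq(("DELETE", 39622109), ("DELETE", 50520), ("UPDATE", 39622178))
  .toDF("__change__", "ttm_id")

val kept = masterlist
  .join(broadcast(changeset), Seq("ttm_id"), "left") // hint: ship changeset to every executor
  .filter($"__change__".isNull || $"__change__" =!= "DELETE") // unmatched rows have a null __change__
  .drop("__change__")

// 39622109 is dropped; 39622178 (UPDATE) and 39578322 (no match) remain
kept.show(false)
```

Note that a left join emits one output row per matching changeset row, so a ttm_id that appears several times in changeset with non-DELETE changes would be duplicated in the result.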
I hope the answer is helpful.
Upvotes: 0
Reputation: 22439
I like @MaxU's solution using except. Here's another approach using a left_anti join:
master.join( changelist.where($"__change__" === "DELETE"),
Seq("ttm_id"), "left_anti"
)
Note that for large DataFrames, this approach can be expensive.
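To see what the join costs on your own data, a sketch along these lines prints the physical plan before running the query, so you can check which join strategy Spark picked. The local SparkSession and the sample rows are assumptions for illustration only.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimenting; in spark-shell `spark` already exists
val spark = SparkSession.builder().master("local[*]").appName("left-anti-demo").getOrCreate()
import spark.implicits._

val master = Seq(39622109, 39622178, 39578322).toDF("ttm_id")
val changelist = Seq(("DELETE", 39622109), ("DELETE", 50520)).toDF("__change__", "ttm_id")

// left_anti keeps only the master rows that have NO match on the right side
val remaining = master.join(
  changelist.where($"__change__" === "DELETE"),
  Seq("ttm_id"), "left_anti"
)

remaining.explain()   // prints the physical plan, including the join strategy
remaining.show(false) // 39622178 and 39578322 remain
```

Unlike a left join followed by a filter, the anti join never returns columns from changelist, so there is nothing to drop afterwards.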
Upvotes: 1
Reputation: 210832
IIUC you can do it using the following query:
select * from masterlist
where not exists (select 1 from changeset
where masterlist.ttm_id = changeset.ttm_id
and changeset.__change__='DELETE');
Demo:
scala> m.show
+--------+
| ttm_id|
+--------+
|39622109|
|39622178|
|39578322|
+--------+
scala> c.show
+----------+--------+
|__change__| ttm_id|
+----------+--------+
| DELETE|39622109|
| DELETE| 50520|
+----------+--------+
scala> val q="""
| select * from masterlist
| where not exists (select ttm_id from changeset
| where masterlist.ttm_id = changeset.ttm_id
| and changeset.__change__='DELETE')
| """
q: String =
"
select * from masterlist
where not exists (select ttm_id from changeset
where masterlist.ttm_id = changeset.ttm_id
and changeset.__change__='DELETE')
"
scala> val res = spark.sql(q)
res: org.apache.spark.sql.DataFrame = [ttm_id: int]
scala> res.show
+--------+
| ttm_id|
+--------+
|39622178|
|39578322|
+--------+
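The demo does not show how m and c become queryable under the names masterlist and changeset. One way to do it, assuming temporary views were used, is createOrReplaceTempView. The sketch below is self-contained; the local SparkSession is an assumption for running it outside spark-shell.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimenting; in spark-shell `spark` already exists
val spark = SparkSession.builder().master("local[*]").appName("not-exists-demo").getOrCreate()
import spark.implicits._

val m = Seq(39622109, 39622178, 39578322).toDF("ttm_id")
val c = Seq(("DELETE", 39622109), ("DELETE", 50520)).toDF("__change__", "ttm_id")

// Register the DataFrames under the table names used in the query
m.createOrReplaceTempView("masterlist")
c.createOrReplaceTempView("changeset")

val res = spark.sql("""
  select * from masterlist
  where not exists (select 1 from changeset
                    where masterlist.ttm_id = changeset.ttm_id
                      and changeset.__change__ = 'DELETE')
""")

res.show() // 39622178 and 39578322 remain
```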
Another solution:
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> m.withColumn("__change__", lit("DELETE")).except(c.select("ttm_id","__change__")).select("ttm_id").show
+--------+
| ttm_id|
+--------+
|39578322|
|39622178|
+--------+
Upvotes: 0