Reputation: 57
I am currently working in an API environment that requires PySpark. I need to run a daily comparison between two dataframes to determine which records are new, which are updated and which are deleted.
Here is an example of two dataframes:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
today = spark.createDataFrame([
    [1, "Apple", 5000, "A"],
    [2, "Banana", 4000, "A"],
    [3, "Orange", 3000, "B"],
    [4, "Grape", 4500, "C"],
    [5, "Watermelon", 2000, "A"]
], ["item_id", "name", "cost", "classification"])
yesterday = spark.createDataFrame([
    [1, "Apple", 5000, "B"],
    [2, "Bananas", 4000, "A"],
    [3, "Tangerine", 3000, "B"],
    [4, "Grape", 4500, "C"]
], ["item_id", "name", "cost", "classification"])
I want to compare both dataframes and determine what is new and what is updated. Getting the new items is quite easy:
today.join(yesterday, on="item_id", how="left_anti").show()
# +---------+------------+------+----------------+
# | item_id | name | cost | classification |
# +---------+------------+------+----------------+
# | 5 | Watermelon | 2000 | A |
# +---------+------------+------+----------------+
However, for the items that were updated, I have no idea how to compare the two dataframes. I need every row whose item_id exists in both dataframes but whose values differ in at least one of the remaining columns (name, cost, classification), returned with today's values. My expected result, in the case above, is:
# +---------+------------+------+----------------+
# | item_id | name | cost | classification |
# +---------+------------+------+----------------+
# | 1 | Apple | 5000 | A |
# | 2 | Banana | 4000 | A |
# | 3 | Orange | 3000 | B |
# +---------+------------+------+----------------+
Upvotes: 0
Views: 175
Reputation: 2939
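Use the .subtract() method to get the rows of today that are not present in yesterday (this also picks up the brand-new items), then left-semi join the result with yesterday on item_id so that only items that already existed yesterday are kept: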
today.subtract(yesterday).join(yesterday, on="item_id", how="left_semi").show()
# +-------+------+----+--------------+
# |item_id| name|cost|classification|
# +-------+------+----+--------------+
# | 1| Apple|5000| A|
# | 3|Orange|3000| B|
# | 2|Banana|4000| A|
# +-------+------+----+--------------+
Upvotes: 0
Reputation: 54
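Join the two dataframes on item_id:
joined = today.join(yesterday, today.item_id == yesterday.item_id, how="inner")
Then apply a filter that keeps the rows whose classification does not match, and select only today's columns:
filtered = joined.filter(today.classification != yesterday.classification).select([today[c] for c in today.columns])
The filtered dataframe is your required result. Note that this only detects changes in classification (with the sample data it returns only item 1); to catch changes in name or cost as well, compare every non-key column and combine the conditions with |.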
Upvotes: -1