Heber Brandao

Reputation: 57

Compare two different pyspark dataframes

I am currently working in an API environment that requires PySpark. I need to run a daily comparison between two dataframes to determine which records are new, updated, and deleted.

Here is an example of two dataframes:

today = spark.createDataFrame([
  [1, "Apple", 5000, "A"],
  [2, "Banana", 4000, "A"],
  [3, "Orange", 3000, "B"],
  [4, "Grape", 4500, "C"],
  [5, "Watermelon", 2000, "A"]
], ["item_id", "name", "cost", "classification"])

yesterday = spark.createDataFrame([
  [1, "Apple", 5000, "B"],
  [2, "Bananas", 4000, "A"],
  [3, "Tangerine", 3000, "B"],
  [4, "Grape", 4500, "C"]
], ["item_id", "name", "cost", "classification"])

I want to compare both dataframes and determine what is new and what is updated. Finding the new items is easy:

today.join(yesterday, on="item_id", how="left_anti").show()

# +---------+------------+------+----------------+
# | item_id |    name    | cost | classification |
# +---------+------------+------+----------------+
# |    5    | Watermelon | 2000 |       A        |
# +---------+------------+------+----------------+

However, for the items that were updated, I have no idea how to compare the results. I need to get all the rows that have different values in any of the remaining columns of the dataframe. My expected result for the example above is:

# +---------+------------+------+----------------+
# | item_id |    name    | cost | classification |
# +---------+------------+------+----------------+
# |    1    |    Apple   | 5000 |       A        |
# |    2    |   Banana   | 4000 |       A        |
# |    3    |   Orange   | 3000 |       B        |
# +---------+------------+------+----------------+

Upvotes: 0

Views: 175

Answers (2)

AdibP

Reputation: 2939

Use the .subtract() method to get today's rows that are not present in yesterday, then left-semi join the result with yesterday so that only item_ids that already existed are kept:

today.subtract(yesterday).join(yesterday, on="item_id", how="left_semi").show()

# +-------+------+----+--------------+
# |item_id|  name|cost|classification|
# +-------+------+----+--------------+
# |      1| Apple|5000|             A|
# |      3|Orange|3000|             B|
# |      2|Banana|4000|             A|
# +-------+------+----+--------------+

Upvotes: 0

datawiz

Reputation: 54

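Inner join the two dataframes on item_id, aliasing each side so that columns with the same name can be told apart:

from pyspark.sql import functions as F
joined = today.alias("t").join(yesterday.alias("y"), F.col("t.item_id") == F.col("y.item_id"), how="inner")

Then apply a filter that keeps the rows whose classification does not match:

filtered = joined.filter(F.col("t.classification") != F.col("y.classification"))

The filtered dataframe contains the rows whose classification changed; repeat the comparison for any other columns you need to check.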

Upvotes: -1
