wasim

Reputation: 21

Upsert/Merge two dataframes in PySpark

I need help with the requirement below. This is just sample data; in the real use case each data frame has more than 200 columns. I need to compare two data frames and flag the differences.

df1

id, name,  city
1,  abc,   pune
2,  xyz,   noida

df2

id,  name,  city
1,   abc,   pune
2,   xyz,   bangalore
3,   kk,    mumbai

expected dataframe

id, name, city,      flag
1,  abc,  pune,      same
2,  xyz,  bangalore, update
3,  kk,   mumbai,    new

Can someone please help me build the logic in PySpark?

Thanks in advance.

Upvotes: 1

Views: 8527

Answers (1)

DetroitMike

Reputation: 124

PySpark's hash function can help identify the records that are different.

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.hash.html

from pyspark.sql.functions import col, hash

df1 = df1.withColumn('hash_value', hash('id', 'name', 'city'))
df2 = df2.withColumn('hash_value', hash('id', 'name', 'city'))

df_updates = df1.alias('a').join(df2.alias('b'),
            (col('a.id') == col('b.id')) &
            (col('a.hash_value') != col('b.hash_value')),
            how='inner'
        )

df_updates = df_updates.select('b.*')

Once you have identified the records that are different, you can set up a function that loops through each column in the df and compares that column's values.

Something like this should work:



from pyspark.sql.functions import col, when

def add_change_flags(df1, df2):
   # Join with an explicit condition so both sides' columns stay
   # addressable through their aliases.
   df_joined = df1.alias("df1").join(
         df2.alias("df2"), col("df1.id") == col("df2.id"), how="inner")

   for column in df1.columns:
      df_joined = df_joined.withColumn(
            column + "_change_flag",
            when(col(f"df1.{column}") == col(f"df2.{column}"), True)
            .otherwise(False))

   return df_joined

Upvotes: 2
