UndefinedKid01

Reputation: 47

How to compare differences between dataframes in pyspark

I have two dataframes that are essentially the same, but coming from two different sources. In my first dataframe, the p_user_id field is a longType, the date_of_birth field is a dateType, and the rest of the fields are stringType. In my second dataframe everything is stringType. I first check the row count for both dataframes based on p_user_id (that is my unique identifier). DF1:

+--------------+                                                                
|test1_racounts|
+--------------+
|        418895|
+--------------+

DF2:

+---------+                                                                     
|d_tst_rac|
+---------+
|   418915|
+---------+

Then if there is a difference in the row count I run a check on which p_user_id values are in one dataframe and not the other.

p_user_tst_rac.subtract(rac_p_user_df).show(100, truncate=0)

Gives me this result:

+---------+                                                                     
|p_user_id|
+---------+
|661520   |
|661513   |
|661505   |
|661461   |
|661501   |
|661476   |
|661478   |
|661468   |
|661479   |
|661464   |
|661467   |
|661474   |
|661484   |
|661495   |
|661499   |
|661486   |
|661502   |
|661506   |
|661517   |
+---------+

My issue comes in when I try to pull the rest of the corresponding fields for the difference. I want those fields so that I can do a manual search in the DB and application to see if something was overlooked. But when I add the rest of the columns, I get far more than the 20-row difference. What is a better way to run the match and get the corresponding data?

Full code scope:

#racs in mysql
my_rac = spark.read.parquet("/Users/mysql.parquet")
my_rac.printSchema()
my_rac.createOrReplaceTempView('my_rac')
d_rac = spark.sql('''select distinct * from my_rac''')
d_rac.createOrReplaceTempView('d_rac')
spark.sql('''select count(*) as test1_racounts from d_rac''').show()
rac_p_user_df = spark.sql('''select 
cast(p_user_id as string) as p_user_id
, record_id
, contact_last_name
, contact_first_name
 from d_rac''')

#mssql_rac
sql_rac = spark.read.csv("/Users/mzn293/Downloads/kavi-20211116.csv")
#sql_rac.printSchema()
sql_rac.createOrReplaceTempView('sql_rac')
d_sql_rac = spark.sql('''select distinct 
_c0 as p_user_id
, _c1 as record_id
, _c4 as contact_last_name
, _c5 as contact_first_name
 from sql_rac''')
d_sql_rac.createOrReplaceTempView('d_sql_rac')
spark.sql('''select count(*) as d_tst_rac from d_sql_rac''').show()
dist_sql_rac = spark.sql('''select * from d_sql_rac''')
dist_sql_rac.subtract(rac_p_user_df).show(100, truncate=0)

With this I get a difference of well over 20 rows. I feel there is a better way to get my result, but I'm not sure what I'm missing to get the data for just those 20 rows instead of 100-plus.

Upvotes: 1

Views: 2853

Answers (1)

NNM

Reputation: 398

The easiest way in this case is to use an anti join.

df_diff = df1.join(df2, df1.p_user_id == df2.p_user_id, "leftanti")

This will give you all rows that exist in df1 but have no matching record in df2.
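If you also need the remaining columns for the mismatched rows, anti-join on the key alone rather than subtracting whole rows; subtract compares every column, so any formatting or type difference in the non-key fields inflates the diff. A minimal sketch reusing the dataframe names from your question (d_sql_rac and rac_p_user_df, both with p_user_id already cast to string, as in your code):

# rows in the MSSQL extract whose p_user_id has no match in MySQL,
# keeping every selected column for the manual DB/application lookup
missing_in_mysql = d_sql_rac.join(rac_p_user_df, on="p_user_id", how="left_anti")
missing_in_mysql.show(100, truncate=False)

# swap the operands to find ids present in MySQL but missing from MSSQL
missing_in_mssql = rac_p_user_df.join(d_sql_rac, on="p_user_id", how="left_anti")
missing_in_mssql.show(100, truncate=False)

Because the anti join matches on p_user_id only, the result stays at the true key difference (your ~20 ids) instead of the 100-plus rows a whole-row subtract produces.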

Upvotes: 1
