hokoio

Reputation: 39

Compare two pyspark dataframes by joining

I have two pyspark dataframes with different numbers of rows. I am trying to compare the values in all columns by joining the two dataframes on multiple keys, so I can find both the records that have different values and the records that have the same values in these columns.

#df1:
+---+----+---+-----+
| id| age|sex|value|
+---+----+---+-----+
|  1|  23|  M|  8.4|
|  2|   4|  M|    2|
|  3|  16|  F|  4.1|
|  4|  60|  M|    4|
|  5|null|  F|    5|
+---+----+---+-----+

#df2:
+---+----+----+-----+
| id| age| sex|value|
+---+----+----+-----+
|  1|  23|   M|  8.4|
|  2|   4|null|    2|
|  4|  13|   M|  3.1|
|  5|  34|   F|  6.2|
+---+----+----+-----+



#joining df1 and df2 on multiple keys
same = df1.join(df2, on=['id', 'age', 'sex', 'value'], how='inner')

Please note that the dataframes above are just samples. My real data has around 25 columns and 100k+ rows, so when I tried this join, the Spark job ran for a long time and never finished.

Does anyone have advice on how to compare two dataframes and find the records whose column values differ, either by joining or by some other method?

Upvotes: 0

Views: 1870

Answers (1)

Andy_101

Reputation: 1306

Use hashing.

from pyspark.sql.functions import hash

# hash() combines the listed columns into a single 32-bit hash per row
df1 = spark.createDataFrame(
    [('312312', '151132'), ('004312', '12232'), ('', '151132'),
     ('013vjhrr134', '111232'), (None, '151132'), ('0fsgdhfjgk', '151132')],
    ("Fruits", "Meat"))
df1 = df1.withColumn('hash_value', hash("Fruits", "Meat"))

df = spark.createDataFrame(
    [('312312', '151132'), ('000312', '151132'), ('', '151132'),
     ('013vjh134134', '151132'), (None, '151132'), ('0fsgdhfjgk', '151132')],
    ("Fruits", "Meat"))
df = df.withColumn('hash_value', hash("Fruits", "Meat"))

df.show()
+------------+------+-----------+
|      Fruits|  Meat| hash_value|
+------------+------+-----------+
|      312312|151132| -344340697|
|      000312|151132| -548650515|
|            |151132|-2105905448|
|013vjh134134|151132| 2052362224|
|        null|151132|  598159392|
|  0fsgdhfjgk|151132|  951458223|
+------------+------+-----------+
df1.show()
+-----------+------+-----------+
|     Fruits|  Meat| hash_value|
+-----------+------+-----------+
|     312312|151132| -344340697|
|     004312| 12232|   76821046|
|           |151132|-2105905448|
|013vjhrr134|111232| 1289730088|
|       null|151132|  598159392|
| 0fsgdhfjgk|151132|  951458223|
+-----------+------+-----------+

Alternatively, you can use sha2 for the same purpose:

from pyspark.sql.functions import sha2, concat_ws

# concatenate the data columns (not the hash_value column added above) and hash them
df1.withColumn("row_sha2", sha2(concat_ws("||", "Fruits", "Meat"), 256)).show(truncate=False)

+-----------+------+----------------------------------------------------------------+
|Fruits     |Meat  |row_sha2                                                        |
+-----------+------+----------------------------------------------------------------+
|312312     |151132|7be3824bcaa5fa29ad58df2587d392a1cc9ca5511ef01005be6f97c9558d1eed|
|004312     |12232 |c7fcf8031a17e5f3168297579f6dc8a6f17d7a4a71939d6b989ca783f30e21ac|
|           |151132|68ea989b7d33da275a16ff897b0ab5a88bc0f4545ec22d90cee63244c1f00fb0|
|013vjhrr134|111232|9c9df63553d841463a803c64e3f4a8aed53bcdf78bf4a089a88af9e91406a226|
|null       |151132|83de2d466a881cb4bb16b83665b687c01752044296079b2cae5bab8af93db14f|
|0fsgdhfjgk |151132|394631bbd1ccee841d3ba200806f8d0a51c66119b13575cf547f8cc91066c90d|
+-----------+------+----------------------------------------------------------------+

This creates a single code for every row: rows with identical values in all columns get identical hash values, so you can join the two dataframes on the hash column and compare. (hash() is a 32-bit hash, so collisions are theoretically possible; sha2 with 256 bits makes them negligible.)

Now you can compare using joins. An inner join keeps the rows that match:

df1.join(df, "hash_value", "inner").show()
+-----------+----------+------+----------+------+
| hash_value|    Fruits|  Meat|    Fruits|  Meat|
+-----------+----------+------+----------+------+
|-2105905448|          |151132|          |151132|
| -344340697|    312312|151132|    312312|151132|
|  598159392|      null|151132|      null|151132|
|  951458223|0fsgdhfjgk|151132|0fsgdhfjgk|151132|
+-----------+----------+------+----------+------+

An outer join also surfaces the rows that exist on only one side; the missing side shows nulls:

df1.join(df, "hash_value", "outer").show()
+-----------+-----------+------+------------+------+
| hash_value|     Fruits|  Meat|      Fruits|  Meat|
+-----------+-----------+------+------------+------+
|-2105905448|           |151132|            |151132|
| -548650515|       null|  null|      000312|151132|
| -344340697|     312312|151132|      312312|151132|
|   76821046|     004312| 12232|        null|  null|
|  598159392|       null|151132|        null|151132|
|  951458223| 0fsgdhfjgk|151132|  0fsgdhfjgk|151132|
| 1289730088|013vjhrr134|111232|        null|  null|
| 2052362224|       null|  null|013vjh134134|151132|
+-----------+-----------+------+------------+------+

Upvotes: 1
