kashmoney

Reputation: 442

PySpark how to join 2 DataFrames efficiently with non-matching keys

I have two DataFrames of different sizes: df1 (22 million rows) contains Latitude, Longitude, Date, and Event; df2 (100K rows) contains Latitude, Longitude, Date, and Weather (a temperature reading).

For each row in df1, I'd like to find the closest matching location (by Euclidean distance) on that date in df2 (df2 has all possible dates, while df1 does not).

df1:

+----------+-----------+----------+------------+
| Latitude | Longitude |   Date   |   Event    |
+----------+-----------+----------+------------+
|       10 |        10 | 11/10/20 | Water Polo |
|       20 |        20 | 11/22/19 | Cricket    |
+----------+-----------+----------+------------+

df2:

+----------+-----------+----------+---------+
| Latitude | Longitude |   Date   | Weather |
+----------+-----------+----------+---------+
|       20 |        20 | 11/10/20 |      90 |
|       12 |        12 | 11/10/20 |      80 |
|       10 |        10 | 11/22/19 |      34 |
|       18 |        18 | 11/22/19 |      45 |
+----------+-----------+----------+---------+

Desired Output:

+----------+-----------+----------+---------+------------+
| Latitude | Longitude |   Date   | Weather |   Event    |
+----------+-----------+----------+---------+------------+
|       10 |        10 | 11/10/20 |      80 | Water Polo |
|       20 |        20 | 11/22/19 |      45 | Cricket    |
+----------+-----------+----------+---------+------------+

I'm pretty new to PySpark and not sure how to write this query in an efficient manner.

Upvotes: 1

Views: 961

Answers (1)

murtihash

Reputation: 8410

First join on Date, then compute the distance using the built-in pyspark functions sqrt and pow. Then take a window partitioned by Date plus df1's coordinates (partitioning by Date alone would break if df1 had several events on the same date), compute the minimum distance per partition (min_Distance), and filter on that.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One partition per df1 row key, so each event keeps its own minimum distance
w = Window.partitionBy("Date", "Latitude", "Longitude")

# Rename df2's coordinates so they don't collide with df1's after the join
df1.join(df2.withColumnRenamed("Latitude", "Latitude1")
            .withColumnRenamed("Longitude", "Longitude1"), ["Date"])\
   .withColumn("Distance",
               F.sqrt(F.pow(F.col("Latitude") - F.col("Latitude1"), 2) +
                      F.pow(F.col("Longitude") - F.col("Longitude1"), 2)))\
   .withColumn("min_Distance", F.min("Distance").over(w))\
   .filter("Distance = min_Distance")\
   .select("Latitude", "Longitude", "Date", "Weather", "Event").show()

#+--------+---------+--------+-------+----------+
#|Latitude|Longitude|    Date|Weather|     Event|
#+--------+---------+--------+-------+----------+
#|      10|       10|11/10/20|     80|Water Polo|
#|      20|       20|11/22/19|     45|   Cricket|
#+--------+---------+--------+-------+----------+

Upvotes: 2
