Reputation: 442
I have two DataFrames of different sizes:
df1 (22 million rows) contains Latitude, Longitude, Date, Event
df2 (100K rows) contains Latitude, Longitude, Date, Weather (a temperature reading)
For each row in df1, I'd like to find the closest location (by Euclidean distance) on the same date in df2 and attach its Weather value (df2 has all possible dates, while df1 does not).
df1:
+----------+-----------+----------+------------+
| Latitude | Longitude | Date | Event |
+----------+-----------+----------+------------+
| 10 | 10 | 11/10/20 | Water Polo |
| 20 | 20 | 11/22/19 | Cricket |
+----------+-----------+----------+------------+
df2:
+----------+-----------+----------+---------+
| Latitude | Longitude | Date | Weather |
+----------+-----------+----------+---------+
| 20 | 20 | 11/10/20 | 90 |
| 12 | 12 | 11/10/20 | 80 |
| 10 | 10 | 11/22/19 | 34 |
| 18 | 18 | 11/22/19 | 45 |
+----------+-----------+----------+---------+
Desired Output:
+----------+-----------+----------+---------+------------+
| Latitude | Longitude | Date | Weather | Event |
+----------+-----------+----------+---------+------------+
| 10 | 10 | 11/10/20 | 80 | Water Polo |
| 20 | 20 | 11/22/19 | 45 | Cricket |
+----------+-----------+----------+---------+------------+
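To make the expected result concrete, here is a minimal plain-Python sketch (not PySpark, and not meant to scale) that reproduces the Desired Output from the two sample tables above. The helper name `nearest_weather` and the tuple layout are hypothetical, chosen only for illustration.

```python
from math import hypot

# Sample rows copied from the tables above.
# df1 rows: (Latitude, Longitude, Date, Event)
df1_rows = [(10, 10, "11/10/20", "Water Polo"), (20, 20, "11/22/19", "Cricket")]
# df2 rows: (Latitude, Longitude, Date, Weather)
df2_rows = [(20, 20, "11/10/20", 90), (12, 12, "11/10/20", 80),
            (10, 10, "11/22/19", 34), (18, 18, "11/22/19", 45)]


def nearest_weather(lat, lon, date, candidates):
    """Return the Weather of the df2 row closest to (lat, lon) on the given date."""
    # Only df2 rows with the same date are candidates.
    same_date = [c for c in candidates if c[2] == date]
    # hypot(dx, dy) is the Euclidean distance sqrt(dx^2 + dy^2).
    best = min(same_date, key=lambda c: hypot(lat - c[0], lon - c[1]))
    return best[3]


# Build rows in the Desired Output column order:
# (Latitude, Longitude, Date, Weather, Event)
output = [(lat, lon, date, nearest_weather(lat, lon, date, df2_rows), event)
          for lat, lon, date, event in df1_rows]
print(output)
```

For the first df1 row, the candidate at (12, 12) is about 2.83 away while the one at (20, 20) is about 14.14 away, which is why the expected Weather is 80.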
I'm pretty new to PySpark and not sure how to write this query in an efficient manner.
Upvotes: 1
Views: 961
Reputation: 8410
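First, you could join the two DataFrames on Date, then compute the Distance using the PySpark built-in functions sqrt and pow. Next, take a window partitioned by the columns that identify a df1 row (Date, Latitude, Longitude, Event) to calculate the minimum distance per df1 row (min_Distance), and then filter on that. Partitioning by Date alone is enough for the sample data, where each date has a single event, but once several events share a date it would keep only the single closest pair for that date. If two df2 locations are equally close, both rows are returned.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# One window partition per df1 row, so the minimum is taken per event rather than per date.
w = Window.partitionBy("Date", "Latitude", "Longitude", "Event")
# Rename df2's coordinates so they do not clash with df1's after the join.
df2_renamed = df2.withColumnRenamed("Latitude", "Latitude1")\
    .withColumnRenamed("Longitude", "Longitude1")
df1.join(df2_renamed, ["Date"])\
    .withColumn("Distance", F.sqrt(F.pow(F.col("Latitude") - F.col("Latitude1"), 2) +\
        F.pow(F.col("Longitude") - F.col("Longitude1"), 2)))\
    .withColumn("min_Distance", F.min("Distance").over(w))\
    .filter("Distance = min_Distance")\
    .select("Latitude", "Longitude", "Date", "Weather", "Event").show()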
#+--------+---------+--------+-------+----------+
#|Latitude|Longitude| Date|Weather| Event|
#+--------+---------+--------+-------+----------+
#| 10| 10|11/10/20| 80|Water Polo|
#| 20| 20|11/22/19| 45| Cricket|
#+--------+---------+--------+-------+----------+
Upvotes: 2