Reputation: 37
I'm trying to join two tables with each other in PySpark using Python. The problem I face is that neither table has a primary key I could use to match them on. But I know that each row should match the row with an equal id at the next bigger timestamp. So assuming the following: Table 1:
Timestamp1 | ID1 |
---|---|
15:00:00 | 26 |
15:15:00 | 27 |
15:30:00 | 26 |
Table 2:
Timestamp2 | ID2 |
---|---|
14:59:00 | 26 |
15:14:00 | 27 |
15:29:00 | 26 |
As a result I'd like to achieve something like this:
Timestamp1 | ID1 | Timestamp2 | ID2 |
---|---|---|---|
15:00:00 | 26 | 14:59:00 | 26 |
15:15:00 | 27 | 15:14:00 | 27 |
15:30:00 | 26 | 15:29:00 | 26 |
My approach was something like this:
final_df = sqlContext.sql("SELECT * FROM df_1 INNER JOIN df_2 ON df_1.Timestamp1 > df_2.Timestamp2 and df_1.ID1 == df_2.ID2")
But here, of course, this produces a match for every Timestamp1 that is bigger than Timestamp2, not just the closest one. Is there an elegant way to match only with the closest bigger timestamp?
Upvotes: 1
Views: 448
Reputation: 645
After the join you defined, you can use a window function to rank the duplicate rows per `ID1` and `Timestamp1` and keep only the row whose `Timestamp2` is closest (i.e. the largest `Timestamp2` that is still smaller than `Timestamp1`):
from pyspark.sql import Window, functions as F

final_df = (
    sqlContext.sql("SELECT * FROM df_1 INNER JOIN df_2 ON df_1.Timestamp1 > df_2.Timestamp2 AND df_1.ID1 == df_2.ID2")
    # rank the candidate matches per (ID1, Timestamp1), closest Timestamp2 first
    .withColumn('rank', F.row_number().over(
        Window.partitionBy('ID1', 'Timestamp1').orderBy(F.desc('Timestamp2'))))
    .where('rank == 1')
    .drop('rank')
)
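For reference, the logic this join-plus-window combination computes can be sketched in plain Python on the sample data from the question (the function name here is just for illustration): for each row of table 1, among all table-2 rows with the same id and a smaller timestamp, keep only the one with the largest timestamp.

```python
from datetime import time

# Sample rows from the question: (timestamp, id)
table1 = [(time(15, 0), 26), (time(15, 15), 27), (time(15, 30), 26)]
table2 = [(time(14, 59), 26), (time(15, 14), 27), (time(15, 29), 26)]

def closest_earlier_match(t1_rows, t2_rows):
    """For each table-1 row, find the table-2 row with the same id and the
    largest timestamp that is still smaller (rank 1 in the window)."""
    result = []
    for ts1, id1 in t1_rows:
        candidates = [(ts2, id2) for ts2, id2 in t2_rows
                      if id2 == id1 and ts2 < ts1]
        if candidates:
            ts2, id2 = max(candidates)  # closest earlier timestamp wins
            result.append((ts1, id1, ts2, id2))
    return result

print(closest_earlier_match(table1, table2))
```

This reproduces the desired output table: each Timestamp1 is paired with the single closest smaller Timestamp2 for the same id, so the row `15:30 / 26` matches `15:29 / 26` rather than `14:59 / 26`.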
Upvotes: 1