Afrobeta

Reputation: 37

Pyspark Join table by next bigger timestamp

I'm trying to join two tables with each other in PySpark using Python. The problem I face is that neither table has a primary key that I could use to match them on. But I know that a row from table 1 should match the row of table 2 with an equal ID and the next smaller timestamp. So assuming the following: Table 1:

Timestamp1 ID1
15:00:00 26
15:15:00 27
15:30:00 26

Table 2:

Timestamp2 ID2
14:59:00 26
15:14:00 27
15:29:00 26

As a result I'd like to achieve something like this:

Timestamp1 ID1 Timestamp2 ID2
15:00:00 26 14:59:00 26
15:15:00 27 15:14:00 27
15:30:00 26 15:29:00 26

My approach was something like this:

final_df = sqlContext.sql("SELECT * FROM df_1 INNER JOIN df_2 ON df_1.Timestamp1 > df_2.Timestamp2 AND df_1.ID1 == df_2.ID2")

But of course this produces a match for every row of df_2 whose Timestamp2 is smaller than Timestamp1, not just the closest one. Is there an elegant way to match only the closest smaller timestamp?
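To illustrate the problem: simulating that inequality join in plain Python on the sample data above shows the extra matches (one too many, because the 15:30:00 row for ID 26 matches both earlier rows):

```python
from datetime import time

# Sample data from the tables above: (timestamp, id) pairs.
t1 = [(time(15, 0), 26), (time(15, 15), 27), (time(15, 30), 26)]
t2 = [(time(14, 59), 26), (time(15, 14), 27), (time(15, 29), 26)]

# Plain inequality join: every smaller Timestamp2 with the same ID matches.
matches = [(a, b) for a in t1 for b in t2 if a[0] > b[0] and a[1] == b[1]]

# 4 pairs instead of the desired 3: (15:30:00, 26) matches both
# (14:59:00, 26) and (15:29:00, 26).
```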

Upvotes: 1

Views: 448

Answers (1)

Til Piffl

Reputation: 645

After the join you defined, you can use a window function to rank the duplicate matches and keep, for each row of df_1, the one with the largest Timestamp2 (i.e. the closest smaller timestamp). Note the window has to be partitioned by both ID1 and Timestamp1, otherwise the two rows for ID 26 would collapse into one:

from pyspark.sql import Window, functions as F

final_df = (
    sqlContext.sql("SELECT * FROM df_1 INNER JOIN df_2 ON df_1.Timestamp1 > df_2.Timestamp2 AND df_1.ID1 == df_2.ID2")
    .withColumn('rank', F.row_number().over(
        Window.partitionBy('ID1', 'Timestamp1').orderBy(F.desc('Timestamp2'))))
    .where('rank == 1')
    .drop('rank')
)
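For comparison: this "closest earlier timestamp" match is exactly what pandas calls an as-of join, so if the data fits in memory (or you convert via toPandas()), pd.merge_asof does it in one call. A minimal sketch, assuming the column names from the question:

```python
import pandas as pd

# Sample data mirroring the question's tables (times parsed onto today's date).
df1 = pd.DataFrame({
    "Timestamp1": pd.to_datetime(["15:00:00", "15:15:00", "15:30:00"]),
    "ID1": [26, 27, 26],
})
df2 = pd.DataFrame({
    "Timestamp2": pd.to_datetime(["14:59:00", "15:14:00", "15:29:00"]),
    "ID2": [26, 27, 26],
})

# merge_asof requires both frames to be sorted on the join key.
result = pd.merge_asof(
    df1.sort_values("Timestamp1"),
    df2.sort_values("Timestamp2"),
    left_on="Timestamp1",
    right_on="Timestamp2",
    left_by="ID1",          # also require equal IDs
    right_by="ID2",
    direction="backward",   # closest Timestamp2 at or before each Timestamp1
)
```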

Upvotes: 1
