ultraInstinct

Reputation: 4323

Pyspark - Join timestamp window against timestamp values

Is there an efficient way to join a list of timestamp windows against a list of timestamp values?

The dataframe A has these values:

+-------------+---------------------------------------------+----------------------+
|userid       |window                                       |total_unique_locations|
+-------------+---------------------------------------------+----------------------+
|da24a375-962a|[2017-06-04 03:20:00.0,2017-06-04 03:25:00.0]|2                     |
|0fd2b419-d6ec|[2017-06-04 03:50:00.0,2017-06-04 03:55:00.0]|2                     |
|c8159400-fe0a|[2017-06-04 03:10:00.0,2017-06-04 03:15:00.0]|2                     |
|a4336494-3a10|[2017-06-04 03:00:00.0,2017-06-04 03:05:00.0]|3                     |
|b4590016-1af2|[2017-06-04 03:45:00.0,2017-06-04 03:50:00.0]|2                     |
|03b33b0a-e94e|[2017-06-04 03:30:00.0,2017-06-04 03:35:00.0]|2                     |
|e5e4c972-6599|[2017-06-04 03:25:00.0,2017-06-04 03:30:00.0]|5                     |
|345e81fb-5e12|[2017-06-04 03:50:00.0,2017-06-04 03:55:00.0]|2                     |
|bedd88f1-3751|[2017-06-04 03:20:00.0,2017-06-04 03:25:00.0]|2                     |
|da401dab-e7f3|[2017-06-04 03:20:00.0,2017-06-04 03:25:00.0]|2                     |
+-------------+---------------------------------------------+----------------------+

where the data type of window is struct<start:timestamp,end:timestamp>

And the dataframe B has these values:

+-------------+---------------------+-------------------+
|userid       |eventtime            |distance           |
+-------------+---------------------+-------------------+
|9f034a1d-02c1|2017-06-04 03:00:00.0|0.17218625176420413|
|9f034a1d-02c1|2017-06-04 03:00:00.0|0.11145767867097957|
|9f034a1d-02c1|2017-06-04 03:00:00.0|0.14064932728588236|
|a3fac437-efcc|2017-06-04 03:00:00.0|0.08328915597349452|
|a3fac437-efcc|2017-06-04 03:00:00.0|0.07079054693441306|
+-------------+---------------------+-------------------+

I tried a regular join, but it does not work because window and eventtime have different data types.

A.join(B, A.userid == B.userid, A.window == B.eventtime).select("*")

Any suggestions?

Upvotes: 4

Views: 1938

Answers (2)

koiralo

Reputation: 23109

You cannot join these two directly because window and eventtime have different data types. Instead, compare eventtime against the window's start and end fields (Scala):

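// && binds tighter than ||, so the OR must be parenthesized;
// otherwise the window.end branch would match regardless of userid.
val result = A.join(B,
  A("userid") === B("userid") &&
  (A("window.start") === B("eventtime") ||
   A("window.end") === B("eventtime")), "left")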

Hope this helps!

Upvotes: 0

Alper t. Turker

Reputation: 35229

The less efficient solution is to join or crossJoin with between:

a.join(b, col("eventtime").between(col("window.start"), col("window.end")))
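
One detail worth checking before relying on this: between is inclusive on both bounds, so an event that falls exactly on a boundary shared by two adjacent windows matches both of them. The following is a rough pure-Python model of that range join, not Spark code; range_join and the sample rows are hypothetical names made up for illustration.

```python
from datetime import datetime

def range_join(windows, events):
    """Nested-loop join: pair each event with every window that contains it.

    Both bounds are inclusive, mirroring the semantics of Column.between.
    """
    return [
        (w, e)
        for w in windows
        for e in events
        if w[0] <= e <= w[1]
    ]

# Two adjacent 5-minute windows sharing the 03:05:00 boundary (made-up rows)
windows = [
    (datetime(2017, 6, 4, 3, 0), datetime(2017, 6, 4, 3, 5)),
    (datetime(2017, 6, 4, 3, 5), datetime(2017, 6, 4, 3, 10)),
]
events = [datetime(2017, 6, 4, 3, 2), datetime(2017, 6, 4, 3, 5)]

pairs = range_join(windows, events)
boundary_matches = [w for w, e in pairs if e == datetime(2017, 6, 4, 3, 5)]
```

Here the 03:05:00 event pairs with both windows. If that double match is unwanted, a half-open condition such as (col("eventtime") >= col("window.start")) & (col("eventtime") < col("window.end")) should avoid it.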

The more efficient solution is to convert eventtime to a struct with the same definition as the existing window column, so the join becomes a plain equality join. For example:

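from pyspark.sql.functions import col, window

# Bucket each event into its 5-minute window, then equi-join on the struct
(b
    .withColumn("event_window", window(col("eventtime"), "5 minutes"))
    .join(a, col("event_window") == col("window")))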
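
To see why the equality join works, here is a minimal pure-Python sketch of the bucketing idea: every timestamp is floored to the start of its 5-minute window, and the resulting (start, end) pair acts as a join key. This is only a model, not Spark's implementation; tumbling_window and the sample data are hypothetical, and it assumes the window width divides an hour evenly so that windows line up with the top of the hour.

```python
from datetime import datetime, timedelta

def tumbling_window(ts, width_minutes=5):
    """Return (start, end) of the tumbling window that contains ts.

    Assumption: width_minutes divides 60, so windows align to the hour.
    The window is half-open: start is inclusive, end is exclusive.
    """
    if 60 % width_minutes != 0:
        raise ValueError("width_minutes must divide 60")
    start = ts - timedelta(
        minutes=ts.minute % width_minutes,
        seconds=ts.second,
        microseconds=ts.microsecond,
    )
    return (start, start + timedelta(minutes=width_minutes))

# One made-up row shaped like dataframe A, keyed by its window
windows_a = {
    (datetime(2017, 6, 4, 3, 0), datetime(2017, 6, 4, 3, 5)): "a4336494-3a10",
}

# An event is matched by a key lookup instead of a range comparison
event = datetime(2017, 6, 4, 3, 2, 30)
key = tumbling_window(event)
match = windows_a.get(key)
```

Because the key is computed once per event, the match is a lookup rather than a comparison against every window. Spark's window function also treats the start as inclusive and the end as exclusive, so an event at exactly 03:05:00 lands only in the 03:05 to 03:10 window. If matches should also be restricted to the same user, the userid equality would need to be added to the join condition.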

Upvotes: 5
