elarib

Reputation: 674

Window timeseries with step in Spark/Scala

I have this input:

timestamp,user
1,A
2,B
5,C
9,E
12,F

The desired result is:

timestampRange,userList
1 to 2,[A,B]
3 to 4,[] or null
5 to 6,[C]
7 to 8,[] or null
9 to 10,[E]
11 to 12,[F]

I tried using a Window, but the problem is that it doesn't include the empty timestamp ranges.

Any hints would be helpful.

Upvotes: 0

Views: 67

Answers (1)

dumitru

Reputation: 2108

I don't know whether a windowing function can cover the gaps between ranges, but you can take the following approach:

Define a DataFrame of the ranges, df_ranges:

 // toDF needs the implicits (spark-shell imports them automatically)
 import spark.implicits._
 val ranges = List((1,2), (3,4), (5,6), (7,8), (9,10))
 val df_ranges = sc.parallelize(ranges).toDF("start", "end")
+-----+---+
|start|end|
+-----+---+
|    1|  2|
|    3|  4|
|    5|  6|
|    7|  8|
|    9| 10|
+-----+---+
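Hard-coding the ranges only works for one fixed input. As a minimal sketch, assuming the buckets should start at the smallest timestamp, have a fixed width, and that the data is non-empty, df_ranges can instead be generated with spark.range(start, end, step), which is end-exclusive. BuildRanges and buildRanges are hypothetical names for illustration, not part of Spark:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max, min}

object BuildRanges {
  // Hypothetical helper: builds contiguous [start, end] buckets of width `step`
  // covering every timestamp from the minimum to the maximum in `data`.
  // Assumes `data` is non-empty and has an integer "timestamp" column.
  def buildRanges(spark: SparkSession, data: DataFrame, step: Long): DataFrame = {
    val bounds = data.agg(min("timestamp"), max("timestamp")).head()
    val lo = bounds.getInt(0).toLong
    val hi = bounds.getInt(1).toLong
    // spark.range excludes its end value, so hi + 1 keeps the last timestamp covered
    spark.range(lo, hi + 1, step)
      .select(col("id").as("start"), (col("id") + step - 1).as("end"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("build-ranges").getOrCreate()
    import spark.implicits._
    val df_data = Seq((1, "A"), (2, "B"), (5, "C"), (9, "E"), (12, "F")).toDF("timestamp", "user")
    buildRanges(spark, df_data, 2).orderBy("start").show()
    spark.stop()
  }
}
```

With the question's data (timestamps 1 to 12) and a step of 2, this yields the buckets 1-2, 3-4, ..., 11-12, including the empty ones.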

Then the data with the timestamp column, df_data:

val data = List((1,"A"), (2,"B"), (5,"C"), (9,"E"))
val df_data = sc.parallelize(data).toDF("timestamp", "user")
+---------+----+
|timestamp|user|
+---------+----+
|        1|   A|
|        2|   B|
|        5|   C|
|        9|   E|
+---------+----+

Left-join the two DataFrames, keeping every range and attaching each timestamp that falls between its start and end (this also works for ranges wider than 2, unlike matching only on start or end):

val joined = df_ranges.join(df_data, df_data("timestamp").between(df_ranges("start"), df_ranges("end")), "left")
joined.show()

+-----+---+---------+----+
|start|end|timestamp|user|
+-----+---+---------+----+
|    1|  2|        1|   A|
|    1|  2|        2|   B|
|    5|  6|        5|   C|
|    9| 10|        9|   E|
|    3|  4|     null|null|
|    7|  8|     null|null|
+-----+---+---------+----+

Now do a simple aggregation with the collect_list function. collect_list skips nulls, so the ranges without any timestamp end up with an empty list:

 import org.apache.spark.sql.functions.collect_list
 joined.groupBy("start", "end").agg(collect_list("user")).orderBy("start").show()
+-----+---+------------------+
|start|end|collect_list(user)|
+-----+---+------------------+
|    1|  2|            [A, B]|
|    3|  4|                []|
|    5|  6|               [C]|
|    7|  8|                []|
|    9| 10|               [E]|
+-----+---+------------------+
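Putting the steps together, here is a sketch that also produces the column names from the question (timestampRange, userList). It assumes the range and data DataFrames have the shapes shown above; RangeUsers and usersPerRange are hypothetical names. The join uses between, so it is not tied to ranges of width 2:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, concat_ws}

object RangeUsers {
  // Left-joins every range to the timestamps it contains, then collects users per range.
  // Ranges with no matching timestamp keep a null `user`, which collect_list drops,
  // so those ranges end up with an empty list.
  def usersPerRange(ranges: DataFrame, data: DataFrame): DataFrame =
    ranges
      .join(data, data("timestamp").between(ranges("start"), ranges("end")), "left")
      .groupBy("start", "end")
      .agg(collect_list("user").as("userList"))
      .orderBy("start")
      // Build the "1 to 2" style label; the projection keeps the sort order
      .select(
        concat_ws(" to ", col("start").cast("string"), col("end").cast("string")).as("timestampRange"),
        col("userList"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("range-users").getOrCreate()
    import spark.implicits._
    val df_ranges = Seq((1, 2), (3, 4), (5, 6), (7, 8), (9, 10), (11, 12)).toDF("start", "end")
    val df_data = Seq((1, "A"), (2, "B"), (5, "C"), (9, "E"), (12, "F")).toDF("timestamp", "user")
    usersPerRange(df_ranges, df_data).show(false)
    spark.stop()
  }
}
```

collect_list does not guarantee the order of elements within each list, so sort the lists afterwards if the order matters.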

Upvotes: 1
