shaikh

Reputation: 592

Spark Structured Streaming window on a non-timestamp column

I am getting a data stream of the form:

+--+---------+---+----+
|id|timestamp|val|xxx |
+--+---------+---+----+
|1 |12:15:25 | 50| 1  |
|2 |12:15:25 | 30| 1  |
|3 |12:15:26 | 30| 2  |
|4 |12:15:27 | 50| 2  |
|5 |12:15:27 | 30| 3  |
|6 |12:15:27 | 60| 4  |
|7 |12:15:28 | 50| 5  |
|8 |12:15:30 | 60| 5  |
|9 |12:15:31 | 30| 6  |
|. |...      |...|... |

I am interested in applying a window operation to the xxx column, just as window operations over a timestamp column are available in Spark Structured Streaming, with some window size and sliding step.

In the groupBy with window function below, let lines represent a streaming DataFrame, with window size 2 and sliding step 1.

val c_windowed_count = lines.groupBy(
  window($"xxx", "2", "1"), $"val").count().orderBy("xxx")

So, the output should be as follows:

+------+---+-----+
|window|val|count|
+------+---+-----+
|[1, 3]|50 |  2  |
|[1, 3]|30 |  2  |
|[2, 4]|30 |  2  |
|[2, 4]|50 |  1  |
|[3, 5]|30 |  1  |
|[3, 5]|60 |  1  |
|[4, 6]|60 |  2  |
|[4, 6]|50 |  1  |
|...   |.. | ..  |
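To be clear about the assignment rule I'm after: it is the same one Spark applies to time windows, just transposed to integers. A minimal pure-Scala sketch (the windowsFor helper is only an illustration of the rule, not a Spark API):

```scala
// Illustrative helper (not a Spark API): all [start, end) windows of the given
// size and sliding step that contain the value x. Window starts are the
// multiples of `step` satisfying start <= x < start + size.
def windowsFor(x: Long, size: Long, step: Long): Seq[(Long, Long)] = {
  val lastStart = x - (x % step)            // largest window start <= x
  Iterator.iterate(lastStart)(_ - step)
    .takeWhile(_ > x - size)                // starts whose window still covers x
    .map(start => (start, start + size))
    .toSeq
    .reverse
}

windowsFor(1, size = 2, step = 1)  // Seq((0,2), (1,3))
windowsFor(4, size = 2, step = 1)  // Seq((3,5), (4,6))
```

So with size 2 and step 1, the row with xxx = 4 should be counted in both the [3, 5] and [4, 6] windows, as in the expected output above.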

I tried using partitionBy but it is not supported in Spark Structured Streaming.

I am using Spark Structured Streaming 2.3.1.

Thanks!

Upvotes: 7

Views: 1586

Answers (2)

Michael West

Reputation: 1706

New in Spark 2.2 are arbitrary stateful operations (mapGroupsWithState and flatMapGroupsWithState).

One use case is managing user sessions, i.e. a "user window".

The Structured Streaming programming guide has an example about halfway down the page.

If Shaido's clever solution is working for you, then I suggest staying with that. For more complex requirements, arbitrary stateful operations look like the way to go.
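As a rough sketch of what that looks like (the Event and GroupCount case classes and the running-count logic here are illustrative, not taken from the question), mapGroupsWithState keeps arbitrary per-key state across triggers:

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(id: Long, xxx: Long, value: Long)
case class GroupCount(xxx: Long, count: Long)

// events: a streaming Dataset[Event]; spark.implicits._ assumed in scope.
// Keep a running count per xxx key, updated on every micro-batch.
val counts = events
  .groupByKey(_.xxx)
  .mapGroupsWithState[Long, GroupCount](GroupStateTimeout.NoTimeout) {
    (xxx: Long, rows: Iterator[Event], state: GroupState[Long]) =>
      val newCount = state.getOption.getOrElse(0L) + rows.size
      state.update(newCount)   // state survives across micro-batches
      GroupCount(xxx, newCount)
  }
```

The state update function is free to implement any grouping logic, including custom "windows" over a non-timestamp column, at the cost of writing the bookkeeping yourself.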

Upvotes: 0

Shaido

Reputation: 28322

It's currently not possible to use windows on non-timestamp columns in this way with Spark Structured Streaming. However, what you can do is convert the xxx column to a timestamp column, do the groupBy and count, and then convert back.

from_unixtime converts a number of seconds since 1970-01-01 into a timestamp. Using the xxx column as the seconds value, it's possible to create a fake timestamp to use in a window:

lines.groupBy(window(from_unixtime($"xxx"), "2 seconds", "1 seconds"), $"val")
  .count()
  .withColumn("window", struct(unix_timestamp($"window.start"), unix_timestamp($"window.end")))
  .filter($"window.col1" =!= 0)
  .orderBy($"window.col1")

Above, the grouping is done on the converted timestamp, and the withColumn converts the window bounds back to their original numbers (the resulting struct's fields are named col1 and col2). The filter is there because the first rows fall into a window [0, 2] (i.e. covering only the rows where xxx equals 1), which can be skipped.

Resulting output of the above input:

+------+---+-----+
|window|val|count|
+------+---+-----+
| [1,3]| 50|    2|
| [1,3]| 30|    2|
| [2,4]| 30|    2|
| [2,4]| 50|    1|
| [3,5]| 30|    1|
| [3,5]| 60|    1|
| [4,6]| 60|    2|
| [4,6]| 50|    1|
| [5,7]| 30|    1|
| [5,7]| 60|    1|
| [5,7]| 50|    1|
| [6,8]| 30|    1|
+------+---+-----+

Upvotes: 4
