andym

Reputation: 3

Spark Structured Streaming - constructing window interval

I have joined a streaming dataframe (consisting of the first four columns below) with a static dataframe (providing the last two columns) to produce a new streaming dataframe (called first_agg_sdf) with the following structure:

+---------+--------+-----+-------------------+-----+------------+
|elementid|metricid|value|              epoch|sigma|windowlength|
+---------+--------+-----+-------------------+-----+------------+
|        2|       6|41.01|2018-02-28 16:56:10|  3.3|   5 minutes|
|        2|       6|61.45|2018-02-28 16:56:24|  3.3|   5 minutes|
|        2|       6| 9.13|2018-02-28 16:56:51|  3.3|   5 minutes|
|        2|       6|34.21|2018-02-28 16:57:19|  3.3|   5 minutes|
|        2|       5|43.25|2018-02-28 16:56:10|  3.2|   3 minutes|
|        2|       5| 4.96|2018-02-28 16:56:24|  3.2|   3 minutes|
|        2|       5|22.81|2018-02-28 16:56:51|  3.2|   3 minutes|
|        2|       5| 0.04|2018-02-28 16:57:19|  3.2|   3 minutes|

This has schema:

root
 |-- elementid: integer (nullable = true)
 |-- metricid: integer (nullable = true)
 |-- value: float (nullable = true)
 |-- epoch: timestamp (nullable = true)
 |-- sigma: double (nullable = true)
 |-- windowlength: string (nullable = true)

I then want to produce a sliding window that aggregates over elementid and metricid, with a window duration given by the value in the windowlength column for that row.

I produced the following code:

first_agg_window = first_agg_sdf \
    .withWatermark("epoch", "30 seconds") \
    .groupBy(
        window(timeColumn="epoch", windowDuration="windowlength", slideDuration="30 seconds"),
        "elementid",
        "metricid") \
    .agg(stddev_pop("value").alias("movingstd"),
         avg("value").alias("movingavg"),
         last("value").alias("value"))

The above window aggregation works fine if I pass a literal string to the windowDuration parameter, like so:

windowDuration="5 minutes"

However, if I pass the name of the dataframe column instead:

windowDuration="windowlength"

I get the following error:

Traceback (most recent call last):
  File "/home/ec2-user/spark/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/home/ec2-user/spark/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.window.
: java.lang.IllegalArgumentException: The provided interval (windowlength) did not correspond to a valid interval string.
    at org.apache.spark.sql.catalyst.expressions.TimeWindow.getIntervalInMicroSeconds(TimeWindow.scala:120)
    at org.apache.spark.sql.catalyst.expressions.TimeWindow$.apply(TimeWindow.scala:148)
    at org.apache.spark.sql.functions$.window(functions.scala:2805)
    at org.apache.spark.sql.functions$.window(functions.scala:2852)
    at org.apache.spark.sql.functions.window(functions.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Unknown Source)

How do I pass each row's windowlength column value into the windowDuration parameter without generating the above error?

Upvotes: 0

Views: 1393

Answers (1)

tstites

Reputation: 284

Unfortunately it doesn't work that way. The window duration must be a fixed value for the life of the streaming query; it can only be set when the streaming query starts.

You will most likely have to use two streaming queries, one for each window duration.

For each duration, write a streaming query that:

1. Reads the source
2. Filters rows by window duration
3. Aggregates
4. Optionally joins the results back together later
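A minimal sketch of that approach, assuming first_agg_sdf from the question and that windowlength takes only the two values shown in the sample data ("3 minutes" and "5 minutes"); the helper name windowed_agg is mine, not from any API:

```python
# The distinct windowlength values, taken from the sample rows in the question.
WINDOW_DURATIONS = ["3 minutes", "5 minutes"]

def windowed_agg(sdf, duration, slide="30 seconds", watermark="30 seconds"):
    """Build one fixed-duration sliding-window aggregation over the subset
    of rows whose windowlength column equals `duration`."""
    # Imported inside the helper so this sketch can be read without a Spark install.
    from pyspark.sql.functions import window, stddev_pop, avg, last, col

    return (sdf
            .filter(col("windowlength") == duration)  # keep only this duration's rows
            .withWatermark("epoch", watermark)
            .groupBy(window("epoch", duration, slide), "elementid", "metricid")
            .agg(stddev_pop("value").alias("movingstd"),
                 avg("value").alias("movingavg"),
                 last("value").alias("value")))

# One streaming aggregation per duration, each started as its own query:
# windowed = [windowed_agg(first_agg_sdf, d) for d in WINDOW_DURATIONS]
```

Because windowDuration is now a literal string per query, each query satisfies Spark's requirement that the interval be fixed at query start.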

Upvotes: 3
