Kaharon

Reputation: 395

How to use a non-time-based window with Spark Structured Streaming?

I am trying to use a window function with Spark Structured Streaming and Kafka. Because I use a window over non-time-based data, I get this error:

'Non-time-based windows are not supported on streaming DataFrames/Datasets;;

Here is my code:

from pyspark.sql import Window
from pyspark.sql.functions import col, rank

window = Window.partitionBy("input_id").orderBy("similarity")
outputDf = inputDf\
        .crossJoin(ticketDf.withColumnRenamed("IDF", "old_IDF")) \
        .withColumn("similarity", cosine_similarity_udf(col("IDF"), col("old_IDF"))) \
        .withColumn("rank", rank().over(window)) \
        .filter(col("rank") < 10)

So I am looking for a tip or a reference on using a window over non-time-based data...
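For reference, the cosine_similarity_udf in the query above could be backed by a plain-Python function like the following (a hypothetical implementation, assuming each IDF column holds a list of floats; the UDF registration is sketched in the comment):

    import math

    def cosine_similarity(a, b):
        """Cosine of the angle between two equal-length float vectors."""
        dot = sum(x * y for x, y in zip(a, b))
        norm_a = math.sqrt(sum(x * x for x in a))
        norm_b = math.sqrt(sum(x * x for x in b))
        if norm_a == 0.0 or norm_b == 0.0:
            # Define similarity with a zero vector as 0 to avoid division by zero.
            return 0.0
        return dot / (norm_a * norm_b)

    # Registered as a Spark UDF, e.g.:
    # from pyspark.sql.functions import udf
    # from pyspark.sql.types import DoubleType
    # cosine_similarity_udf = udf(cosine_similarity, DoubleType())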

Upvotes: 6

Views: 5406

Answers (3)

Mahnaz

Reputation: 532

Traditional SQL windowing with over() is not supported in Spark Structured Streaming (the only windowing it supports is time-based). If you think about it, this is probably to avoid confusion: someone might falsely assume that Spark Structured Streaming can partition the whole dataset by a column, which is impossible because a stream is unbounded input data.

Instead, you can use groupBy(). groupBy() is also a stateful operation, and it is impossible to implement in append mode unless we include a timestamp column in the list of columns we group by. For example:

from pyspark.sql import functions as F
from pyspark.sql.functions import window

df_result = df.withWatermark("createdAt", "10 minutes") \
              .groupBy(F.col("Id"), window(F.col("createdAt"), self.acceptable_time_difference)) \
              .agg(F.max(F.col("createdAt")).alias("maxCreatedAt"))

In this example, createdAt is a timestamp-typed column. Note that in this case we have to call withWatermark on the timestamp column beforehand, because Spark cannot store state boundlessly.

PS: I know groupBy does not behave exactly like windowing, but with a simple join, or a custom function with mapGroupsWithState, you may be able to implement the desired functionality.
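Another common workaround (a sketch, not part of the answer above, assuming Spark 2.4+): with foreachBatch, each micro-batch arrives as a bounded, static DataFrame, so ordinary window functions such as the question's Window.partitionBy are legal inside the batch function even though they are rejected on the streaming DataFrame itself. The function names and the output path here are hypothetical:

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    def top_n_per_group(batch_df, n=10):
        # Rank rows within each input_id by descending similarity and
        # keep the best ones -- the same logic as the question's query.
        w = Window.partitionBy("input_id").orderBy(F.desc("similarity"))
        return (batch_df
                .withColumn("rank", F.rank().over(w))
                .filter(F.col("rank") < n))

    def rank_batch(batch_df, batch_id):
        # Hypothetical sink; replace with whatever your application writes to.
        top_n_per_group(batch_df).write.mode("append").parquet("/tmp/ranked")

    # query = (outputDf.writeStream
    #          .foreachBatch(rank_batch)
    #          .start())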

Upvotes: 4

Kaharon

Reputation: 395

Indeed, windowing is only time-based...

For the application I avoided the use of Flask. I looked for a long time for a streaming system... and now I am using Kafka, and it rocks for my application! :)

And I have this resource to share with you about the unsupported operations in Structured Streaming: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations

Upvotes: 0

Windows always need time-based data, but Spark Structured Streaming does not.

You can create a Spark Structured Streaming query with the default trigger (which processes data as soon as possible) and group the data by window; the grouping is based on time.
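For illustration, here is a minimal sketch of grouping by a time window. It runs on a static DataFrame so it can be tried locally, but the identical groupBy works on a streaming DataFrame read from Kafka (the timestamps and values are made up):

    from datetime import datetime
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    events = spark.createDataFrame(
        [(datetime(2020, 1, 1, 0, 1), 1),
         (datetime(2020, 1, 1, 0, 4), 2),
         (datetime(2020, 1, 1, 0, 12), 5)],
        ["ts", "value"])

    # Two 10-minute windows: 00:00-00:10 holds values 1 and 2,
    # 00:10-00:20 holds value 5.
    totals = (events
              .groupBy(F.window(F.col("ts"), "10 minutes"))
              .agg(F.sum("value").alias("total")))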

Reference: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time

Upvotes: 1
