Reputation: 163
I am facing an issue in Structured Streaming with Spark.
Current setup: I have a data stream coming from Kafka. Each message has an event time. I use these event times to compute window aggregates, with a watermark rule to discard state. The output mode is append.
Aim: I need to get the window aggregates in order as they expire, so that I can process these events in order of event-time windows. I expect the window state to expire sequentially because of my sliding window.
Problem: Sometimes the messages are not printed in window order. For example:
Why are the windows not dropped in order? I wanted this to be ordered. Please help.
Upvotes: 2
Views: 1640
Reputation: 18033
Ordered output is not part of the architecture (yet). Many posts here on SO dispel the notion that it is.
As cricket_007 states in numerous posts, it is generally best to leave the sorting to the downstream system(s). It is also more flexible that way: the whole notion of the RDBMS is that fixing the sort order of the stored data is less relevant, clustering aside.
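To make "leave the sorting to the downstream system" concrete, here is a minimal sketch of a downstream reorder buffer in plain Python. It is not Spark API: `WindowAggregate`, `ReorderBuffer`, `reorder` and the `max_lag_seconds` parameter are hypothetical names made up for illustration. The assumption is that a window never arrives more than `max_lag_seconds` behind the newest window start seen so far; anything later than that would still come out of order.

```python
import heapq
from dataclasses import dataclass, field
from typing import Iterable, List, Optional


@dataclass(order=True)
class WindowAggregate:
    """One finalized window as emitted by the streaming query (hypothetical shape)."""
    window_start: int                        # epoch seconds, the only sort key
    window_end: int = field(compare=False)
    count: int = field(compare=False)


class ReorderBuffer:
    """Holds windows in a min-heap and releases them in window_start order."""

    def __init__(self, max_lag_seconds: int) -> None:
        self._heap: List[WindowAggregate] = []
        self._max_lag = max_lag_seconds      # assumed bound on out-of-orderness
        self._latest_start: Optional[int] = None

    def push(self, agg: WindowAggregate) -> List[WindowAggregate]:
        """Add one window and return every window that is now safe to release."""
        heapq.heappush(self._heap, agg)
        if self._latest_start is None or agg.window_start > self._latest_start:
            self._latest_start = agg.window_start
        horizon = self._latest_start - self._max_lag
        released = []
        while self._heap and self._heap[0].window_start <= horizon:
            released.append(heapq.heappop(self._heap))
        return released

    def flush(self) -> List[WindowAggregate]:
        """Drain whatever is still buffered, in order (e.g. at shutdown)."""
        released = []
        while self._heap:
            released.append(heapq.heappop(self._heap))
        return released


def reorder(arrivals: Iterable[WindowAggregate], max_lag_seconds: int) -> List[WindowAggregate]:
    """Run a whole sequence of arrivals through the buffer."""
    buf = ReorderBuffer(max_lag_seconds)
    out: List[WindowAggregate] = []
    for agg in arrivals:
        out.extend(buf.push(agg))
    out.extend(buf.flush())
    return out


# Windows arriving out of order, as in the question.
arrivals = [WindowAggregate(s, s + 20, 1) for s in (0, 20, 10, 30, 50, 40)]
ordered_starts = [w.window_start for w in reorder(arrivals, max_lag_seconds=20)]
```

The trade-off is latency: each window is held back until the buffer has seen a window start at least `max_lag_seconds` newer, so a larger bound tolerates more disorder but delays output.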
If you look at this use case https://mapr.com/blog/real-time-analysis-popular-uber-locations-spark-structured-streaming-machine-learning-kafka-and-mapr-db/, you will see that sorting plays no role. I do see many requests for sorting, but many of them can achieve their goal without it.
Single-partition topics are an option for lower volumes, provided the order is set by the producer and that order is acceptable. You can also consider KSQL, writing to a single-partition Kafka topic that you read from subsequently, or Kafka Streams with Java or Scala.
I think the underlying issue is explained in a post I saw a few years ago:
The basic tenet of structured streaming is that a query should return the same answer in streaming or batch mode. We support sorting in complete mode because we have all the data and can sort it correctly and return the full answer. In update or append mode, sorting would only return a correct answer if we could promise that records that sort lower are going to arrive later (and we can't). Therefore, it is disallowed.
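A toy model in plain Python (not Spark code; `sort_each_batch` and the sample values are made up for illustration) shows why this matters: sorting each micro-batch as it is emitted does not give the answer a batch job would give over the same data.

```python
from typing import List


def sort_each_batch(batches: List[List[int]]) -> List[int]:
    """Sort every micro-batch on its own and append it to the output, as an append-mode sink would see it."""
    out: List[int] = []
    for batch in batches:
        out.extend(sorted(batch))
    return out


# Window start times split across two micro-batches; 20 arrives after 30.
batches = [[10, 30], [20, 40]]

streamed = sort_each_batch(batches)                       # [10, 30, 20, 40]
batch_answer = sorted(x for b in batches for x in b)      # [10, 20, 30, 40]
```

Once 30 has been appended it cannot be moved, so the streamed output can only match the batch answer if no smaller value shows up in a later micro-batch.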
Upvotes: 1