WebDev930

Reputation: 25

Can intermediate state be dropped/controlled in Spark structured streaming in Complete Output mode? (Spark 2.4.0)

I have a scenario where I want to process data from a Kafka topic. I use the following Java code to read the data as a stream from the topic.

// Read the records from the Kafka topic as an unbounded stream
Dataset<Row> streamObjs = sparkSession.readStream().format("kafka")
                .option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", streamTopic)
                .option("failOnDataLoss", false).load();

I cast the value to String, define the schema, then apply a watermark (for late data) and a window (for grouping and aggregation), and finally write the output to a Kafka sink.

// Kafka's value column is binary; cast it to a JSON string first
Dataset<Row> selectExprImporter = streamObjs.selectExpr("CAST(value AS STRING)");

// Schema of the JSON payload: an id, an epoch timestamp, and a map of named values
StructType streamSchema = new StructType().add("id", DataTypes.StringType)
                .add("timestamp", DataTypes.LongType)
                .add("values", DataTypes.createMapType(DataTypes.StringType, DataTypes.DoubleType, false));

// Parse the JSON string into a nested struct column called "data"
Dataset<Row> selectValueImporter = selectExprImporter
                .select(functions.from_json(new Column("value"), streamSchema).alias("data"));
.
.
(More transformations/operations)
.
.
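Note that for the watermark and window below to work, the elided steps must (among other things) flatten the parsed struct and turn the long epoch timestamp into a TimestampType column, since withWatermark() requires one. A purely illustrative sketch, in which the millisecond-epoch assumption and the map explode are guesses rather than the actual elided code:

// Illustrative guess at part of the elided steps (not the actual code):
// flatten the "data" struct, convert the epoch-millis long into a proper
// TimestampType column, and explode the values map into (key, value) rows.
Dataset<Row> streamData = selectValueImporter.select(
        functions.col("data.id").as("id"),
        functions.col("data.timestamp").divide(1000)            // assuming epoch millis
                .cast(DataTypes.TimestampType).as("timestamp"),
        functions.explode(functions.col("data.values")));       // yields "key" and "value" columns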

Dataset<Row> aggCount_15min = streamData.withWatermark("timestamp", "2 minute") // tolerate up to 2 min of late data
                .withColumn("frequency", functions.lit(15))
                .groupBy(new Column("id"), new Column("frequency"),
                        functions.window(new Column("timestamp"), "15 minute").as("time_range"))
                .agg(functions.mean("value").as("mean_value"), functions.sum("value").as("sum"),
                        functions.count(functions.lit(1)).as("number_of_values"))
                .filter("mean_value > 35")
                .orderBy("id", "frequency", "time_range"); // sorting forces Complete output mode

aggCount_15min.selectExpr("to_json(struct(*)) AS value").writeStream()
                .outputMode(OutputMode.Complete()) // re-emits the full result table every trigger
                .format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
                .option("topic", outputTopic).option("checkpointLocation", checkpointLocation)
                .start().awaitTermination();

Questions

  1. Am I correct in understanding that when using Complete Output mode with the Kafka sink, the intermediate state will keep increasing forever until I get an OutOfMemory exception?

  2. Also, what is the ideal use case for Complete Output mode? Should it be used only when the intermediate data/state doesn't increase?

  3. Complete Output mode is needed in my case as I want to use the orderBy clause. Is there some way to force Spark to drop the state it holds after every, say, 30 minutes and start again with new data?

  4. Is there a better way to get the desired result without using Complete Output mode? Should I use something other than Spark Structured Streaming?

The desired result is to aggregate and group the data as per the query above, then, once the first batch has been created, drop all state and start fresh for the next batch. Here a batch can be a function of the last processed timestamp: say, drop all state and start fresh when the current timestamp has crossed 20 minutes from the first received timestamp. Or, better, make it a function of the window time (15 minutes in this example): say, when 4 batches of 15-minute windows have been processed and the timestamp for the 5th batch arrives, drop the state for the previous 4 batches and start fresh for this batch.

Upvotes: 0

Views: 619

Answers (1)

Ged

Reputation: 18003

The question asks many things and focuses less on what Spark Structured Streaming (SSS) actually does. Taking the title question, your numbered questions, and the final non-numbered question in turn:

A. Title Question:

Not as such. Complete mode stores only the aggregates, so not all of the data is kept, just enough state to allow re-computation as new data is incrementally added. I find the manual misleading in its description, but that may be just me. Note also that Complete mode requires a streaming aggregation; otherwise you get this error:

org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets
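For illustration, a minimal sketch (the console sink and the stream variable are assumptions) of the kind of query that triggers it:

// Minimal sketch (assumed names): Complete mode with no aggregation
// upstream is rejected at analysis time with the AnalysisException above.
stream.writeStream()
        .outputMode(OutputMode.Complete())   // no groupBy()/agg() before this
        .format("console")
        .start();                            // throws AnalysisException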

  1. Am I correct in understanding that when using Complete Output mode with the Kafka sink, the intermediate state will keep increasing forever until I get an OutOfMemory exception?

The Kafka sink does not figure here. The intermediate state is what Spark Structured Streaming needs to store. It keeps the aggregates and discards the incoming data once it has been folded in. But in the end you would get an OOM due to this, or some other error, I suspect.

  2. Also, what is the ideal use case for Complete Output mode? Should it be used only when the intermediate data/state doesn't increase?

For aggregations over all data received. The second part of your question is not logical, so I cannot answer it; the state will generally increase over time.
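As an illustration, the textbook Complete-mode use case is a running aggregate whose state is bounded by the number of distinct groups rather than by the volume of data. A minimal sketch with assumed names:

// Minimal sketch (assumed names): a running count per word.
// State is one row per distinct word, so it stays small even though
// the aggregation covers all data received since the query started.
Dataset<Row> counts = words.groupBy("word").count();

counts.writeStream()
        .outputMode(OutputMode.Complete())   // full result table re-emitted every trigger
        .format("console")
        .start();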

  3. Complete Output mode is needed in my case as I want to use the orderBy clause. Is there some way to force Spark to drop the state it holds after every, say, 30 minutes and start again with new data?

No, there is not. Stopping gracefully and then re-starting is not a good idea either, as the period would then not really be 15 minutes, and it is against the SSS approach anyway. From the manuals: sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode. You cannot drop the state as you would like; again, this comes back to the aggregates discussion.

  4. Is there a better way to get the desired result without using Complete Output mode? Should I use something other than Spark Structured Streaming?

No, as you have many requirements that cannot be satisfied by the current implementation. Unless, that is, you drop the orderBy and do a non-overlapping window operation (15, 15) in Append mode with a minuscule watermark, if memory serves correctly. You would then rely on downstream processing to sort the output, as orderBy is not supported in Append mode.
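A hedged sketch of that alternative, reusing names from the question and assuming streamData already carries a TimestampType timestamp column:

// Sketch of the Append-mode alternative: non-overlapping 15-minute windows,
// a minuscule watermark, and no orderBy (sorting is left to downstream
// consumers). Names are taken from the question, not tested code.
Dataset<Row> agg15min = streamData
        .withWatermark("timestamp", "1 minute")               // minuscule watermark
        .groupBy(functions.col("id"),
                functions.window(functions.col("timestamp"), "15 minutes"))
        .agg(functions.mean("value").as("mean_value"),
                functions.sum("value").as("sum"),
                functions.count(functions.lit(1)).as("number_of_values"))
        .filter("mean_value > 35");

agg15min.selectExpr("to_json(struct(*)) AS value").writeStream()
        .outputMode(OutputMode.Append())                      // each window emitted once
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("topic", outputTopic)
        .option("checkpointLocation", checkpointLocation)
        .start();

In Append mode each window is emitted once the watermark passes its end, and the window's state is then dropped, which is in effect the state expiry the question asks for.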

Final overall question: aggregating and grouping the data as per the query above, then, once the first batch has been created, dropping all state and starting fresh for the next batch, where a batch is a function of the last processed timestamp or of the window time (15 minutes in this example).

Whilst your ideas are understandable, the SSS framework does not support all of this, and specifically not what you want, just yet.

Upvotes: 1
