Reputation: 571
We have a Spark Streaming application. The architecture is as follows:
Kinesis to Spark to Kafka.
The Spark application uses qubole/kinesis-sql for structured streaming from Kinesis. The data is aggregated and then pushed to Kafka.
Our use case demands a delay of 4 minutes before pushing to Kafka.
The windowing is done with a 2-minute window and a watermark of 4 minutes:
val windowedCountsDF = messageDS
.withWatermark("timestamp", "4 minutes")
.groupBy(window($"timestamp", "2 minutes", "2 minutes"), $"id", $"eventType", $"topic")
.count() // aggregation assumed from the variable name; the original snippet stopped at groupBy
The write to Kafka is triggered every two minutes:
val eventFilteredQuery = windowedCountsDF
.selectExpr("topic", "id as key", "to_json(struct(*)) AS value")
.writeStream
.trigger(Trigger.ProcessingTime("2 minutes"))
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("checkpointLocation", checkPoint)
.outputMode("update")
.option("kafka.bootstrap.servers", kafkaBootstrapServers)
.queryName("events_kafka_stream")
.start()
I can change the trigger time to match the window, but some events still get pushed to Kafka instantly.
Is there any way to delay writes to Kafka by x minutes after the window is completed?
Thanks
Upvotes: 3
Views: 1594
Reputation: 28367
Change your output mode from update to append (the default option). The update output mode writes all updated rows to the sink, so whether or not you use a watermark does not matter. However, with the append mode any writes need to wait until the watermark is crossed, which is exactly what you want:
Append mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed the late threshold specified in withWatermark() as by the modes semantics, rows can be added to the Result Table only once after they are finalized (i.e. after watermark is crossed).
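In practice this is a one-line change to the sink from your question. A minimal sketch, assuming the windowedCountsDF, checkPoint and kafkaBootstrapServers from your code are in scope:
val eventFilteredQuery = windowedCountsDF
.selectExpr("topic", "id as key", "to_json(struct(*)) AS value")
.writeStream
.trigger(Trigger.ProcessingTime("2 minutes"))
.format("kafka") // short name for the same Kafka sink provider used in the question
.option("kafka.bootstrap.servers", kafkaBootstrapServers)
.option("checkpointLocation", checkPoint)
.outputMode("append") // was "update"; a window is emitted only after the watermark passes its end
.queryName("events_kafka_stream")
.start()
With a 2-minute tumbling window and a 4-minute watermark, each window's finalized row reaches Kafka on the first trigger after the event-time watermark passes the window's end, i.e. roughly 4 minutes after the window closes, instead of partial results being written on every trigger.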
Upvotes: 2