Jeenson Ephraim

Reputation: 571

Spark Streaming write to Kafka with delay - after x minutes

We have a Spark Streaming application. The architecture is as follows:

Kinesis to Spark to Kafka.

The Spark application is using qubole/kinesis-sql for structured streaming from Kinesis. The data is aggregated and then pushed to Kafka.

Our use case demands a delay of 4 minutes before pushing to Kafka.

The windowing is done with a 2-minute window and a watermark of 4 minutes:

val windowedCountsDF = messageDS
   .withWatermark("timestamp", "4 minutes")
   .groupBy(window($"timestamp", "2 minutes", "2 minutes"), $"id", $"eventType", $"topic")
   .count() // aggregation assumed here; the question omits the aggregate function

The write to Kafka is triggered every two minutes:

val eventFilteredQuery = windowedCountsDF
  .selectExpr("topic", "id as key", "to_json(struct(*)) AS value")
  .writeStream
  .trigger(Trigger.ProcessingTime("2 minutes"))
  .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
  .option("checkpointLocation", checkPoint)
  .outputMode("update")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .queryName("events_kafka_stream")
  .start()

I can change the trigger time to match the window, but still some events get pushed to Kafka instantly.

Is there any way to delay writes to Kafka x minutes after the window is completed?

Thanks

Upvotes: 3

Views: 1594

Answers (1)

Shaido

Reputation: 28367

Change your output mode from update to append (the default option). The update output mode writes all updated rows to the sink, hence whether or not you use a watermark does not matter.

However, with the append mode any writes will need to wait until the watermark is crossed, which is exactly what you want:

Append mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed the late threshold specified in withWatermark() as by the modes semantics, rows can be added to the Result Table only once after they are finalized (i.e. after watermark is crossed).
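For illustration, a minimal sketch of that change against the sink code from the question, reusing the same names (windowedCountsDF, checkPoint, kafkaBootstrapServers); the built-in "kafka" format alias is assumed here in place of the full provider class name:

val eventFilteredQuery = windowedCountsDF
  .selectExpr("topic", "id as key", "to_json(struct(*)) AS value")
  .writeStream
  .trigger(Trigger.ProcessingTime("2 minutes"))
  .format("kafka")                            // assumes the built-in Kafka sink alias
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .option("checkpointLocation", checkPoint)
  .outputMode("append")                       // rows are emitted only after the watermark is crossed
  .queryName("events_kafka_stream")
  .start()

With append mode, a 2-minute window's results are only emitted once the 4-minute watermark has passed it, which gives the delay the question asks for.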

Upvotes: 2
