cucucool

Reputation: 3887

Writing record once after the watermarking interval in Spark Structured Streaming

I have the following query:

val ds = dataFrame
  .filter(!$"requri".endsWith(".m3u8"))
  .filter(!$"bserver".contains("trimmer"))
  .withWatermark("time", "120 seconds")
  .groupBy(window($"time", "60 seconds"), $"channelName")
  .agg(sum("bytes") / 1000000 as "byte_count")

How do I implement a ForeachWriter so that its process method is triggered only once per watermarking interval? I.e., in the example above, I would get the following:

10.00-10.01 Channel-1 100(bytes)
10.00-10.01 Channel-2 120(bytes)
10.01-10.02 Channel-1 110(bytes)
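The emission pattern I expect follows from the window/watermark arithmetic: each record is assigned to a 60-second window, and (in append output mode) a window's aggregate becomes final only once the watermark, i.e. the maximum event time seen so far minus the 120-second delay, passes the window's end. A minimal plain-Scala sketch of that arithmetic (the object and method names here are illustrative, not Spark APIs):

```scala
// Illustrative arithmetic behind the query above; not a Spark API.
object WindowMath {
  val windowMillis = 60 * 1000L          // window($"time", "60 seconds")
  val watermarkDelayMillis = 120 * 1000L // withWatermark("time", "120 seconds")

  // Start of the 60-second window an event timestamp falls into.
  def windowStart(eventTimeMillis: Long): Long =
    eventTimeMillis - (eventTimeMillis % windowMillis)

  // Current watermark: max event time seen so far minus the allowed lateness.
  def watermark(maxEventTimeMillis: Long): Long =
    maxEventTimeMillis - watermarkDelayMillis

  // In append mode, a window's aggregate is emitted only once the
  // watermark has passed the window's end.
  def isWindowClosed(windowStartMillis: Long, maxEventTimeMillis: Long): Boolean =
    watermark(maxEventTimeMillis) >= windowStartMillis + windowMillis
}
```

So a 10.00-10.01 window is written only after an event with time >= 10.03 has been seen, which is why each row above appears exactly once.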

Upvotes: 1

Views: 336

Answers (1)

himanshuIIITian

Reputation: 6085

To trigger the process method once per watermarking interval, you can use Trigger.ProcessingTime("120 seconds"). Something like this:

import org.apache.spark.sql.streaming.Trigger

val query = ds.writeStream
              .format("console")
              .trigger(Trigger.ProcessingTime("120 seconds"))
              .start()

The trigger settings of a streaming query define the timing of streaming data processing: whether the query executes as a micro-batch query with a fixed batch interval or as a continuous-processing query.
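To connect this to the ForeachWriter the question asks about: the trigger only controls how often micro-batches run, while append output mode plus the watermark is what ensures each finalized window reaches process exactly once. A sketch of how the two might be combined (ByteCountWriter is a hypothetical name; the column names are taken from the query above, and this is an illustration, not tested code):

```scala
import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.streaming.Trigger

// Sketch only: with outputMode("append"), each window's aggregate row
// reaches process() once, after the watermark passes the window's end.
class ByteCountWriter extends ForeachWriter[Row] {
  def open(partitionId: Long, epochId: Long): Boolean = true

  def process(row: Row): Unit = {
    // Column names come from the groupBy/agg in the question's query.
    val window  = row.getAs[Any]("window")
    val channel = row.getAs[String]("channelName")
    val bytes   = row.getAs[Double]("byte_count")
    println(s"$window $channel $bytes")
  }

  def close(errorCause: Throwable): Unit = ()
}

val query = ds.writeStream
              .outputMode("append")
              .foreach(new ByteCountWriter)
              .trigger(Trigger.ProcessingTime("120 seconds"))
              .start()
```

With this wiring, each micro-batch fires every 120 seconds, but a given window/channel row is only emitted to the writer in the batch where the watermark closes that window.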

Upvotes: 1
