Reputation: 3887
I have the following query:
val ds = dataFrame
  .filter(!$"requri".endsWith(".m3u8"))
  .filter(!$"bserver".contains("trimmer"))
  .withWatermark("time", "120 seconds")
  .groupBy(window(dataFrame.col("time"), "60 seconds"), col("channelName"))
  .agg((sum("bytes") / 1000000) as "byte_count")
How do I implement a foreach writer so that its process method is triggered only once for every watermarking interval? That is, in the example above, I would get the following output:
10.00-10.01 Channel-1 100(bytes)
10.00-10.01 Channel-2 120(bytes)
10.01-10.02 Channel-1 110(bytes)
Upvotes: 1
Views: 336
Reputation: 6085
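To run the query once per 120-second interval, you can use a processing-time trigger, Trigger.ProcessingTime("120 seconds"). Note that this trigger fires on wall-clock time, independently of the event-time watermark, and that a ForeachWriter's process method is called for every output row of each micro-batch rather than once per trigger. Something like this: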
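import org.apache.spark.sql.streaming.Trigger

// Start a micro-batch every 120 seconds and print each batch to the console.
val query = ds.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .start()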
The trigger settings of a streaming query define the timing of streaming data processing: whether the query executes as a micro-batch query with a fixed batch interval or as a continuous processing query.
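Since the question asks about a foreach writer specifically, here is a minimal sketch of how the same trigger could be combined with one. It assumes `ds` is the aggregated DataFrame from the question, that `channelName` is a string column, and the Spark 2.4+ signature of `open`. The class name `ChannelBytesWriter` and the `println` sink are hypothetical placeholders. In append mode with a watermark, each window should be emitted only once, after the watermark passes the end of the window.

```scala
import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.streaming.Trigger

// Hypothetical writer: prints each aggregated row; swap println for a real sink.
class ChannelBytesWriter extends ForeachWriter[Row] {
  // Called once per partition for each micro-batch; returning true processes the partition.
  override def open(partitionId: Long, epochId: Long): Boolean = true

  // Called once per output row of the micro-batch, not once per trigger.
  override def process(row: Row): Unit = {
    val window = row.getStruct(row.fieldIndex("window"))
    val start = window.getAs[Any]("start")
    val end = window.getAs[Any]("end")
    val channel = row.getAs[String]("channelName") // assumed to be a string column
    val byteCount = row.getAs[Any]("byte_count")
    println(s"$start-$end $channel $byteCount")
  }

  // Called when the partition is finished; errorOrNull is null on success.
  override def close(errorOrNull: Throwable): Unit = ()
}

val query = ds.writeStream
  .outputMode("append") // emit a window only after the watermark has passed its end
  .foreach(new ChannelBytesWriter)
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .start()
```

With this setup each finalized (window, channel) row reaches `process` a single time, but a micro-batch that finalizes several windows or channels will still call `process` once for each of those rows.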
Upvotes: 1