himanshuIIITian

Reputation: 6105

Empty output for Watermarked Aggregation Query in Append Mode

I use Spark 2.2.0-rc1.

I have a Kafka topic against which I'm running a watermarked aggregation query, with a 1 minute watermark, writing to the console in append output mode.

import org.apache.spark.sql.types._
val schema = StructType(StructField("time", TimestampType) :: Nil)
val q = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingOffsets", "earliest").
  option("subscribe", "topic").
  load.
  select(from_json(col("value").cast("string"), schema).as("value")).
  select("value.*").
  withWatermark("time", "1 minute").
  groupBy("time").
  count.
  writeStream.
  outputMode("append").
  format("console").
  start

I am pushing the following data to the Kafka topic:

{"time":"2017-06-07 10:01:00.000"}
{"time":"2017-06-07 10:02:00.000"}
{"time":"2017-06-07 10:03:00.000"}
{"time":"2017-06-07 10:04:00.000"}
{"time":"2017-06-07 10:05:00.000"}

And I am getting the following output:

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

Is this expected behaviour?

Upvotes: 15

Views: 3846

Answers (2)

Ray J

Reputation: 845

Here's my best guess:

Append mode only outputs the data after the watermark has passed (e.g. in this case 1 minute later). You didn't set a trigger (e.g. .trigger(Trigger.ProcessingTime("10 seconds"))), so by default it outputs batches as fast as possible. So for the first minute all your batches should be empty, and the first batch after a minute should contain some content.

Another possibility is that you're using groupBy("time") instead of groupBy(window("time", "[window duration]")). I believe watermarks are meant to be used with time windows or mapGroupsWithState, so I'm not sure how the interaction works in this case.

Upvotes: 5

zsxwing

Reputation: 20836

Pushing more data to Kafka should trigger Spark to output something. The current behavior is purely a consequence of the internal implementation.

When you push some data, StreamingQuery will generate a batch to run. When this batch finishes, it will remember the max event time in this batch. Then in the next batch, because you are using append mode, StreamingQuery will use the max event time and the watermark to evict old values from StateStore and output them. Therefore you need to make sure that at least two batches are generated in order to see output.
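
The two-batch behaviour described above can be sketched with a small, Spark-free model. This is only an illustration under simplifying assumptions, not Spark's actual code: AppendModeModel, State and runBatch are hypothetical names, event times are plain minute numbers rather than timestamps, late-data handling is ignored, and the eviction rule (emit groups whose time is at or below the watermark known at the start of the batch) is a simplification.

```scala
object AppendModeModel {
  // Running counts per event time, plus the watermark carried over
  // from the previous batch. All names here are hypothetical.
  final case class State(counts: Map[Long, Long], watermark: Long)

  // Before the first batch no watermark is known, so nothing can be evicted.
  val initial: State = State(Map.empty, Long.MinValue)

  /** Runs one batch and returns (rows emitted in append mode, state for the next batch). */
  def runBatch(state: State, eventTimes: Seq[Long], delay: Long): (Map[Long, Long], State) = {
    // 1. Fold the new events into the running counts.
    val updated = eventTimes.foldLeft(state.counts) { (acc, t) =>
      acc.updated(t, acc.getOrElse(t, 0L) + 1L)
    }
    // 2. Evict and emit only the groups at or below the watermark that was
    //    known when this batch started (derived from the previous batch).
    val (emitted, kept) = updated.partition { case (t, _) => t <= state.watermark }
    // 3. After the batch, remember max event time minus the delay as the
    //    watermark for the next batch; it never moves backwards.
    val nextWatermark =
      if (eventTimes.isEmpty) state.watermark
      else math.max(state.watermark, eventTimes.max - delay)
    (emitted, State(kept, nextWatermark))
  }
}
```

In this model, with a delay of 1 minute, feeding minutes 1, 2 and 3 as the first batch emits nothing, because no watermark is known yet. A second batch containing minute 4 then emits the counts for minutes 1 and 2, while 3 and 4 stay in state until further data arrives.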

Upvotes: 8
