Pavel Orekhov

Reputation: 2177

When exactly does watermarking drop late data?

Here's the simple code that I use to test watermarking:

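import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.OutputMode
import spark.implicits._ // `spark` is the active SparkSession

spark.readStream
  .textFile("C:\\Users\\Pavel_Orekhov\\Desktop\\stream")
  // Each line is "hour,hashTag,userId"
  .map(_.split(","))
  .map(a => (a(0), a(1), a(2)))
  .toDF("hour", "hashTag", "userId")
  .selectExpr("CAST(hour AS TIMESTAMP)", "hashTag", "userId")
  // Allow events to arrive up to 1 hour late
  .withWatermark("hour", "1 hour")
  .groupBy(
    window($"hour", "1 hour", "1 hour"),
    $"hashTag",
    $"userId"
  ).count()
  .writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start().processAllAvailable()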

The folder stream contains one file with these contents:

1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T10:03,pavel,123
1994-12-28T10:03,pavel,123
1994-12-28T11:03,pavel,123
1994-12-28T11:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T06:03,pavel,123

The output that I get is this:

+--------------------+-------+------+-----+
|              window|hashTag|userId|count|
+--------------------+-------+------+-----+
|[1994-12-28 09:00...|  pavel|   123|    7|
|[1994-12-28 06:00...|  pavel|   123|    1|
|[1994-12-28 11:00...|  pavel|   123|    2|
|[1994-12-28 10:00...|  pavel|   123|    2|
+--------------------+-------+------+-----+

In the text file that I read from, you can see that there is a 9 AM entry and a 6 AM entry that come after the 11 AM entries. I expected these to be dropped, because with a 1-hour watermark only events that are at most one hour behind the latest event time seen should still update the aggregation.

So why are they not dropped?

Upvotes: 1

Views: 106

Answers (1)

Pavel Orekhov

Reputation: 2177

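It turns out that this happens because the whole file is processed as a single batch, and the data within a batch is unordered. The watermark only advances between batches, so the watermark derived from the 11:03 rows was not yet in effect when the 09:03 and 06:03 rows of the same batch were aggregated. When I created a new file containing 1994-12-28T06:03,pavel,123, that row did get dropped, because it was part of a new batch that was processed after the watermark had moved forward.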
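
To make the batch behaviour concrete, here is a minimal sketch in plain Scala, with no Spark involved. It is a simplified model and not Spark's actual implementation: `WatermarkSketch`, `windowEnd`, `later` and `run` are hypothetical names made up for this illustration. It assumes the watermark is the maximum event time seen so far minus the delay, that it is only updated after a batch has finished, and that a row is dropped once the end of its 1-hour window is not after the current watermark.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.temporal.ChronoUnit

// Hypothetical helper, not part of Spark: a simplified model of how a
// watermark advances only between batches.
object WatermarkSketch {
  // Same delay as withWatermark("hour", "1 hour") in the question.
  val delay: Duration = Duration.ofHours(1)

  // End of the 1-hour tumbling window that contains `t`.
  def windowEnd(t: LocalDateTime): LocalDateTime =
    t.truncatedTo(ChronoUnit.HOURS).plusHours(1)

  // The later of two timestamps.
  def later(a: LocalDateTime, b: LocalDateTime): LocalDateTime =
    if (a.isAfter(b)) a else b

  // Returns, for each batch, the rows that are NOT dropped as late.
  def run(batches: Seq[Seq[LocalDateTime]]): Seq[Seq[LocalDateTime]] = {
    // No watermark exists before the first batch has been processed.
    var watermark: Option[LocalDateTime] = None
    batches.map { batch =>
      // Rows are checked against the watermark from PREVIOUS batches only.
      val kept = batch.filter(t => watermark.forall(w => windowEnd(t).isAfter(w)))
      // After the batch, advance the watermark; it never moves backwards.
      if (batch.nonEmpty) {
        val candidate = batch.reduce(later).minus(delay)
        watermark = Some(watermark.fold(candidate)(later(_, candidate)))
      }
      kept
    }
  }

  def main(args: Array[String]): Unit = {
    // First file: out-of-order rows in a single batch.
    val file1 = Seq("1994-12-28T09:03", "1994-12-28T11:03", "1994-12-28T06:03")
      .map(s => LocalDateTime.parse(s))
    // Second file: the late row arrives in its own, later batch.
    val file2 = Seq("1994-12-28T06:03").map(s => LocalDateTime.parse(s))
    run(Seq(file1, file2)).zipWithIndex.foreach { case (kept, i) =>
      println(s"batch $i kept ${kept.size} row(s)")
    }
  }
}
```

Under this model, all three rows of the first batch are kept because no watermark exists yet. After that batch the watermark becomes 10:03 (11:03 minus one hour), so the 06:03 row in the second batch, whose window ends at 07:00, is dropped.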

Upvotes: 2
