Here's the simple code that I use to test watermarking:
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.OutputMode
import spark.implicits._

spark.readStream
  .textFile("C:\\Users\\Pavel_Orekhov\\Desktop\\stream")
  .map(_.split(","))                    // each line is "hour,hashTag,userId"
  .map(a => (a(0), a(1), a(2)))
  .toDF("hour", "hashTag", "userId")
  .selectExpr("CAST(hour AS timestamp)", "hashTag", "userId")
  .withWatermark("hour", "1 hour")      // allow events up to 1 hour late
  .groupBy(
    window($"hour", "1 hour", "1 hour"),
    $"hashTag",
    $"userId")
  .count()
  .writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()
  .processAllAvailable()
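For reference, the threshold a watermark maintains is simply the maximum event time observed so far minus the allowed delay. Here is a minimal plain-Scala sketch of that bookkeeping (the names `Watermark`, `advance`, and `isLate` are illustrative, not Spark API):

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

// Illustrative model of watermark bookkeeping: track the maximum event
// time seen so far, and derive the "too late" threshold from it.
final case class Watermark(maxEventTime: Option[LocalDateTime], delayHours: Long) {
  // Threshold below which rows are considered too late.
  def threshold: Option[LocalDateTime] =
    maxEventTime.map(_.minus(delayHours, ChronoUnit.HOURS))

  // Fold a batch's event times into an updated watermark.
  def advance(batch: Seq[LocalDateTime]): Watermark =
    copy(maxEventTime =
      (maxEventTime.toSeq ++ batch).reduceOption((a, b) => if (a.isAfter(b)) a else b))

  // A row is dropped only if it is strictly older than the threshold.
  def isLate(t: LocalDateTime): Boolean = threshold.exists(t.isBefore)
}
```

With a max event time of 11:03 and a 1-hour delay, the threshold is 10:03, so a 06:03 row would be late while a 10:03 row would not.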
The stream folder contains one file with these contents:
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T10:03,pavel,123
1994-12-28T10:03,pavel,123
1994-12-28T11:03,pavel,123
1994-12-28T11:03,pavel,123
1994-12-28T09:03,pavel,123
1994-12-28T06:03,pavel,123
The output that I get is this:
+--------------------+-------+------+-----+
| window|hashTag|userId|count|
+--------------------+-------+------+-----+
|[1994-12-28 09:00...| pavel| 123| 7|
|[1994-12-28 06:00...| pavel| 123| 1|
|[1994-12-28 11:00...| pavel| 123| 2|
|[1994-12-28 10:00...| pavel| 123| 2|
+--------------------+-------+------+-----+
In the input file you can see that a 9AM entry and a 6AM entry arrive after the 11AM entries. I expected those to be dropped, because with a one-hour watermark Spark should discard events more than an hour older than the latest event time it has seen.
So why do they not get dropped?
Upvotes: 1
It turns out this happens because everything arrives in a single micro-batch, and the data within a batch is unordered: the watermark only advances after a batch completes, so none of the rows in the first batch are filtered. When I created a new file with the value 1994-12-28T06:03,pavel,123
it did get dropped, because it was part of a new batch, evaluated against the watermark that had already advanced past it.
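To make the batch-boundary behavior concrete, here is a plain-Scala sketch (not Spark API; `runBatches` is a hypothetical helper) of why the 06:03 row survives the first batch: each micro-batch is filtered against the watermark computed from *previous* batches only, and the watermark advances afterwards:

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

// Simulate per-batch late-data filtering: a batch is filtered using the
// watermark derived from earlier batches, then the watermark advances.
def runBatches(batches: Seq[Seq[LocalDateTime]], delayHours: Long): Seq[Seq[LocalDateTime]] = {
  var maxEventTime: Option[LocalDateTime] = None  // nothing seen before batch 1
  batches.map { batch =>
    // Threshold from *previous* batches only; None means "drop nothing".
    val threshold = maxEventTime.map(_.minus(delayHours, ChronoUnit.HOURS))
    val kept = batch.filterNot(t => threshold.exists(t.isBefore))
    // Advance the max event time only after the batch is processed.
    maxEventTime =
      (maxEventTime.toSeq ++ batch).reduceOption((a, b) => if (a.isAfter(b)) a else b)
    kept
  }
}
```

If 11:03 and 06:03 arrive in the same batch, both are kept (the pre-batch watermark is still unset); if 06:03 arrives in a later batch, the threshold is already 10:03 and it is dropped.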
Upvotes: 2