rkabhishek

Reputation: 946

Structured streaming multiple watermarks

I use Spark 2.3.0 if that matters.

According to the Structured Streaming documentation, late data is handled using watermarks. The documentation also mentions that streaming deduplication relies on watermarking to limit how much intermediate state is stored.

So my question is: can these watermarks have different values, or is the watermark specified only once? I ask because I will be deduplicating values after aggregation, so the tolerance for handling late data is different.

Upvotes: 0

Views: 1253

Answers (1)

Jacek Laskowski

Reputation: 74789

From the Policy for handling multiple watermarks:

A streaming query can have multiple input streams that are unioned or joined together. Each of the input streams can have a different threshold of late data that needs to be tolerated for stateful operations. You specify these thresholds using withWatermark("eventTime", delay) on each of the input streams.

While executing the query, Structured Streaming individually tracks the maximum event time seen in each input stream, calculates watermarks based on the corresponding delay, and chooses a single global watermark with them to be used for stateful operations. By default, the minimum is chosen as the global watermark because it ensures that no data is accidentally dropped as too late if one of the streams falls behind the others (for example, one of the streams stops receiving data due to upstream failures). In other words, the global watermark will safely move at the pace of the slowest stream and the query output will be delayed accordingly.

Since Spark 2.4, you can set the multiple watermark policy to choose the maximum value as the global watermark by setting the SQL configuration spark.sql.streaming.multipleWatermarkPolicy to max (default is min).

In fact, this also applies to any watermark-sensitive operator.
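
To make the min/max policy concrete, here is a minimal plain-Python sketch of the arithmetic described in the quoted passage. It is only a model, not Spark code: the function names, the stream names (`clicks`, `impressions`), and the timestamps are all made up for illustration, and it ignores details such as Spark only updating the watermark between micro-batches.

```python
from datetime import datetime, timedelta

def stream_watermark(max_event_time, delay):
    """Per-stream watermark: the maximum event time seen minus the allowed delay."""
    return max_event_time - delay

def global_watermark(streams, policy="min"):
    """Pick one global watermark from the per-stream watermarks.

    `streams` maps a (hypothetical) stream name to a tuple of
    (max event time seen so far, late-data delay for that stream).
    `policy` mirrors the two values of the multiple watermark policy.
    """
    candidates = [stream_watermark(t, d) for t, d in streams.values()]
    if policy == "min":
        return min(candidates)
    if policy == "max":
        return max(candidates)
    raise ValueError(f"unknown policy: {policy!r}")

# Illustrative state of two input streams with different delays.
streams = {
    "clicks": (datetime(2024, 1, 1, 12, 10), timedelta(minutes=10)),
    "impressions": (datetime(2024, 1, 1, 12, 5), timedelta(minutes=30)),
}

print(global_watermark(streams, "min"))  # 2024-01-01 11:35:00
print(global_watermark(streams, "max"))  # 2024-01-01 12:00:00
```

With the default `min` policy the global watermark follows the slower stream (`impressions` here), so state is kept longer and nothing is dropped early. With `max` the watermark jumps ahead to the faster stream, which gives quicker results at the cost of dropping data from the slower stream as too late.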

Upvotes: 2
