Metehan Yıldırım

Reputation: 391

Flink SQL Watermark Strategy After Join Operation

My problem is that I cannot use the ORDER BY clause after a JOIN operation. To reproduce the problem, create the following table:

CREATE TABLE stack (
    id INT PRIMARY KEY NOT ENFORCED,  -- Flink only accepts primary keys declared NOT ENFORCED
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '100'
);

This table has a watermark strategy and TIMESTAMP(3) *ROWTIME* type on ts.

Flink SQL> DESC stack;
+------+------------------------+-------+---------+--------+----------------------------+
| name |                   type |  null |     key | extras |                  watermark |
+------+------------------------+-------+---------+--------+----------------------------+
|   id |                    INT | FALSE | PRI(id) |        |                            |
|   ts | TIMESTAMP(3) *ROWTIME* |  TRUE |         |        | `ts` - INTERVAL '1' SECOND |
+------+------------------------+-------+---------+--------+----------------------------+
2 rows in set

However, if I define a view as a simple self-join

CREATE VIEW self_join AS (
SELECT l.ts, l.id, r.id
FROM stack as l INNER JOIN stack as r
ON l.id=r.id
);

it loses the watermark strategy but not the type,

Flink SQL> DESC self_join;
+------+------------------------+-------+-----+--------+-----------+
| name |                   type |  null | key | extras | watermark |
+------+------------------------+-------+-----+--------+-----------+
|   ts | TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
|   id |                    INT | FALSE |     |        |           |
|  id0 |                    INT | FALSE |     |        |           |
+------+------------------------+-------+-----+--------+-----------+
3 rows in set

I assumed that the watermark strategy would be preserved, so that ORDER BY could be used after a JOIN operation, but this is not the case. How can I add a watermark strategy to the VIEW again?

Thanks in advance.

Upvotes: 2

Views: 893

Answers (1)

David Anderson

Reputation: 43439

Whenever Flink SQL performs a regular join in streaming mode (a join without any sort of temporal constraint), the result cannot have watermarks. This in turn means that you can't sort the result or apply windowing to it.

Why is this, and what can you do about it?

Background

Flink SQL uses time attributes (in this case, stack.ts) to optimize state retention. Because the stack stream/table has a time attribute, we know that this stream will be processed more-or-less in order, by time (the elements are constrained to be at most 1 second out-of-order). This then places a tight constraint on how much state must be retained in order to perform an operation like sorting this table -- a 1-second-long buffer will be enough.

If stack didn't have a time attribute defined on it (i.e., a timestamp field with watermarking defined on it), then Flink SQL would refuse to sort it (in streaming mode) because doing so would require keeping around an unbounded amount of state, and it would be impossible to know how long to wait before emitting the first result.
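To make the sorting constraint concrete, here is a minimal sketch against the stack table from the question. It assumes streaming mode, where a sort is only accepted when its leading key is an ascending time attribute; the second query is shown only as a contrast and is expected to be rejected by the planner.

```sql
-- Accepted in streaming mode: the leading sort key is the time attribute ts,
-- ascending, so Flink only has to buffer rows until the watermark passes them.
SELECT id, ts
FROM stack
ORDER BY ts;

-- Expected to be rejected in streaming mode: id is not a time attribute,
-- so sorting on it would require keeping an unbounded amount of state.
SELECT id, ts
FROM stack
ORDER BY id;
```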

The result of a regular join cannot have a well-defined watermark strategy

Any type of regular join requires that Flink store in its state backend all rows of the input tables forever (which Flink is willing to try to do). But more to the point, watermarking is not well-defined on the result, because there are no constraints on how out-of-order it might be.

What you can do

If you modify the join to be either an interval join or a temporal join then the result will still have watermarks. E.g., you could do this:

CREATE VIEW self_join AS (
  SELECT l.ts, l.id, r.id
  FROM stack as l INNER JOIN stack as r
  ON l.id=r.id
  WHERE l.ts BETWEEN r.ts - INTERVAL '1' MINUTE AND r.ts
);

or you could do this:

CREATE VIEW self_join AS (
  SELECT l.ts, l.id, r.id
  FROM stack AS l INNER JOIN stack FOR SYSTEM_TIME AS OF l.ts AS r
  ON l.id=r.id
);

In both of these cases, Flink's SQL engine will be able to retain less state than with the regular join, and it will be able to produce watermarks in the output stream/table.
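As a rough sketch of what this enables, the queries below assume that self_join is one of the two views defined above, so that its ts column is still a time attribute with watermarks. The windowed query uses the TUMBLE table-valued function, which is an assumption about your Flink version (it is available in recent releases); the 10-second window size and the cnt alias are arbitrary choices for illustration.

```sql
-- Sorting by the preserved time attribute, which is what the question was after.
SELECT *
FROM self_join
ORDER BY ts;

-- Windowing on the same time attribute: count joined rows per 10-second tumbling window.
SELECT window_start, window_end, COUNT(*) AS cnt
FROM TABLE(
  TUMBLE(TABLE self_join, DESCRIPTOR(ts), INTERVAL '10' SECOND)
)
GROUP BY window_start, window_end;
```

If either query is still rejected, running DESC self_join and checking that ts is reported as *ROWTIME* is a reasonable first diagnostic.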

Another possible solution would be to convert the result table to a DataStream, then use the DataStream API to apply watermarking, and then convert that stream back to a table. But that's only going to make sense if you have some domain knowledge that allows you to know how out-of-order the result stream might be -- and you probably could have expressed that same information as either an interval or temporal join.

Upvotes: 4
