Reputation: 1939
I have two Flink dynamic tables, Event and Configuration.
Event has the structure [id, myTimestamp] and Configuration has the structure [id, myValue, myTimestamp].
I am trying to write a Flink SQL query that returns Event.id, Configuration.myValue, or Event.id, null if the id of the Event row does not match any id from Configuration.
Example of expected behavior (Event and Configuration start empty):
The example must be read as:
[DATA_RECEIVED] => TARGET_TABLE : EXPECTED_OUTPUT
Since the SQL query is built from a join, its result is inserted into an UpsertSink (the first value of each output tuple corresponds to the upsert boolean).
[myId-1, 10] => EventTable : [(true, myId-1, null)]
[myId-1, myValue-A, 15] => ConfigurationTable : [(false, myId-1, null), (true, myId-1, myValue-A)]
[myId-1, myValue-A, 20] => ConfigurationTable : [(false, myId-1, myValue-A), (true, myId-1, myValue-A)]
[myId-1, myValue-B, 25] => ConfigurationTable : [(false, myId-1, myValue-A), (true, myId-1, myValue-B)]
[myId-1, 30] => EventTable : [(false, myId-1, null), (true, myId-1, myValue-B)]
So I wrote this query:
SELECT
    Event.id,
    Configuration.myValue
FROM
    (SELECT id, MAX(myTimestamp) AS myTimestamp
     FROM Event
     GROUP BY id) AS Event
LEFT JOIN
    (SELECT id, LATEST_VAL(myValue, myTimestamp) AS myValue, MAX(myTimestamp) AS myTimestamp
     FROM Configuration
     GROUP BY id, myValue) AS Configuration
ON Event.id = Configuration.id
GROUP BY Event.id, Configuration.myValue
Here LATEST_VAL is a UDF that returns the myValue associated with MAX(myTimestamp).
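To make the intended semantics of LATEST_VAL concrete, here is a minimal sketch in plain Python. It is not the Flink AggregateFunction API: the class name, the method names, and the tie-breaking rule (on equal timestamps the later-arriving value wins) are assumptions made only for illustration.

```python
from typing import Optional, Tuple


class LatestVal:
    """Hypothetical accumulator: keeps the value seen with the largest timestamp."""

    def __init__(self) -> None:
        # (timestamp, value) of the best row seen so far, or None if empty
        self._best: Optional[Tuple[int, str]] = None

    def accumulate(self, value: str, timestamp: int) -> None:
        # Assumption: on equal timestamps the later-arriving value wins.
        if self._best is None or timestamp >= self._best[0]:
            self._best = (timestamp, value)

    def get_value(self) -> Optional[str]:
        return None if self._best is None else self._best[1]


acc = LatestVal()
for value, ts in [("myValue-A", 15), ("myValue-A", 20), ("myValue-B", 25)]:
    acc.accumulate(value, ts)
latest = acc.get_value()
print(latest)
```

Fed the three Configuration rows from the example in arrival order, the accumulator ends up holding myValue-B, the value attached to timestamp 25.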
But I observe behavior that I do not understand. Here are the observed results:
[myId-1, 10] => EventTable : [(true, myId-1, null)] // OK
[myId-1, myValue-A, 15] => ConfigurationTable : [(false, myId-1, null), (true, myId-1, myValue-A)] // OK
[myId-1, myValue-A, 20] => ConfigurationTable : [(false, myId-1, myValue-A), (true, myId-1, null), (false, myId-1, null), (true, myId-1, myValue-A)] // NOT OK
[myId-1, myValue-B, 25] => ConfigurationTable : [(false, myId-1, myValue-A), (true, myId-1, null), (false, myId-1, null), (true, myId-1, myValue-B)] // NOT OK
[myId-1, 30] => EventTable : [(false, myId-1, null), (true, myId-1, myValue-B)] // OK
How do you explain the difference between the expected behavior and the observed behavior? Why is there an extra output (true, myId-1, null), (false, myId-1, null)?
Is it possible to adapt the SQL query to get the wanted behavior?
Upvotes: 0
Views: 556
Reputation: 3422
I think the one bit you missed is that you are actually joining two retract streams. Even though your input streams are append-only, the subqueries perform aggregations over them, and those aggregations produce retractions.
Let's first analyze results for the subqueries:
Subquery 1:
Query: SELECT id, MAX(myTimestamp) as myTimestamp FROM Event GROUP BY id
Resulting stream:
(true, myId-1, 10L)
(false, myId-1, 10L)
(true, myId-1, 30L)
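As a rough illustration of why an aggregation over an append-only input yields retractions, here is a toy Python model of SELECT id, MAX(myTimestamp) ... GROUP BY id. It is a sketch, not Flink's implementation: the function name is hypothetical, and it assumes that nothing is emitted when the aggregate value does not change.

```python
from typing import Dict, Iterable, List, Tuple

Row = Tuple[str, int]            # (id, myTimestamp) from the append-only input
Change = Tuple[bool, str, int]   # (is_add, id, max_timestamp) in the retract stream


def max_per_id_changelog(rows: Iterable[Row]) -> List[Change]:
    """Emit retract-style changes for a per-id MAX aggregate (toy model)."""
    current_max: Dict[str, int] = {}
    changes: List[Change] = []
    for row_id, ts in rows:
        previous = current_max.get(row_id)
        if previous is None:
            # First row for this id: only an add message is needed.
            current_max[row_id] = ts
            changes.append((True, row_id, ts))
        elif ts > previous:
            # The aggregate changed: retract the old result, then add the new one.
            changes.append((False, row_id, previous))
            changes.append((True, row_id, ts))
            current_max[row_id] = ts
        # Assumption: an unchanged aggregate emits nothing.
    return changes


event_changes = max_per_id_changelog([("myId-1", 10), ("myId-1", 30)])
for change in event_changes:
    print(change)
```

For the two Event rows of the example, this produces an add for 10, a retraction of 10, and an add for 30, which is the same shape as the resulting stream listed for Subquery 1.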
Subquery 2:
Query: SELECT id, LATEST_VAL(myValue, myTimestamp) as myValue, MAX(myTimestamp) as myTimestamp FROM Configuration GROUP BY id, myValue
Resulting stream:
(true, "myId-1", "myValue-A", 15L)
(false, "myId-1", "myValue-A", 15L)
(true, "myId-1", "myValue-A", 20L)
(false, "myId-1", "myValue-A", 20L)
(true, "myId-1", "myValue-B", 25L)
After that, you perform the join and the grouping on top of those two retract streams. With that in mind, what is actually joined and grouped in your example is:
[true, myId-1, 10] : [(true, myId-1, null)]
[true, myId-1, myValue-A, 15] : [(false, myId-1, null), (true, myId-1, myValue-A)]
[false, myId-1, myValue-A, 15] : [(false, myId-1, myValue-A), (true, myId-1, null)]
[true, myId-1, myValue-A, 20] : [(false, myId-1, null), (true, myId-1, myValue-A)]
[false, myId-1, myValue-A, 20] : [(false, myId-1, myValue-A), (true, myId-1, null)]
[true, myId-1, myValue-B, 25] : [(false, myId-1, null), (true, myId-1, myValue-B)]
...
Overall, as far as I can tell, it produces correct results. For each input row, the last emitted row represents the most recent value corresponding to the given id.
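One way to check this is to apply the retract messages to an in-memory table and look at what remains after each input. The sketch below does that in plain Python for the observed outputs of the first four inputs from the question. The materialize helper is hypothetical and only mimics what a retract-aware sink would do.

```python
from collections import Counter
from typing import Iterable, List, Optional, Tuple

Change = Tuple[bool, str, Optional[str]]  # (is_add, id, myValue)


def materialize(changes: Iterable[Change]) -> List[Tuple[str, Optional[str]]]:
    """Apply add/retract messages and return the rows that remain."""
    rows: Counter = Counter()
    for is_add, row_id, value in changes:
        key = (row_id, value)
        if is_add:
            rows[key] += 1
        else:
            rows[key] -= 1
            if rows[key] == 0:
                del rows[key]
    return list(rows.elements())


# Observed outputs for the first four inputs, copied from the question.
batches = [
    [(True, "myId-1", None)],
    [(False, "myId-1", None), (True, "myId-1", "myValue-A")],
    [(False, "myId-1", "myValue-A"), (True, "myId-1", None),
     (False, "myId-1", None), (True, "myId-1", "myValue-A")],
    [(False, "myId-1", "myValue-A"), (True, "myId-1", None),
     (False, "myId-1", None), (True, "myId-1", "myValue-B")],
]

seen: List[Change] = []
states = []
for batch in batches:
    seen.extend(batch)
    states.append(materialize(seen))
    print(states[-1])
```

After each input the materialized table holds exactly one row for myId-1 (null, then myValue-A, myValue-A again, and finally myValue-B), so the intermediate (true, myId-1, null), (false, myId-1, null) pair cancels out.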
Upvotes: 1