Reputation: 381
I'd like to chain two continuous queries (views based on a single upstream connector) and have the sink result at the end of the pipeline be eventually consistent.
Sinking just one of these transformations at a time works fine and eventually produces consistent results.
When I combine them, however, the second query does not seem to process the late-arriving duplicate events that the first query is supposed to remove (given enough time) and push downstream as an updated state.
As an example, if I print the deduplicated events view with toRetractStream, I get e1, e2, e3 as the result; e4 is eliminated because it is a duplicate of e3.
If I print the second view, I get e1, e2, e3 and e4 as the result, because both e3 and e4 are eligible to be merged. Eventually, though, I want e4 to be removed from this result once the first view notices that it is a duplicate.
Is it possible to do that with Flink Table API?
EDIT: Sharing an example use case below.
-- Step 0: Create the upstream table with Kinesis connector: Events
All events in the Events table:
event_id, book_id, source, happened_at
11e5dc326,161111,source_1,2021-11-19T01:39:11
12e5dc326,171111,source_1,2021-11-19T01:39:11
18e5dc326,171111,source_2,2021-11-29T01:39:11
19e5dc326,171111,source_2,2022-11-29T01:39:11
20e5dc326,171111,source_2,2022-11-29T01:39:11
21e5dc326,171111,source_2,2021-11-30T01:39:11
-- Step 1: Dedupe events
CREATE VIEW EventsDeduped
AS
(
SELECT event_id, book_id, source, happened_at
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY book_id, source ORDER BY
happened_at ASC) AS row_num
FROM Events)
WHERE row_num = 1
)
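Step 1's dedup semantics can be checked on the sample data outside Flink. Below is a minimal Python stand-in (the helper name `dedupe` is assumed for illustration; this is not Flink code) for keeping the first row per (book_id, source) partition:

```python
# Sample events from the post: (event_id, book_id, source, happened_at)
events = [
    ("11e5dc326", "161111", "source_1", "2021-11-19T01:39:11"),
    ("12e5dc326", "171111", "source_1", "2021-11-19T01:39:11"),
    ("18e5dc326", "171111", "source_2", "2021-11-29T01:39:11"),
    ("19e5dc326", "171111", "source_2", "2022-11-29T01:39:11"),
    ("20e5dc326", "171111", "source_2", "2022-11-29T01:39:11"),
    ("21e5dc326", "171111", "source_2", "2021-11-30T01:39:11"),
]

def dedupe(rows):
    """Keep the earliest event per (book_id, source), mirroring
    ROW_NUMBER() OVER (PARTITION BY book_id, source ORDER BY happened_at ASC) = 1.
    ISO-8601 timestamps compare correctly as plain strings."""
    first = {}
    for row in rows:
        key = (row[1], row[2])  # (book_id, source)
        if key not in first or row[3] < first[key][3]:
            first[key] = row
    return sorted(first.values())
```

Only 11e5dc326, 12e5dc326 and 18e5dc326 survive; 19e5dc326, 20e5dc326 and 21e5dc326 are later duplicates within the (171111, source_2) partition.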
tableEnv.toRetractStream[Row](tableEnv.sqlQuery("SELECT * FROM EventsDeduped")).print()
12> (true,11e5dc326,161111,source_1,2021-11-19T01:39:11)
12> (true,12e5dc326,171111,source_1,2021-11-19T01:39:11)
10> (true,18e5dc326,171111,source_2,2021-11-29T01:39:11)
CREATE VIEW EventsDedupedSource1
AS
(
SELECT *
FROM EventsDeduped
WHERE source='source_1'
)
CREATE VIEW EventsDedupedSource2
AS
(
SELECT *
FROM EventsDeduped
WHERE source='source_2'
)
-- Step 2: Merge events coming from different sources
CREATE VIEW EventsDedupedMerged
AS
(
SELECT
COALESCE(source_2.event_id, source_1.event_id) AS event_id,
COALESCE(source_2.book_id, source_1.book_id) AS book_id,
COALESCE(source_2.source, source_1.source) AS source,
COALESCE(source_2.happened_at, source_1.happened_at) AS
happened_at,
CASE WHEN
source_2.book_id IS NOT NULL AND
source_1.book_id IS NOT NULL THEN TRUE ELSE FALSE END AS
merged
FROM
EventsDedupedSource1 source_1
FULL OUTER JOIN
EventsDedupedSource2 source_2
ON source_1.book_id = source_2.book_id AND ABS(TIMESTAMPDIFF(DAY,
source_1.happened_at, source_2.happened_at)) < 30
)
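The intended result of Step 2 on the deduped rows can likewise be sketched in plain Python (the helper name `full_outer_merge` is assumed for illustration; this is not Flink code): the COALESCE columns prefer the source_2 side, and merged is true only when both sides match within the 30-day window.

```python
from datetime import datetime

# Deduped rows from Step 1, split per source: (event_id, book_id, source, happened_at)
source_1_rows = [
    ("11e5dc326", "161111", "source_1", "2021-11-19T01:39:11"),
    ("12e5dc326", "171111", "source_1", "2021-11-19T01:39:11"),
]
source_2_rows = [
    ("18e5dc326", "171111", "source_2", "2021-11-29T01:39:11"),
]

def full_outer_merge(left, right, max_days=30):
    """FULL OUTER JOIN on book_id with a < max_days time window;
    COALESCE prefers the right (source_2) side, merged=True when both match."""
    out, matched_left, matched_right = [], set(), set()
    for i, l in enumerate(left):
        for j, r in enumerate(right):
            delta = datetime.fromisoformat(l[3]) - datetime.fromisoformat(r[3])
            if l[1] == r[1] and abs(delta.days) < max_days:
                out.append((r[0], r[1], r[2], r[3], True))
                matched_left.add(i)
                matched_right.add(j)
    # Unmatched rows from either side keep their own fields, merged=False.
    out += [(*l, False) for i, l in enumerate(left) if i not in matched_left]
    out += [(*r, False) for j, r in enumerate(right) if j not in matched_right]
    return out
```

On the deduped input this yields exactly two rows: 18e5dc326 with merged=true (paired with 12e5dc326) and 11e5dc326 with merged=false.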
tableEnv.toRetractStream[Row](tableEnv.sqlQuery("SELECT * FROM EventsDedupedMerged")).print()
<action>,event_id, book_id, source, happened_at, merged
7> (true,11e5dc326,161111,source_1,2021-11-19T01:39:11,false)
5> (true,12e5dc326,171111,source_1,2021-11-19T01:39:11,false)
5> (false,12e5dc326,171111,source_1,2021-11-19T01:39:11,false)
5> (true,18e5dc326,171111,source_2,2021-11-29T01:39:11,true)
5> (true,19e5dc326,171111,source_2,2022-11-29T01:39:11,false)
5> (true,20e5dc326,171111,source_2,2022-11-29T01:39:11,false)
5> (true,21e5dc326,171111,source_2,2021-11-30T01:39:11,true)
At the end of this pipeline, I expect only the 18e5dc326 event to have its merged field set to true, because the event with id 21e5dc326 should have been removed in the first step.
However, the second step somehow still merges it with another event, hence the true in its merged field.
Upvotes: 1
Views: 441
Reputation: 381
This was caused by an error in my query: I was joining the raw events table instead of the deduped views.
I'm keeping this post anyway so others can see that yes, you can chain dependent views with the Flink Table API, and the result will be eventually consistent.
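For illustration, the difference between the buggy and the fixed query can be reproduced on the sample data with a plain-Python sketch (helper names assumed, not Flink code): taking the source_2 side from the raw table still finds a 30-day partner for 21e5dc326, while taking it from the deduped rows does not.

```python
from datetime import datetime

# Sample events from the question: (event_id, book_id, source, happened_at)
events = [
    ("11e5dc326", "161111", "source_1", "2021-11-19T01:39:11"),
    ("12e5dc326", "171111", "source_1", "2021-11-19T01:39:11"),
    ("18e5dc326", "171111", "source_2", "2021-11-29T01:39:11"),
    ("19e5dc326", "171111", "source_2", "2022-11-29T01:39:11"),
    ("20e5dc326", "171111", "source_2", "2022-11-29T01:39:11"),
    ("21e5dc326", "171111", "source_2", "2021-11-30T01:39:11"),
]

def dedupe(rows):
    # Earliest event per (book_id, source), as in Step 1.
    first = {}
    for row in rows:
        key = (row[1], row[2])
        if key not in first or row[3] < first[key][3]:
            first[key] = row
    return list(first.values())

def by_source(rows, source):
    return [r for r in rows if r[2] == source]

def merge_partners(left, right, max_days=30):
    """source_2 event_ids that find a source_1 partner on book_id
    within max_days, i.e. the merged=true rows of the join."""
    out = []
    for l in left:
        for r in right:
            delta = datetime.fromisoformat(l[3]) - datetime.fromisoformat(r[3])
            if l[1] == r[1] and abs(delta.days) < max_days:
                out.append(r[0])
    return out

# Buggy: the source_2 side comes from the raw table, so 21e5dc326 still merges.
buggy = merge_partners(by_source(events, "source_1"), by_source(events, "source_2"))
# Fixed: both sides come from the deduped rows; only 18e5dc326 merges.
deduped = dedupe(events)
fixed = merge_partners(by_source(deduped, "source_1"), by_source(deduped, "source_2"))
```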
Upvotes: 2