Hako

Reputation: 381

How can I join two continuous queries in Flink Table API?

I'd like to stack two continuous queries (views based on a single upstream connector) on top of each other, and have the sink result at the end of the pipeline be eventually consistent.

Sinking just one of these transformations at a time works fine and eventually produces consistent results.

When I combine them, however, the pipeline seems not to process the late-arriving duplicate events that the first query is supposed to remove (given enough time) and push to the second query as updated state.

As an example, if I print the deduplicated-events view with toRetractStream, I get e1, e2, e3 as the result; e4 is eliminated because it is a duplicate of e3.

If I print the second view, I get e1, e2, e3 and e4 as the result, because both e3 and e4 are eligible to be merged. Eventually, however, I want e4 to be removed from this result once the first view notices it is a duplicate.
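A retract stream encodes each change as an add (`true`) or a retraction (`false`) of a previously emitted row; the eventually consistent result is what remains after applying all changes in order. A minimal plain-Python sketch of that semantics (not the Flink API), using the hypothetical e1–e4 events from above:

```python
# Materialize a retract stream: True = add the row, False = retract
# an earlier occurrence of the same row.
def materialize(changes):
    state = []
    for is_add, row in changes:
        if is_add:
            state.append(row)
        else:
            state.remove(row)  # removes the first earlier occurrence
    return state

# An add/retract pair cancels out, so only the net result survives.
changes = [
    (True, "e1"),
    (True, "e2"),
    (False, "e2"),  # e2 retracted by a later update
    (True, "e3"),
]
print(materialize(changes))  # ['e1', 'e3']
```

This is why a downstream view fed by a retract stream can converge: a late retraction from the upstream view removes the stale row downstream as well.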

Is it possible to do that with Flink Table API?

EDIT: Sharing an example use case below.

  -- Step 0: Create the upstream table with the Kinesis connector: Events

  All events:
  event_id, book_id, source, happened_at
  11e5dc326,161111,source_1,2021-11-19T01:39:11
  12e5dc326,171111,source_1,2021-11-19T01:39:11
  18e5dc326,171111,source_2,2021-11-29T01:39:11
  19e5dc326,171111,source_2,2022-11-29T01:39:11
  20e5dc326,171111,source_2,2022-11-29T01:39:11
  21e5dc326,171111,source_2,2021-11-30T01:39:11


  -- Step 1: Dedupe events
  CREATE VIEW EventsDeduped
  AS
  (
    SELECT event_id, book_id, source, happened_at
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY book_id, source ORDER BY happened_at ASC) AS row_num
      FROM Events)
    WHERE row_num = 1
  )
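The ROW_NUMBER dedup above keeps, per (book_id, source), the row with the earliest happened_at. The same logic applied to the sample data, as a plain-Python sketch (not the Flink runtime):

```python
# Keep the earliest event per (book_id, source), mirroring
# ROW_NUMBER() OVER (PARTITION BY book_id, source ORDER BY happened_at ASC) = 1.
def dedupe(events):
    first = {}
    for ev in events:  # ev = (event_id, book_id, source, happened_at)
        key = (ev[1], ev[2])
        # ISO-8601 strings compare correctly as plain strings.
        if key not in first or ev[3] < first[key][3]:
            first[key] = ev
    return sorted(first.values())

events = [
    ("11e5dc326", "161111", "source_1", "2021-11-19T01:39:11"),
    ("12e5dc326", "171111", "source_1", "2021-11-19T01:39:11"),
    ("18e5dc326", "171111", "source_2", "2021-11-29T01:39:11"),
    ("19e5dc326", "171111", "source_2", "2022-11-29T01:39:11"),
    ("20e5dc326", "171111", "source_2", "2022-11-29T01:39:11"),
    ("21e5dc326", "171111", "source_2", "2021-11-30T01:39:11"),
]
# Only 11e5dc326, 12e5dc326 and 18e5dc326 survive, matching the
# retract-stream output of EventsDeduped below.
print(dedupe(events))
```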

  tableEnv.toRetractStream[Row](tableEnv.sqlQuery("SELECT * FROM EventsDeduped")).print()

  12> (true,11e5dc326,161111,source_1,2021-11-19T01:39:11)
  12> (true,12e5dc326,171111,source_1,2021-11-19T01:39:11)
  10> (true,18e5dc326,171111,source_2,2021-11-29T01:39:11)


  CREATE VIEW EventsDedupedSource1
  AS
  (
    SELECT  *
    FROM EventsDeduped
    WHERE source='source_1'
  )

  CREATE VIEW EventsDedupedSource2
  AS
  (
    SELECT  *
    FROM EventsDeduped
    WHERE source='source_2'
  )

  -- Step 2: Merge events coming from different sources
  CREATE VIEW EventsDedupedMerged
  AS
  (
    SELECT
      COALESCE(source_2.event_id, source_1.event_id) AS event_id,
      COALESCE(source_2.book_id, source_1.book_id) AS book_id,
      COALESCE(source_2.source, source_1.source) AS source,
      COALESCE(source_2.happened_at, source_1.happened_at) AS happened_at,
      CASE WHEN source_2.book_id IS NOT NULL
           AND source_1.book_id IS NOT NULL
        THEN TRUE ELSE FALSE END AS merged
    FROM EventsDedupedSource1 source_1
    FULL OUTER JOIN EventsDedupedSource2 source_2
      ON source_1.book_id = source_2.book_id
      AND ABS(TIMESTAMPDIFF(DAY, source_1.happened_at, source_2.happened_at)) < 30
  )
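The merge view is a full outer join on book_id within a 30-day window, with COALESCE preferring source_2 fields when both sides match. A plain-Python sketch of that view (not the Flink runtime), fed with the deduped per-source rows:

```python
from datetime import datetime

FMT = "%Y-%m-%dT%H:%M:%S"

def days_apart(a, b):
    # Mirrors ABS(TIMESTAMPDIFF(DAY, a, b)) for same-time-of-day timestamps.
    return abs((datetime.strptime(a, FMT) - datetime.strptime(b, FMT)).days)

def merge(src1, src2):  # rows: (event_id, book_id, source, happened_at)
    out, matched1, matched2 = [], set(), set()
    for r1 in src1:
        for r2 in src2:
            if r1[1] == r2[1] and days_apart(r1[3], r2[3]) < 30:
                # COALESCE prefers the source_2 side; merged = True.
                out.append((r2[0], r2[1], r2[2], r2[3], True))
                matched1.add(r1[0]); matched2.add(r2[0])
    # FULL OUTER JOIN: keep unmatched rows from both sides, merged = False.
    out += [r + (False,) for r in src1 if r[0] not in matched1]
    out += [r + (False,) for r in src2 if r[0] not in matched2]
    return out

src1 = [("11e5dc326", "161111", "source_1", "2021-11-19T01:39:11"),
        ("12e5dc326", "171111", "source_1", "2021-11-19T01:39:11")]
src2 = [("18e5dc326", "171111", "source_2", "2021-11-29T01:39:11")]
# 12e5dc326 and 18e5dc326 are 10 days apart on book 171111 -> merged;
# 11e5dc326 has no partner and stays unmerged.
for row in merge(src1, src2):
    print(row)
```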

  tableEnv.toRetractStream[Row](tableEnv.sqlQuery("SELECT * FROM EventsDedupedMerged")).print()

  <action>,event_id, book_id, source, happened_at, merged
 7> (true,11e5dc326,161111,source_1,2021-11-19T01:39:11,false)
 5> (true,12e5dc326,171111,source_1,2021-11-19T01:39:11,false)
 5> (false,12e5dc326,171111,source_1,2021-11-19T01:39:11,false)
 5> (true,18e5dc326,171111,source_2,2021-11-29T01:39:11,true)
 5> (true,19e5dc326,171111,source_2,2022-11-29T01:39:11,false)
 5> (true,20e5dc326,171111,source_2,2022-11-29T01:39:11,false)
 5> (true,21e5dc326,171111,source_2,2021-11-30T01:39:11,true)

  

At the end of this pipeline, I expect only event 18e5dc326 to have the merged field set to true, because in the first step the event with id 21e5dc326 should have been deduped away.

However, the second step somehow still merges it with another event, so its merged field ends up true.

Upvotes: 1

Views: 441

Answers (1)

Hako

This was caused by an error in my query: I was joining the raw events table instead of the deduped views.

I'm keeping this post up anyway so others can see that yes, you can chain dependent views in the Flink Table API, and the result will be eventually consistent.
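Once the join reads from the deduped views, the end-to-end result matches the expectation: only 18e5dc326 carries merged = true. A plain-Python sanity check of the two chained views on the sample data (a sketch of the logic, not the Flink runtime):

```python
from datetime import datetime

FMT = "%Y-%m-%dT%H:%M:%S"

def dedupe(events):
    # Earliest event per (book_id, source), as in EventsDeduped.
    first = {}
    for ev in events:  # ev = (event_id, book_id, source, happened_at)
        key = (ev[1], ev[2])
        if key not in first or ev[3] < first[key][3]:
            first[key] = ev
    return list(first.values())

def merge(rows):
    # Full outer join of source_1 vs source_2 on book_id within 30 days,
    # as in EventsDedupedMerged; returns (event_id, merged) pairs.
    src1 = [r for r in rows if r[2] == "source_1"]
    src2 = [r for r in rows if r[2] == "source_2"]
    out, m1, m2 = [], set(), set()
    for r1 in src1:
        for r2 in src2:
            near = abs((datetime.strptime(r1[3], FMT)
                        - datetime.strptime(r2[3], FMT)).days) < 30
            if r1[1] == r2[1] and near:
                out.append((r2[0], True))  # COALESCE prefers source_2
                m1.add(r1[0]); m2.add(r2[0])
    out += [(r[0], False) for r in src1 + src2 if r[0] not in m1 | m2]
    return out

events = [
    ("11e5dc326", "161111", "source_1", "2021-11-19T01:39:11"),
    ("12e5dc326", "171111", "source_1", "2021-11-19T01:39:11"),
    ("18e5dc326", "171111", "source_2", "2021-11-29T01:39:11"),
    ("19e5dc326", "171111", "source_2", "2022-11-29T01:39:11"),
    ("20e5dc326", "171111", "source_2", "2022-11-29T01:39:11"),
    ("21e5dc326", "171111", "source_2", "2021-11-30T01:39:11"),
]
# Dedup first, then merge: only 18e5dc326 ends up with merged=True.
print(sorted(merge(dedupe(events))))
```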

Upvotes: 2
