YT Q

Reputation: 41

How to get the last result when doing a table join (using toRetractStream) in Flink SQL

Here is my code, which uses the Flink SQL API to join two tables:

tEnv.createTemporaryView("A", streamA,"speed_sum,cnt,window_start_time,window_end_time");
tEnv.createTemporaryView("B",streamB,"speed_sum,cnt,window_start_time,window_end_time");

String execSQL1 = "select A.speed_sum+COALESCE(B.speed_sum,0.0), " +
        "A.cnt+COALESCE(B.cnt,0), " +
        "A.window_start_time, A.window_end_time " +
        "from A " +
        "left join B on A.window_start_time = B.window_start_time ";
Table table = tEnv.sqlQuery(execSQL1);
DataStream<Tuple2<Boolean, Row>> streamResult = tEnv.toRetractStream(table, Row.class);
streamResult.print("streamResult");

My output looks like this:

 streamA-----------(5078.000000,199,1635333650000,1635333660000)
 streamB-----------(1721.388891,111,1635333650000,1635333660000)
 streamResult:3> (true,5078.0,199,1635333650000,1635333660000) // drop
 streamResult:3> (false,5078.0,199,1635333650000,1635333660000) // drop
 streamResult:3> (true,6799.388891220093,310,1635333650000,1635333660000)  // want to save

As you can see, the toRetractStream API generates three records. I'm wondering how to get only the last record, which correctly adds up A.speed_sum and B.speed_sum (and A.cnt and B.cnt).

Upvotes: 1

Views: 745

Answers (1)

David Anderson

Reputation: 43454

Some streaming SQL queries, like your JOIN, produce an update stream. Given the continuous, unbounded nature of streaming, there's no way for Flink to know when the "final" result has been reached.
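To make the update-stream semantics concrete, here is a minimal sketch in plain Java (no Flink dependency) of how a consumer could fold the (flag, row) pairs from the question into the latest row per key. The class RetractFold, the Change type, and the choice of window_start_time (field index 2) as the key are assumptions made for illustration; they are not part of the Flink API. Note that this only yields the latest result seen so far, since a later update can still arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RetractFold {
    /** Hypothetical stand-in for one retract-stream element: true = add, false = retract. */
    static final class Change {
        final boolean add;
        final List<Object> row;

        Change(boolean add, Object... fields) {
            this.add = add;
            this.row = Arrays.asList(fields);
        }
    }

    /** Folds the changes into the current row per key; the key is the field at keyIndex. */
    static Map<Object, List<Object>> latestPerKey(List<Change> changes, int keyIndex) {
        Map<Object, List<Object>> current = new LinkedHashMap<>();
        for (Change c : changes) {
            Object key = c.row.get(keyIndex);
            if (c.add) {
                // An add message sets (or replaces) the row held for this key.
                current.put(key, c.row);
            } else {
                // A retract message removes the row only if it is the one currently held.
                current.remove(key, c.row);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        // The three records printed in the question, in arrival order.
        List<Change> changes = new ArrayList<>();
        changes.add(new Change(true, 5078.0, 199, 1635333650000L, 1635333660000L));
        changes.add(new Change(false, 5078.0, 199, 1635333650000L, 1635333660000L));
        changes.add(new Change(true, 6799.388891220093, 310, 1635333650000L, 1635333660000L));

        // Key on window_start_time (index 2); only the joined row remains.
        System.out.println(latestPerKey(changes, 2));
    }
}
```

In a real job the same idea would typically live in a sink that upserts by key, so that the external table always holds the most recent row for each window.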

If you are executing this query on bounded inputs, you can execute it in batch mode, and then only the final result will be printed.

In some streaming use cases you can use time attributes rather than timestamps, and then the Flink SQL planner is able to reason about when the results for certain queries are complete. For example, this is how windows in Flink SQL are able to produce an append stream, rather than an update stream. Your query is almost an interval join. If it were an interval join, then the result stream would be an append stream, and you wouldn't have to deal with these retractions.

Upvotes: 0
