Reputation: 53
My requirement is to process, or build some logic around, the results of SQL queries in Flink. For simplicity, let's say I have two SQL queries that run on the same event stream but with different window sizes. My question is
DataStream<Event> ds = ...
String query = "select id, key" +
        " from eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key ";
String query1 = "select id, key" +
        " from eventTable GROUP BY TUMBLE(rowTime, INTERVAL '1' DAY), id, key ";
List<String> list = new ArrayList<>();
list.add(query);
list.add(query1);
tabEnv.createTemporaryView("eventTable", ds, $("id"), $("timeLong"), $("key"), $("rowTime").rowtime());
for (int i = 0; i < list.size(); i++) {
    Table result = tabEnv.sqlQuery(list.get(i));
    DataStream<Tuple2<Boolean, Row>> dsRow = tabEnv.toRetractStream(result, Row.class);
    dsRow.process(new ProcessFunction<Tuple2<Boolean, Row>, Object>() {
        List<Row> listRow = new ArrayList<>();

        @Override
        public void processElement(Tuple2<Boolean, Row> booleanRowTuple2, Context context, Collector<Object> collector) throws Exception {
            listRow.add(booleanRowTuple2.f1);
        }
    });
}
I appreciate your help. Thanks, Ashutosh
Upvotes: 0
Views: 566
Reputation: 43439
To sort out which results are from which query, you could include an identifier for each query in the queries themselves, e.g.,
SELECT '10sec', id, key FROM eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key
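As a rough sketch of how this could scale beyond two queries, the tagged SQL strings can be generated from a map of tag to window interval. The class name `TaggedQueries`, the `build` helper, and the `queryId` column alias are hypothetical names introduced here for illustration; only the query shape comes from the question. This is plain Java with no Flink dependency, so it only shows the string construction.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: not part of Flink, just string construction.
final class TaggedQueries {

    // Builds one query per (tag, interval) entry. The tag is emitted as a
    // literal first column (aliased "queryId", an assumed name) so that
    // downstream code can tell which query produced a row.
    static List<String> build(Map<String, String> intervalsByTag) {
        List<String> queries = new ArrayList<>();
        for (Map.Entry<String, String> e : intervalsByTag.entrySet()) {
            queries.add("SELECT '" + e.getKey() + "' AS queryId, id, key"
                    + " FROM eventTable"
                    + " GROUP BY TUMBLE(rowTime, INTERVAL " + e.getValue() + "), id, key");
        }
        return queries;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the queries in insertion order.
        Map<String, String> intervals = new LinkedHashMap<>();
        intervals.put("10sec", "'10' SECOND");
        intervals.put("1day", "'1' DAY");
        build(intervals).forEach(System.out::println);
    }
}
```

Each generated string could then be passed to `tabEnv.sqlQuery(...)` as in the question's loop, and the tag should be readable from the first field of each result `Row`.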
Determining the number of rows in the result table is trickier. One issue is that a streaming query never finishes, so there is no final answer to the number of results. But at the point where you process the results, it seems like you could count the rows as they arrive.
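A minimal sketch of that counting idea, assuming each result row carries a query tag and arrives with the Boolean flag of a retract stream (`true` for an added row, `false` for a retracted one). The class `RetractRowCounter` and its methods are hypothetical and written in plain Java without Flink. In a real job the counter would more likely live in Flink managed state than in a plain field, so that it survives failures.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: keeps a running row count per query tag.
final class RetractRowCounter {

    private final Map<String, Long> liveRows = new HashMap<>();

    // isAdd mirrors the Boolean in Tuple2<Boolean, Row>:
    // true means the row was added, false means it was retracted.
    void onMessage(String queryId, boolean isAdd) {
        liveRows.merge(queryId, isAdd ? 1L : -1L, Long::sum);
    }

    // Number of rows seen so far for this query, net of retractions.
    long count(String queryId) {
        return liveRows.getOrDefault(queryId, 0L);
    }
}
```

Inside `processElement`, this could be driven by something like `counter.onMessage(tag, booleanRowTuple2.f0)`, where `tag` is read from the row. The count is only ever a running total, since more windows can always fire later.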
Or, and I haven't tried this, maybe you could use something like row_number() over(order by tumble_rowtime(rowTime, interval '10' second))
to annotate each row of the result with a counter.
Upvotes: 1