Ashutosh

Reputation: 53

Flink: how to apply processing logic to SQL query results

My requirement is to process, or build some logic around, the results of a SQL query in Flink. For simplicity, let's say I have two SQL queries running over one event stream with different window sizes. My question is:

// requires: import static org.apache.flink.table.api.Expressions.$;
DataStream<Event> ds = ...
String query = "select id, key" +
        "  from  eventTable  GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key ";

String query1 = "select id, key" +
        "  from  eventTable  GROUP BY TUMBLE(rowTime, INTERVAL '1' DAY), id, key ";
List<String> list = new ArrayList<>();
list.add(query);
list.add(query1);

// Register the stream as a table; rowTime is declared as the event-time attribute
tabEnv.createTemporaryView("eventTable", ds, $("id"), $("timeLong"), $("key"), $("rowTime").rowtime());

for (int i = 0; i < list.size(); i++) {
    Table result = tabEnv.sqlQuery(list.get(i));
    // f0 is true for an added row and false for a retracted row; f1 is the row itself
    DataStream<Tuple2<Boolean, Row>> dsRow = tabEnv.toRetractStream(result, Row.class);
    dsRow.process(new ProcessFunction<Tuple2<Boolean, Row>, Object>() {

        // Collects rows in memory; each parallel instance keeps its own list
        List<Row> listRow = new ArrayList<>();

        @Override
        public void processElement(Tuple2<Boolean, Row> booleanRowTuple2, Context context, Collector<Object> collector) throws Exception {
            listRow.add(booleanRowTuple2.f1);
        }
    });
}

I'd appreciate your help. Thanks, Ashutosh

Upvotes: 0

Views: 566

Answers (1)

David Anderson

Reputation: 43439

To sort out which results are from which query, you could include an identifier for each query in the queries themselves, e.g.,

SELECT '10sec', id, key FROM eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key
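A minimal sketch of that idea in plain Java, under stated assumptions: the helper names `TaggedQueries`, `windowQuery`, and `groupByQuery` are hypothetical, the `AS queryId` alias is added here only so the tag column gets a predictable name, and an `Object[]` stands in for Flink's `Row` (in a real job the tag would be read with `row.getField(0)`). It builds one tagged query per window size and then splits a mixed set of result rows by their first field.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: tags each windowed query with a string literal so that
// every result row carries the identifier of the query that produced it.
final class TaggedQueries {
    private TaggedQueries() {}

    // Builds e.g. SELECT '10sec' AS queryId, id, key FROM eventTable
    //             GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key
    static String windowQuery(String label, String interval) {
        return "SELECT '" + label + "' AS queryId, id, key FROM eventTable "
                + "GROUP BY TUMBLE(rowTime, " + interval + "), id, key";
    }

    // Splits result rows by their first column (the tag), preserving arrival order.
    // Object[] stands in for org.apache.flink.types.Row in this standalone sketch.
    static Map<String, List<Object[]>> groupByQuery(List<Object[]> rows) {
        Map<String, List<Object[]>> byQuery = new LinkedHashMap<>();
        for (Object[] row : rows) {
            String queryId = (String) row[0];
            byQuery.computeIfAbsent(queryId, k -> new ArrayList<>()).add(row);
        }
        return byQuery;
    }
}
```

With the tag in place, the per-query branching in the question's loop becomes a lookup on the first field rather than depending on which `DataStream` a row arrived on.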

Determining the number of rows in the result table is trickier. One issue is that a streaming query has no final answer for its number of results: as long as new events keep arriving, more rows can be emitted. But wherever you are processing the results, it seems like you could count the rows yourself.
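Here is a minimal sketch of counting rows where the results are processed, assuming they come from `toRetractStream`, where `f0 == true` marks an added row and `f0 == false` a retraction of an earlier one. The class `RunningRowCounter` is hypothetical plain Java so it can run on its own. Inside a Flink `ProcessFunction`, a plain field like this map is local to one parallel instance and is not checkpointed, so keyed state (for example a `ValueState<Long>` after a `keyBy` on the query id) would be the more robust choice.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: keeps a running row count per query id.
// An added row (f0 == true) increments the count; a retraction decrements it.
final class RunningRowCounter {
    private final Map<String, Long> counts = new HashMap<>();

    // Applies one change message and returns the updated count for that query.
    long apply(String queryId, boolean isAdd) {
        long delta = isAdd ? 1L : -1L;
        return counts.merge(queryId, delta, Long::sum);
    }

    // Count so far; a streaming query never reports a "final" total.
    long current(String queryId) {
        return counts.getOrDefault(queryId, 0L);
    }
}
```

For the tumbling-window GROUP BY queries in the question, the results are typically insert-only, so only the increment path should fire; the decrement just keeps the count correct for queries that do emit retractions.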

Alternatively (and I haven't tried this), maybe you could use something like row_number() over (order by tumble_rowtime(rowTime, interval '10' second)) to annotate each row of the result with a counter.

Upvotes: 1
