Ivan Barrios

Reputation: 3

Flink streaming table with a Kafka source, queried with Flink SQL

I'm trying to read data from a Kafka topic into a DataStream, register the DataStream as a table, and then query it with TableEnvironment.sqlQuery("SQL"). When I call TableEnvironment.execute(), there is no error and no output.

public static void main(String[] args) throws Exception {
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
   env.enableCheckpointing(5000);
   StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
   FlinkKafkaConsumer<Person> consumer = new FlinkKafkaConsumer<>(
                                                                "topic",
                                                                 new JSONDeserializer(),
                                                                 Job.getKafkaProperties()
                                                               );

  consumer.setStartFromEarliest();
  DataStream<Person> stream = env.addSource(consumer)
      .filter(x -> x.status != -1)
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Person>(){
      long current = 0L;
      final long expire = 1000L;

      @Override
      public Watermark getCurrentWatermark(){
         return new Watermark(current - expire);
      }

      @Override
      public long extractTimestamp(Person person){
         long timestamp = person.createTime;
         current = Math.max(timestamp,current);
         return timestamp;
      }
  });
  //set createTime as rowtime
  tableEnvironment.registerDataStream("Table_Person",stream,"name,age,sex,createTime.rowtime");
  Table res = tableEnvironment.sqlQuery("select TUMBLE_END(createTime,INTERVAL '1' minute) as registTime,sex,count(1) as total from Table_Person group by sex,TUMBLE(createTime,INTERVAL '1' minute)");
  tableEnvironment.toAppendStream(t,Types.ROW(new TypeInformation[]{Types.SQL_TIMESTAMP,Types.STRING,Types.LONG})).print();
  tableEnvironment.execute("person-query");
}

When I run this, nothing is printed to the console and no exception is thrown. But if I use fromCollection() as the source, the program does print results to the console. Can you please guide me to fix this?

dependencies:

  1. flink-streaming-java_2.11 version:1.9.0-csa1.0.0.0;
  2. flink-streaming-scala_2.11 version:1.9.0-csa1.0.0.0;
  3. flink-connector-kafka_2.11 version:1.9.0-csa1.0.0.0;
  4. flink-table-api-java-bridge_2.11 version:1.9.0-csa1.0.0.0;
  5. flink-table-planner_2.11 version:1.9.0-csa1.0.0.0;

Upvotes: 0

Views: 2018

Answers (1)

David Anderson

Reputation: 43697

In the code where you convert the SQL query's result back to a DataStream, you need to pass res rather than t to toAppendStream. (I can't see how the code you've posted will even compile. Where is t declared?) And I think you should be able to do this:

Table res = tableEnvironment.sqlQuery("select TUMBLE_END(createTime,INTERVAL '1' minute) as registTime,sex,count(1) as total from Table_Person group by sex,TUMBLE(createTime,INTERVAL '1' minute)");
tableEnvironment.toAppendStream(res,Row.class).print();

rather than bothering with the TypeInformation.
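
Separately, it may be worth checking whether the watermark ever passes the end of a window. This is only a guess at a contributing factor; nothing in the question confirms it. An event-time tumbling window emits its result once the watermark reaches the window's last timestamp, and the assigner in the question produces a watermark of current - expire. A bounded source such as fromCollection() ends with a final maximum watermark, which flushes any open windows, whereas a Kafka source keeps running and relies on later records to advance the watermark. The plain-Java sketch below has no Flink dependency and only mirrors that arithmetic; WatermarkSketch and its method names are hypothetical, and it assumes non-negative millisecond timestamps.

```java
// Plain-Java sketch (no Flink dependency) of the watermark arithmetic from the question.
// WatermarkSketch and its method names are hypothetical, used only for illustration.
final class WatermarkSketch {
    static final long EXPIRE_MS = 1_000L;   // same value as `expire` in the question
    static final long WINDOW_MS = 60_000L;  // INTERVAL '1' minute

    private long current = 0L; // highest event timestamp seen so far

    // Mirrors extractTimestamp(): remember the largest timestamp seen.
    long observe(long eventTimestamp) {
        current = Math.max(eventTimestamp, current);
        return eventTimestamp;
    }

    // Mirrors getCurrentWatermark().
    long currentWatermark() {
        return current - EXPIRE_MS;
    }

    // Exclusive end of the tumbling window containing the timestamp
    // (assumes a non-negative timestamp).
    static long windowEnd(long eventTimestamp) {
        return eventTimestamp - (eventTimestamp % WINDOW_MS) + WINDOW_MS;
    }

    // A window can emit once the watermark reaches its last timestamp (end - 1).
    boolean windowCanFire(long windowEnd) {
        return currentWatermark() >= windowEnd - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch w = new WatermarkSketch();
        long end = windowEnd(w.observe(10_000L));  // window [0, 60000)
        System.out.println(w.windowCanFire(end));  // false: watermark is 9000
        w.observe(59_000L);
        System.out.println(w.windowCanFire(end));  // false: watermark is 58000
        w.observe(61_000L);
        System.out.println(w.windowCanFire(end));  // true: watermark is 60000
    }
}
```

In this sketch the first window only becomes eligible to fire after a record with a timestamp of at least 60999 ms arrives, so a topic whose records all fall within a single minute would produce no output from the windowed query.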

Upvotes: 0
