Reputation: 3
I'm trying to read data from a Kafka topic into a DataStream, register the DataStream as a table, and then query it with TableEnvironment.sqlQuery("SQL"). When I call TableEnvironment.execute(), there is no error and no output.
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(5000);
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
    FlinkKafkaConsumer<Person> consumer = new FlinkKafkaConsumer<>(
        "topic",
        new JSONDeserializer(),
        Job.getKafkaProperties()
    );
    consumer.setStartFromEarliest();
    DataStream<Person> stream = env.addSource(consumer)
        .filter(x -> x.status != -1)
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Person>() {
            long current = 0L;          // highest createTime seen so far
            final long expire = 1000L;  // watermark lags the maximum by 1 second
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(current - expire);
            }
            @Override
            public long extractTimestamp(Person person, long previousElementTimestamp) {
                long timestamp = person.createTime;
                current = Math.max(timestamp, current);
                return timestamp;
            }
        });
    // set createTime as rowtime
    tableEnvironment.registerDataStream("Table_Person", stream, "name,age,sex,createTime.rowtime");
    Table res = tableEnvironment.sqlQuery("select TUMBLE_END(createTime,INTERVAL '1' minute) as registTime,sex,count(1) as total from Table_Person group by sex,TUMBLE(createTime,INTERVAL '1' minute)");
    tableEnvironment.toAppendStream(t, Types.ROW(new TypeInformation[]{Types.SQL_TIMESTAMP, Types.STRING, Types.LONG})).print();
    tableEnvironment.execute("person-query");
}
When I run this, nothing is printed to the console and no exception is thrown. If I use fromCollection() as the source instead, the program does print results to the console. Can you please guide me to fix this?
dependencies:
Upvotes: 0
Views: 2018
Reputation: 43697
In the code where you convert the SQL query's result back to a DataStream, you need to pass res rather than t to toAppendStream. (I can't see how the code you've posted will even compile; where is t declared?) And I think you should be able to do this:
Table res = tableEnvironment.sqlQuery("select TUMBLE_END(createTime,INTERVAL '1' minute) as registTime,sex,count(1) as total from Table_Person group by sex,TUMBLE(createTime,INTERVAL '1' minute)");
tableEnvironment.toAppendStream(res,Row.class).print();
rather than bothering with the TypeInformation.
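As a side note on the question's watermark assigner, here is a minimal plain-Java sketch (no Flink dependency) of the arithmetic it performs, next to the 1-minute tumbling window from the SQL. The class and method names (WatermarkSketch, observe, windowEnd, canFire) are hypothetical and only for illustration. The firing rule assumes the usual event-time behavior, where a window can emit once the watermark reaches the window's last millisecond, and it assumes createTime is in epoch milliseconds.

```java
// Plain-Java sketch of the question's watermark logic; not Flink code.
// All names in this class are hypothetical, chosen for illustration.
class WatermarkSketch {
    static final long EXPIRE_MS = 1000L;   // same lag as `expire` in the question
    static final long WINDOW_MS = 60_000L; // INTERVAL '1' minute

    private long current = 0L;             // highest timestamp seen so far

    // Mirrors extractTimestamp: remember the maximum event time.
    long observe(long timestampMs) {
        current = Math.max(timestampMs, current);
        return timestampMs;
    }

    // Mirrors getCurrentWatermark: maximum timestamp minus the allowed lag.
    long watermark() {
        return current - EXPIRE_MS;
    }

    // Exclusive end of the tumbling window that contains the timestamp.
    static long windowEnd(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_MS) + WINDOW_MS;
    }

    // Assumed rule: a window may emit once the watermark reaches end - 1.
    boolean canFire(long windowEndMs) {
        return watermark() >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch s = new WatermarkSketch();
        s.observe(10_000L);
        long end = windowEnd(10_000L);
        System.out.println("window end: " + end + ", can fire: " + s.canFire(end));
        s.observe(61_000L); // a later record moves the watermark forward
        System.out.println("watermark: " + s.watermark() + ", can fire: " + s.canFire(end));
    }
}
```

Under these assumptions, a window's count is only emitted after a later record pushes the watermark past the end of that window, so it may be worth checking that the topic contains records spanning more than one minute of createTime.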
Upvotes: 0