Reputation: 11
I have a stream like this: <_time(timestamp), uri(string), userId(int)>. The _time attribute is the rowtime attribute, and I register the stream as a table:
tableEnv.registerDataStream("userVisitPage", stream, "_time.rowtime, uri,userId");
Then I query the table:
final String sql =
"SELECT tumble_start(_time, interval '10' second) as timestart, " +
" count(distinct userId) as uv, " +
" uri as uri, " +
" count(1) as pv " +
"FROM userVisitPage " +
"GROUP BY tumble(_time, interval '10' second), uri";
final Table table = tableEnv.sqlQuery(sql);
However, the query throws an exception:
org.apache.flink.table.codegen.CodeGenException: Unsupported call: TUMBLE
If you think this function should be supported, you can create an issue and start a discussion for it.
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1006)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1006)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1006)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:234)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$7.apply(CodeGenerator.scala:321)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$7.apply(CodeGenerator.scala:321)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:321)
at org.apache.flink.table.plan.nodes.CommonCalc$class.generateFunction(CommonCalc.scala:44)
at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.generateFunction(DataStreamCalc.scala:43)
at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:116)
at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate.translateToPlan(DataStreamGroupAggregate.scala:113)
at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate.translateToPlan(DataStreamGroupAggregate.scala:113)
at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:837)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:764)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:734)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:357)
How can I implement this query?
Upvotes: 1
Views: 1998
Reputation: 18987
Update: Flink 1.6.0 is available and supports DISTINCT aggregates on streaming tables.
Flink (version 1.4.x) does not yet support SQL queries with DISTINCT aggregations on streaming tables. Support is targeted for Flink 1.6, which won't be released before mid-2018.
You can, however, implement a user-defined aggregation function that computes distinct counts, register it, and then use it in your queries. The query syntax will differ, of course, because you call your own function instead of COUNT(DISTINCT ...).
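As a rough illustration of that workaround, here is a minimal sketch of the accumulator logic for such a function. It is written as plain Java so it runs on its own. In a real job the class would presumably extend org.apache.flink.table.functions.AggregateFunction<Long, DistinctCount.Acc>, whose contract the method names below follow. The class name DistinctCount and the Integer argument type (taken from userId(int) in the question) are assumptions for this example.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java sketch of a distinct-count aggregate (hypothetical name).
// Assumption: in Flink this would extend
// org.apache.flink.table.functions.AggregateFunction<Long, DistinctCount.Acc>.
class DistinctCount {

    // Accumulator: every distinct value seen so far in one group/window.
    public static class Acc {
        public Set<Integer> seen = new HashSet<>();
    }

    // Creates an empty accumulator for a new group/window.
    public Acc createAccumulator() {
        return new Acc();
    }

    // Called once per input row; null values are ignored, as COUNT does.
    public void accumulate(Acc acc, Integer userId) {
        if (userId != null) {
            acc.seen.add(userId);
        }
    }

    // Result of the aggregation: the number of distinct values.
    public Long getValue(Acc acc) {
        return (long) acc.seen.size();
    }
}
```

With the Flink base class in place, the function could be registered with tableEnv.registerFunction("distinctCount", new DistinctCount()) and the query would then use distinctCount(userId) as uv in place of count(distinct userId). Note that the set grows with the number of distinct users per window, so this only suits moderate cardinalities.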
Upvotes: 1