user7432713
user7432713

Reputation: 226

Error while creating a tumbling window in Apache Flink Java

I try to create a tumbling time window of 2 rows each in Flink Java. This must based on the dateTime (TimeStamp3 datatype) or unixDateTime(BIGINT datatype) column. I've added code of two different code version. The error I get I placed above the code.

When I print the datatypes of the Table object I see this: |-- mID: INT |-- dateTime: TIMESTAMP(3) *ROWTIME* |-- mValue: DOUBLE |-- unixDateTime: BIGINT |-- mType: STRING

StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv);   
        fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    TupleTypeInfo<Tuple5<Integer, Timestamp, Double, Long, String>> tupleType = new TupleTypeInfo<>(
        Types.INT(),
        Types.SQL_TIMESTAMP(),
        Types.DOUBLE(),
        Types.LONG(),
        Types.STRING());
        DataStream<Tuple5<Integer, Timestamp, Double, Long, String>> dsTuple =
                tableEnv.toAppendStream(HTable, tupleType);

//When I run below code I get this error: Caused by: java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.

Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime, mValue, unixDateTime, mType");
   DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
stream.print();

//When I run below code I get this error: Exception in thread "main" java.lang.UnsupportedOperationException: Event-time grouping windows on row intervals are currently not supported.

   Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime, measurementValue, unixDateTime, measurementType")
.window(Tumble.over("2.rows")
.on("dateTime")
.as("a"))
.groupBy("a")
.select("AVG(mValue)");
    DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
    stream.print();






Upvotes: 0

Views: 538

Answers (1)

David Anderson
David Anderson

Reputation: 43499

Time based operations on streaming tables require that you explicitly inform Flink about how time is to be handled. You'll want to read the relevant section of the documentation.

You'll want to pay particular attention to the section on event time.

Upvotes: 1

Related Questions