Flink Table API program does not compile when assigning watermark using a field converted with UDF

Since TO_TIMESTAMP(value, format) method in Flink Table API does not support custom formats like yyyyMMddHHmmssSSS, we needed to create a UDF(User Defined Function) for custom conversion.

However, when we tried to use it, Flink Table api gave Table program compile error as follows:

Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.  at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107) ~[flink-table-runtime-1.17.1.jar:1.17.1]  at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92) ~[flink-table-runtime-1.17.1.jar:1.17.1] .... 

Caused by: org.codehaus.commons.compiler.CompileException: Line 31, Column 83: Cannot determine simple type name "org"  at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211) ~[flink-table-runtime-1.17.1.jar:1.17.1]  at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833) ~[flink-table-runtime-1.17.1.jar:1.17.1] ....

java.lang.RuntimeException: Could not instantiate generated class 'WatermarkGenerator$0'
    at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74) ~[flink-table-runtime-1.17.1.jar:1.17.1]   

We tested the UDF both with Flink 1.16.1 and 1.17.1, got the same error.

We actually have two UDFs, TOBIGDECIMAL() works perfectly but TO_LOCALDATETIME() gives error when we assign the converted field to watermark as follows. Note that, if we dont assign it watermark, it works perfectly as well.

UDF definition:

public class ToLocalDateTimeFunction extends ScalarFunction {
    private static final DateTimeFormatter format = new DateTimeFormatterBuilder()
            .appendPattern("yyyyMMddHHmmss")
            .appendValue(ChronoField.MILLI_OF_SECOND, 3)
            .toFormatter();
    public LocalDateTime eval(String value) {
        try {
            return LocalDateTime.parse(value, format);
        } catch (Exception e) {
            return null;
        }
    }
}

UDF Registration:

tableEnvironment.createTemporarySystemFunction("TO_BIGDECIMAL", ToBigDecimalFunction.class);
tableEnvironment.createTemporarySystemFunction("TO_LOCALDATETIME", ToLocalDateTimeFunction.class);
tableEnvironment.createTemporaryTable("transactionTable", transactionDataStream);

Using the UDF in TableDescriptor:

TableDescriptor.forConnector(KafkaDynamicTableFactory.IDENTIFIER)
        .schema(Schema.newBuilder()
                .columnByExpression("entityId", "raw_data.entity_id")
                .columnByExpression("tranDate", "TO_LOCALDATETIME(raw_data.tran_date)")
                .columnByExpression("rowtime", "CAST(TO_LOCALDATETIME(raw_data.tran_date) AS TIMESTAMP_LTZ(3))")
                .columnByExpression("amt", "TO_BIGDECIMAL(raw_data.amt)")
                .watermark("rowtime", "rowtime - INTERVAL '10' MINUTE")
                .columnByExpression("processTime", "PROCTIME()")
                .column("raw_data", DataTypes.ROW(
                        DataTypes.FIELD("tran_date", DataTypes.STRING()),
                        DataTypes.FIELD("entity_id", DataTypes.BIGINT()),
                        DataTypes.FIELD("amt", DataTypes.STRING())
                ))
                .build())
        .format("protobuf")
        .option("protobuf.message-class-name", RawDataCarrierProto.class.getName())
        .option("topic", topicName)
        .option("properties.bootstrap.servers", kafkaBootstrapServers)
        .option("properties.group.id", consumerGroupId)
        .option("scan.startup.mode", "latest-offset")
        .build();

Upvotes: 0

Views: 392

Answers (1)

David Anderson
David Anderson

Reputation: 43454

I don't know that it will work, but I would try

...
.columnByExpression(
  "rowtime",
  "CAST(TO_LOCALDATETIME(raw_data.tran_date) AS TIMESTAMP_LTZ(3))")
.watermark(
  "rowtime", 
  "CAST(TO_LOCALDATETIME(raw_data.tran_date) AS TIMESTAMP_LTZ(3)) - INTERVAL '10' MINUTE")
...

Upvotes: 0

Related Questions