Reputation: 1
Since the TO_TIMESTAMP(value, format) function in the Flink Table API does not support custom formats like yyyyMMddHHmmssSSS, we needed to create a UDF (user-defined function) for the conversion.
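For reference, the built-in expression we would otherwise have used in the schema further below looks roughly like this (same field name as in that schema); as stated above, it does not support this pattern, which is why the UDF exists:

// Hypothetical alternative with the built-in function; not usable here because
// the yyyyMMddHHmmssSSS pattern is not supported, per the issue described above
.columnByExpression("tranDate", "TO_TIMESTAMP(raw_data.tran_date, 'yyyyMMddHHmmssSSS')")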
However, when we tried to use the UDF, the Flink Table API failed with the following "Table program cannot be compiled" error:
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107) ~[flink-table-runtime-1.17.1.jar:1.17.1]
    at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92) ~[flink-table-runtime-1.17.1.jar:1.17.1]
    ....
Caused by: org.codehaus.commons.compiler.CompileException: Line 31, Column 83: Cannot determine simple type name "org"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211) ~[flink-table-runtime-1.17.1.jar:1.17.1]
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833) ~[flink-table-runtime-1.17.1.jar:1.17.1]
    ....
java.lang.RuntimeException: Could not instantiate generated class 'WatermarkGenerator$0'
    at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74) ~[flink-table-runtime-1.17.1.jar:1.17.1]
We tested the UDF with both Flink 1.16.1 and 1.17.1 and got the same error.
We actually have two UDFs: TO_BIGDECIMAL() works perfectly, but TO_LOCALDATETIME() throws this error when we assign the converted field to the watermark as shown below. Note that if we don't assign the field to a watermark, it works perfectly as well.
UDF definition:
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

import org.apache.flink.table.functions.ScalarFunction;

public class ToLocalDateTimeFunction extends ScalarFunction {

    // Formatter for the custom yyyyMMddHHmmssSSS layout (millis appended as a fixed-width value)
    private static final DateTimeFormatter format = new DateTimeFormatterBuilder()
            .appendPattern("yyyyMMddHHmmss")
            .appendValue(ChronoField.MILLI_OF_SECOND, 3)
            .toFormatter();

    public LocalDateTime eval(String value) {
        try {
            return LocalDateTime.parse(value, format);
        } catch (Exception e) {
            // Malformed input yields NULL instead of failing the job
            return null;
        }
    }
}
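The parsing itself behaves as expected outside of Flink. A throwaway check like the one below (the class name and sample values are only for this sketch) prints the expected timestamp, which matches the observation that the job runs fine as long as the converted field is not used in a watermark:

import java.time.LocalDateTime;

// Standalone sanity check of the UDF's parsing logic (sketch only)
public class ToLocalDateTimeFunctionCheck {
    public static void main(String[] args) {
        ToLocalDateTimeFunction udf = new ToLocalDateTimeFunction();

        LocalDateTime parsed = udf.eval("20230801123456789"); // yyyyMMddHHmmssSSS
        System.out.println(parsed);                           // 2023-08-01T12:34:56.789

        System.out.println(udf.eval("not-a-timestamp"));      // null, parse errors are swallowed
    }
}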
UDF Registration:
tableEnvironment.createTemporarySystemFunction("TO_BIGDECIMAL", ToBigDecimalFunction.class);
tableEnvironment.createTemporarySystemFunction("TO_LOCALDATETIME", ToLocalDateTimeFunction.class);
tableEnvironment.createTemporaryTable("transactionTable", transactionDataStream);
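Since they are registered as system functions, the names can also be called from plain SQL. A quick check like the following (the inline VALUES row is just sample data for this sketch) can be used to confirm that the functions resolve and evaluate outside of the watermark context:

// Sketch: call the registered UDF from SQL against an inline VALUES row
tableEnvironment
    .sqlQuery("SELECT TO_LOCALDATETIME(s) FROM (VALUES ('20230801123456789')) AS t(s)")
    .execute()
    .print();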
Using the UDF in TableDescriptor:
TableDescriptor.forConnector(KafkaDynamicTableFactory.IDENTIFIER)
        .schema(Schema.newBuilder()
                .columnByExpression("entityId", "raw_data.entity_id")
                .columnByExpression("tranDate", "TO_LOCALDATETIME(raw_data.tran_date)")
                .columnByExpression("rowtime", "CAST(TO_LOCALDATETIME(raw_data.tran_date) AS TIMESTAMP_LTZ(3))")
                .columnByExpression("amt", "TO_BIGDECIMAL(raw_data.amt)")
                .watermark("rowtime", "rowtime - INTERVAL '10' MINUTE")
                .columnByExpression("processTime", "PROCTIME()")
                .column("raw_data", DataTypes.ROW(
                        DataTypes.FIELD("tran_date", DataTypes.STRING()),
                        DataTypes.FIELD("entity_id", DataTypes.BIGINT()),
                        DataTypes.FIELD("amt", DataTypes.STRING())
                ))
                .build())
        .format("protobuf")
        .option("protobuf.message-class-name", RawDataCarrierProto.class.getName())
        .option("topic", topicName)
        .option("properties.bootstrap.servers", kafkaBootstrapServers)
        .option("properties.group.id", consumerGroupId)
        .option("scan.startup.mode", "latest-offset")
        .build();
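For completeness, the descriptor is then registered and consumed in the usual way, roughly like this (the variable and table names here are only illustrative):

// Sketch: register the descriptor built above under a name and read from it
TableDescriptor transactionDescriptor =
        TableDescriptor.forConnector(KafkaDynamicTableFactory.IDENTIFIER)
                // ... schema, format, and options exactly as shown above ...
                .build();

tableEnvironment.createTemporaryTable("kafkaTransactions", transactionDescriptor);
Table transactions = tableEnvironment.from("kafkaTransactions");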
Upvotes: 0
Views: 392
Reputation: 43454
I don't know that it will work, but I would try
...
.columnByExpression(
"rowtime",
"CAST(TO_LOCALDATETIME(raw_data.tran_date) AS TIMESTAMP_LTZ(3))")
.watermark(
"rowtime",
"CAST(TO_LOCALDATETIME(raw_data.tran_date) AS TIMESTAMP_LTZ(3)) - INTERVAL '10' MINUTE")
...
Upvotes: 0