rony

Reputation: 1

Error in Implementing Flink SQL Processing Time Temporal Left Join

I have a stream of data coming from Kafka that I want to enrich with static data stored in Parquet files on Hadoop, and then write to a filesystem sink.

Initially, I tried a lookup join:

SELECT t1.*, t2.enrichment_data_col FROM source_stream_table AS t1
LEFT JOIN lookupTable FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.lookup_type = t2.lookup_type

but got the following error:

org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin.createJoinOperator(StreamExecTemporalJoin.java:292)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin.getJoinOperator(StreamExecTemporalJoin.java:254)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.java:179)

Next, I tried a temporal table function LEFT JOIN, following https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#left-outer-join:


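import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.FormatDescriptor;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.functions.TemporalTableFunction;

// Static lookup data: Parquet files read through the filesystem connector
TableDescriptor lookupDescriptor = TableDescriptor.forConnector("filesystem")
        .format(FormatDescriptor.forFormat("parquet").build())
        .option("path", lookupFileLocation)
        .schema(Schema.newBuilder()
                .column("lookup_type", DataTypes.STRING().notNull())
                .column("enrichment_data_col", DataTypes.INT())
                .columnByExpression("proc_time", "PROCTIME()")
                .primaryKey("lookup_type")
                .build())
        .build();

tenv.createTable("lookupTable", lookupDescriptor);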


// Temporal table function versioned by proc_time and keyed by lookup_type
TemporalTableFunction tmpLookup = tenv.from("lookupTable")
        .createTemporalTableFunction($("proc_time"), $("lookup_type"));
tenv.createTemporarySystemFunction("lookupTableFunc", tmpLookup);


SELECT t1.*, t2.enrichment_data_col FROM source_stream_table AS t1
LEFT OUTER JOIN LATERAL TABLE(lookupTableFunc(t1.proctime)) AS t2
ON TRUE

but now I get the following error:

org.apache.flink.table.api.ValidationException: Only single column join key is supported. Found [I@6f8667bb in [Temporal Table Function]
    at org.apache.flink.table.planner.plan.utils.TemporalJoinUtil$.validateTemporalFunctionPrimaryKey(TemporalJoinUtil.scala:383)
    at org.apache.flink.table.planner.plan.utils.TemporalJoinUtil$.validateTemporalFunctionCondition(TemporalJoinUtil.scala:365)
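The odd-looking "[I@6f8667bb" in that message is not a key name: it is Java's default toString() for an int[] (the "[I" prefix means "array of int", followed by a hash code), so the message does not show which keys were actually found. A minimal standalone illustration in plain Java, with hypothetical array contents and method names:

```java
import java.util.Arrays;

public class ArrayToStringDemo {
    // String concatenation falls back to Object.toString(): "[I@" + hex hash code
    public static String raw(int[] keys) {
        return "Found " + keys;
    }

    // Arrays.toString prints the elements themselves
    public static String readable(int[] keys) {
        return "Found " + Arrays.toString(keys);
    }

    public static void main(String[] args) {
        int[] keys = {0}; // hypothetical contents
        System.out.println(raw(keys));      // e.g. "Found [I@1b6d3586"; the hash varies per run
        System.out.println(readable(keys)); // Found [0]
    }
}
```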

The temporal table function INNER JOIN syntax works fine, but I need a LEFT JOIN.
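The difference between the two join types can be sketched in plain Java, independent of Flink: each stream row looks up the current value for its key in the static table; an inner join drops rows with no match, while a left join keeps them with a null enrichment value. The class, record, and method names below are hypothetical illustrations, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LookupJoinSketch {
    // A stream row: join key plus an arbitrary payload (hypothetical shape)
    public record Event(String lookupType, String payload) {}

    // Output row; enrichment stays null when a LEFT JOIN finds no match
    public record Enriched(String lookupType, String payload, Integer enrichment) {}

    // Joins each event against the current contents of the static lookup map
    public static List<Enriched> join(List<Event> events, Map<String, Integer> lookup, boolean leftJoin) {
        List<Enriched> out = new ArrayList<>();
        for (Event e : events) {
            boolean matched = lookup.containsKey(e.lookupType());
            if (matched) {
                out.add(new Enriched(e.lookupType(), e.payload(), lookup.get(e.lookupType())));
            } else if (leftJoin) {
                out.add(new Enriched(e.lookupType(), e.payload(), null));
            }
            // inner join: unmatched events are dropped
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> lookup = Map.of("A", 1, "B", 2);
        List<Event> events = List.of(new Event("A", "x"), new Event("C", "y"));
        System.out.println(join(events, lookup, false).size()); // 1
        System.out.println(join(events, lookup, true).size());  // 2
    }
}
```

containsKey is used instead of a null check on get, so a key whose enrichment value is itself null still counts as a match.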

I tried this with Flink versions 1.15.1 and 1.16.0.

Is there another way to achieve a temporal LEFT JOIN in Flink, or am I missing something?

Upvotes: 0

Views: 778

Answers (1)

BenoitParis

Reputation: 3184

Your first attempt is indeed not supported:

Currently, the FOR SYSTEM_TIME AS OF syntax used in temporal join with latest version of any view/table is not support yet

Processing time is generally discouraged because it is not deterministic. You could instead try to come up with an event-time attribute, for example:

order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
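Conceptually, an event-time temporal join picks, for each stream row, the version of the lookup row that was valid at that row's event time; with LEFT JOIN, rows that have no valid version are kept with nulls. In Flink SQL this is what FOR SYSTEM_TIME AS OF t1.order_time expresses when the right side is a versioned table, i.e. one with a primary key and an event-time attribute. Below is a minimal plain-Java sketch of those semantics only, assuming long timestamps and hypothetical names; it is not Flink's API, and it ignores the watermark-based buffering Flink performs before emitting results.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class EventTimeTemporalJoinSketch {
    // Versioned table: primary key -> (version time -> value)
    private final Map<String, TreeMap<Long, Integer>> versions = new HashMap<>();

    // Records a new version of the row for this key, valid from versionTime onwards
    public void upsert(String key, long versionTime, Integer value) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(versionTime, value);
    }

    // LEFT JOIN lookup: value valid at eventTime, or null if no version exists yet
    public Integer lookupAsOf(String key, long eventTime) {
        TreeMap<Long, Integer> history = versions.get(key);
        if (history == null) {
            return null; // unknown key
        }
        Map.Entry<Long, Integer> entry = history.floorEntry(eventTime);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        EventTimeTemporalJoinSketch table = new EventTimeTemporalJoinSketch();
        table.upsert("A", 10L, 1);
        table.upsert("A", 20L, 2);
        System.out.println(table.lookupAsOf("A", 15L)); // 1 (version from t=10)
        System.out.println(table.lookupAsOf("A", 25L)); // 2
        System.out.println(table.lookupAsOf("A", 5L));  // null (no version yet)
        System.out.println(table.lookupAsOf("B", 15L)); // null (unknown key)
    }
}
```

A TreeMap per key keeps versions sorted by time, so floorEntry finds the latest version at or before the event time in logarithmic time.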

EDIT: Removed the second part (it was incorrect).

Upvotes: 0
