Reputation: 1
I have a stream of data coming from Kafka that I want to enrich with static data stored in Parquet files in Hadoop, and finally write to a filesystem sink.
Initially I tried a lookup join, as below:
SELECT t1.*, t2.enrichment_data_col
FROM source_stream_table AS t1
LEFT JOIN lookupTable FOR SYSTEM_TIME AS OF t1.proctime AS t2
  ON t1.lookup_type = t2.lookup_type
but got the following error:
org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin.createJoinOperator(StreamExecTemporalJoin.java:292)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin.getJoinOperator(StreamExecTemporalJoin.java:254)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.java:179)
Next I tried to implement the temporal table function LEFT JOIN as per https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#left-outer-join, as below:
TableDescriptor lookupDescriptor = TableDescriptor.forConnector("filesystem")
        .format(FormatDescriptor.forFormat("parquet").build())
        .option("path", lookupFileLocation)
        .schema(Schema.newBuilder()
                .column("lookup_type", DataTypes.STRING().notNull())
                .column("enrichment_data_col", DataTypes.INT())
                .columnByExpression("proc_time", "PROCTIME()")
                .primaryKey("lookup_type")
                .build())
        .build();
tenv.createTable("lookupTable", lookupDescriptor);
TemporalTableFunction tmpLookup = tenv.from("lookupTable")
        .createTemporalTableFunction($("proc_time"), $("lookup_type"));
tenv.createTemporarySystemFunction("lookupTableFunc", tmpLookup);
SELECT t1.*, t2.enrichment_data_col
FROM source_stream_table AS t1
LEFT OUTER JOIN LATERAL TABLE(lookupTableFunc(t1.proctime)) AS t2
  ON TRUE
but now I get the following error:
org.apache.flink.table.api.ValidationException: Only single column join key is supported. Found [I@6f8667bb in [Temporal Table Function]
at org.apache.flink.table.planner.plan.utils.TemporalJoinUtil$.validateTemporalFunctionPrimaryKey(TemporalJoinUtil.scala:383)
at org.apache.flink.table.planner.plan.utils.TemporalJoinUtil$.validateTemporalFunctionCondition(TemporalJoinUtil.scala:365)
The temporal table function INNER JOIN syntax works fine, but I need a left join.
I tried this with Flink versions 1.15.1 and 1.16.0.
Is there some other way to achieve a temporal LEFT JOIN in Flink, or am I missing something here?
Upvotes: 0
Views: 778
Reputation: 3184
Your first attempt is indeed not supported. As the documentation puts it:
Currently, the FOR SYSTEM_TIME AS OF syntax used in temporal join with latest version of any view/table is not support yet
Basically, processing time is discouraged because it is not deterministic. You could try to come up with an event time instead, declared in the table definition like this:
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
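For illustration, here is a rough sketch of how an event-time temporal LEFT JOIN could look with the table names from the question. Treat it as an assumption-laden outline rather than a tested solution: the event_time and update_time columns are hypothetical (they would have to exist in your Kafka records and Parquet files), the connector options are placeholders, and I have not verified how a static Parquet source behaves as the versioned side of such a join.

```sql
-- Probe side: the Kafka stream, with a hypothetical event-time column.
CREATE TABLE source_stream_table (
  lookup_type STRING,
  event_time  TIMESTAMP(3),  -- assumed column, not in the original question
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'
  -- remaining Kafka options (topic, brokers, format, ...) omitted
);

-- Versioned side: needs a primary key and an event-time attribute.
CREATE TABLE lookupTable (
  lookup_type         STRING,
  enrichment_data_col INT,
  update_time         TIMESTAMP(3),  -- assumed column, not in the original question
  WATERMARK FOR update_time AS update_time,
  PRIMARY KEY (lookup_type) NOT ENFORCED
) WITH (
  'connector' = 'filesystem',
  'format'    = 'parquet',
  'path'      = '...'  -- placeholder for the lookup file location
);

-- Event-time temporal LEFT JOIN: each stream row is matched against the
-- version of the lookup row that was valid at t1.event_time.
SELECT t1.*, t2.enrichment_data_col
FROM source_stream_table AS t1
LEFT JOIN lookupTable FOR SYSTEM_TIME AS OF t1.event_time AS t2
  ON t1.lookup_type = t2.lookup_type;
```

Note that with event time a stream row only finds a match if the lookup row's update_time is not later than the stream row's event_time, so the timestamps on the static data matter.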
EDIT: removed the second part (false information).
Upvotes: 0